Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 209 additions & 1 deletion src/workos/authorization.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import Any, Dict, Optional, Protocol, Sequence
from enum import Enum
from typing import Any, Dict, Optional, Protocol, Sequence, Union

from pydantic import TypeAdapter
from typing_extensions import TypedDict

from workos.types.authorization.environment_role import (
EnvironmentRole,
EnvironmentRoleList,
)
from workos.types.authorization.organization_role import OrganizationRole
from workos.types.authorization.permission import Permission
from workos.types.authorization.resource import Resource
from workos.types.authorization.role import Role, RoleList
from workos.types.list_resource import (
ListArgs,
Expand All @@ -27,9 +30,28 @@
REQUEST_METHOD_PUT,
)


class _Unset(Enum):
TOKEN = 0


UNSET: _Unset = _Unset.TOKEN

AUTHORIZATION_PERMISSIONS_PATH = "authorization/permissions"
AUTHORIZATION_RESOURCES_PATH = "authorization/resources"


class ParentResourceById(TypedDict):
parent_resource_id: str


class ParentResourceByExternalId(TypedDict):
parent_resource_external_id: str
parent_resource_type_slug: str


ParentResource = Union[ParentResourceById, ParentResourceByExternalId]

_role_adapter: TypeAdapter[Role] = TypeAdapter(Role)


Expand Down Expand Up @@ -162,6 +184,36 @@ def add_environment_role_permission(
permission_slug: str,
) -> SyncOrAsync[EnvironmentRole]: ...

# Resources

def get_resource(self, resource_id: str) -> SyncOrAsync[Resource]: ...

def create_resource(
self,
*,
resource_type_slug: str,
organization_id: str,
external_id: str,
name: str,
parent: Optional[ParentResource] = None,
description: Optional[str] = None,
) -> SyncOrAsync[Resource]: ...

def update_resource(
self,
resource_id: str,
*,
name: Optional[str] = None,
description: Union[str, None, _Unset] = UNSET,
) -> SyncOrAsync[Resource]: ...

def delete_resource(
self,
resource_id: str,
*,
cascade_delete: Optional[bool] = None,
) -> SyncOrAsync[None]: ...


class Authorization(AuthorizationModule):
_http_client: SyncHTTPClient
Expand Down Expand Up @@ -438,6 +490,84 @@ def add_environment_role_permission(

return EnvironmentRole.model_validate(response)

# Resources

def get_resource(self, resource_id: str) -> Resource:
response = self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_GET,
)

return Resource.model_validate(response)

def create_resource(
self,
*,
resource_type_slug: str,
organization_id: str,
external_id: str,
name: str,
parent: Optional[ParentResource] = None,
description: Optional[str] = None,
) -> Resource:
json: Dict[str, Any] = {
"resource_type_slug": resource_type_slug,
"organization_id": organization_id,
"external_id": external_id,
"name": name,
}
if parent is not None:
json.update(parent)
if description is not None:
json["description"] = description

response = self._http_client.request(
AUTHORIZATION_RESOURCES_PATH,
method=REQUEST_METHOD_POST,
json=json,
)

return Resource.model_validate(response)

def update_resource(
self,
resource_id: str,
*,
name: Optional[str] = None,
description: Union[str, None, _Unset] = UNSET,
) -> Resource:
json: Dict[str, Any] = {}
if name is not None:
json["name"] = name
if not isinstance(description, _Unset):
json["description"] = description

response = self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_PATCH,
json=json,
exclude_none=False,
)

return Resource.model_validate(response)

def delete_resource(
self,
resource_id: str,
*,
cascade_delete: Optional[bool] = None,
) -> None:
if cascade_delete is not None:
self._http_client.delete_with_body(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
json={"cascade_delete": cascade_delete},
)
else:
self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_DELETE,
)


class AsyncAuthorization(AuthorizationModule):
_http_client: AsyncHTTPClient
Expand Down Expand Up @@ -713,3 +843,81 @@ async def add_environment_role_permission(
)

return EnvironmentRole.model_validate(response)

# Resources

async def get_resource(self, resource_id: str) -> Resource:
response = await self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_GET,
)

return Resource.model_validate(response)

async def create_resource(
self,
*,
resource_type_slug: str,
organization_id: str,
external_id: str,
name: str,
parent: Optional[ParentResource] = None,
description: Optional[str] = None,
) -> Resource:
json: Dict[str, Any] = {
"resource_type_slug": resource_type_slug,
"organization_id": organization_id,
"external_id": external_id,
"name": name,
}
if parent is not None:
json.update(parent)
if description is not None:
json["description"] = description

response = await self._http_client.request(
AUTHORIZATION_RESOURCES_PATH,
method=REQUEST_METHOD_POST,
json=json,
)

return Resource.model_validate(response)

async def update_resource(
self,
resource_id: str,
*,
name: Optional[str] = None,
description: Union[str, None, _Unset] = UNSET,
) -> Resource:
json: Dict[str, Any] = {}
if name is not None:
json["name"] = name
if not isinstance(description, _Unset):
json["description"] = description

response = await self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_PATCH,
json=json,
exclude_none=False,
)

return Resource.model_validate(response)

async def delete_resource(
self,
resource_id: str,
*,
cascade_delete: Optional[bool] = None,
) -> None:
if cascade_delete is not None:
await self._http_client.delete_with_body(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
json={"cascade_delete": cascade_delete},
)
else:
await self._http_client.request(
f"{AUTHORIZATION_RESOURCES_PATH}/{resource_id}",
method=REQUEST_METHOD_DELETE,
)
3 changes: 2 additions & 1 deletion src/workos/utils/_base_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def _prepare_request(
headers: HeadersType = None,
exclude_default_auth_headers: bool = False,
force_include_body: bool = False,
exclude_none: bool = True,
) -> PreparedRequest:
"""Executes a request against the WorkOS API.

Expand Down Expand Up @@ -159,7 +160,7 @@ def _prepare_request(
params = {k: v for k, v in params.items() if v is not None}

# Remove any body values that are None
if json is not None and isinstance(json, Mapping):
if exclude_none and json is not None and isinstance(json, Mapping):
json = {k: v for k, v in json.items() if v is not None}

# We'll spread these return values onto the HTTP client request method
Expand Down
6 changes: 6 additions & 0 deletions src/workos/utils/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def request(
json: JsonType = None,
headers: HeadersType = None,
exclude_default_auth_headers: bool = False,
exclude_none: bool = True,
) -> ResponseJson:
"""Executes a request against the WorkOS API.

Expand All @@ -98,6 +99,7 @@ def request(
method (str): One of the supported methods as defined by the REQUEST_METHOD_X constants
params (ParamsType): Query params to be added to the request
json (JsonType): Body payload to be added to the request
exclude_none (bool): If True, removes None values from the JSON body

Returns:
ResponseJson: Response from WorkOS
Expand All @@ -109,6 +111,7 @@ def request(
json=json,
headers=headers,
exclude_default_auth_headers=exclude_default_auth_headers,
exclude_none=exclude_none,
)
response = self._client.request(**prepared_request_parameters)
return self._handle_response(response)
Expand Down Expand Up @@ -206,6 +209,7 @@ async def request(
json: JsonType = None,
headers: HeadersType = None,
exclude_default_auth_headers: bool = False,
exclude_none: bool = True,
) -> ResponseJson:
"""Executes a request against the WorkOS API.

Expand All @@ -216,6 +220,7 @@ async def request(
method (str): One of the supported methods as defined by the REQUEST_METHOD_X constants
params (ParamsType): Query params to be added to the request
json (JsonType): Body payload to be added to the request
exclude_none (bool): If True, removes None values from the JSON body

Returns:
ResponseJson: Response from WorkOS
Expand All @@ -227,6 +232,7 @@ async def request(
json=json,
headers=headers,
exclude_default_auth_headers=exclude_default_auth_headers,
exclude_none=exclude_none,
)
response = await self._client.request(**prepared_request_parameters)
return self._handle_response(response)
Expand Down
Loading