Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added src/.DS_Store
Binary file not shown.
56 changes: 41 additions & 15 deletions src/casdoor/async_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,43 @@
from .user import User


def _build_enforce_params(
permission_id: str,
model_id: str,
resource_id: str,
enforce_id: str,
owner: str,
) -> Dict[str, str]:
"""
Build and validate parameters for enforce API calls.

Exactly one of the parameters must be provided and non-empty.

:return: Dictionary with exactly one parameter set
:raises ValueError: If zero or multiple parameters are provided
"""
params: Dict[str, str] = {}
if permission_id:
params["permissionId"] = permission_id
if model_id:
params["modelId"] = model_id
if resource_id:
params["resourceId"] = resource_id
if enforce_id:
params["enforcerId"] = enforce_id
if owner:
params["owner"] = owner

if len(params) != 1:
raise ValueError(
"Exactly one of (permission_id, model_id, resource_id, enforce_id, owner) "
"must be provided and non-empty. "
f"Got {len(params)} parameters: {list(params.keys())}"
)

return params


class AioHttpClient:
def __init__(self, base_url):
self.base_url = base_url
Expand Down Expand Up @@ -277,19 +314,13 @@ async def enforce(
:param permission_id: the permission id (i.e. organization name/permission name)
:param model_id: the model id
:param resource_id: the resource id
:param enforce_id: the enforce id
:param enforce_id: the enforcer id (note: uses 'enforcerId' parameter in API)
:param owner: the owner of the permission
:param casbin_request: a list containing the request data (i.e. sub, obj, act)
:return: a boolean value indicating whether the request is allowed
"""
url = "/api/enforce"
params: Dict[str, str] = {
"permissionId": permission_id,
"modelId": model_id,
"resourceId": resource_id,
"enforceId": enforce_id,
"owner": owner,
}
params = _build_enforce_params(permission_id, model_id, resource_id, enforce_id, owner)

async with self._session as session:
response = await session.post(
Expand Down Expand Up @@ -328,18 +359,13 @@ async def batch_enforce(

:param permission_id: the permission id (i.e. organization name/permission name)
:param model_id: the model id
:param enforce_id: the enforce id
:param enforce_id: the enforcer id (note: uses 'enforcerId' parameter in API)
:param owner: the owner of the permission
:param casbin_request: a list of lists containing the request data
:return: a list of boolean values indicating whether each request is allowed
"""
url = "/api/batch-enforce"
params = {
"permissionId": permission_id,
"modelId": model_id,
"enforceId": enforce_id,
"owner": owner,
}
params = _build_enforce_params(permission_id, model_id, "", enforce_id, owner)

async with self._session as session:
response = await session.post(
Expand Down
58 changes: 43 additions & 15 deletions src/casdoor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,43 @@
from .webhook import _WebhookSDK


def _build_enforce_params(
permission_id: str,
model_id: str,
resource_id: str,
enforce_id: str,
owner: str,
) -> Dict[str, str]:
"""
Build and validate parameters for enforce API calls.

Exactly one of the parameters must be provided and non-empty.

:return: Dictionary with exactly one parameter set
:raises ValueError: If zero or multiple parameters are provided
"""
params = {}
if permission_id:
params["permissionId"] = permission_id
if model_id:
params["modelId"] = model_id
if resource_id:
params["resourceId"] = resource_id
if enforce_id:
params["enforcerId"] = enforce_id
if owner:
params["owner"] = owner

if len(params) != 1:
raise ValueError(
"Exactly one of (permission_id, model_id, resource_id, enforce_id, owner) "
"must be provided and non-empty. "
f"Got {len(params)} parameters: {list(params.keys())}"
)

return params


class CasdoorSDK(
_UserSDK,
_AdapterSDK,
Expand Down Expand Up @@ -281,19 +318,14 @@ def enforce(
:param permission_id: the permission id (i.e. organization name/permission name)
:param model_id: the model id
:param resource_id: the resource id
:param enforce_id: the enforce id
:param enforce_id: the enforcer id (note: uses 'enforcerId' parameter in API)
:param owner: the owner of the permission
:param casbin_request: a list containing the request data (i.e. sub, obj, act)
:return: a boolean value indicating whether the request is allowed
"""
url = self.endpoint + "/api/enforce"
params = {
"permissionId": permission_id,
"modelId": model_id,
"resourceId": resource_id,
"enforceId": enforce_id,
"owner": owner,
}
params = _build_enforce_params(permission_id, model_id, resource_id, enforce_id, owner)

r = requests.post(
url,
params=params,
Expand Down Expand Up @@ -332,18 +364,14 @@ def batch_enforce(

:param permission_id: the permission id (i.e. organization name/permission name)
:param model_id: the model id
:param enforce_id: the enforce id
:param enforce_id: the enforcer id (note: uses 'enforcerId' parameter in API)
:param owner: the owner of the permission
:param casbin_request: a list of lists containing the request data
:return: a list of boolean values indicating whether each request is allowed
"""
url = self.endpoint + "/api/batch-enforce"
params = {
"permissionId": permission_id,
"modelId": model_id,
"enforceId": enforce_id,
"owner": owner,
}
params = _build_enforce_params(permission_id, model_id, "", enforce_id, owner)

r = requests.post(
url,
params=params,
Expand Down