diff --git a/README.md b/README.md index a95a0f2bb..4be253377 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,8 @@ These sections show how to use the SDK to perform permission and user management 14. [Manage Project](#manage-project) 15. [Manage SSO Applications](#manage-sso-applications) 16. [Manage Outbound Applications](#manage-outbound-applications) +17. [Manage Descopers](#manage-descopers) +18. [Manage Management Keys](#manage-management-keys) If you wish to run any of our code samples and play with them, check out our [Code Examples](#code-examples) section. @@ -1565,6 +1567,141 @@ latest_tenant_token = descope_client.mgmt.outbound_application_by_token.fetch_te ) ``` +### Manage Descopers + +You can create, update, delete, load or list Descopers (users who have access to the Descope console): + +```python +from descope import ( + DescoperAttributes, + DescoperCreate, + DescoperProjectRole, + DescoperRBAC, + DescoperRole, +) + +# Create a new Descoper +resp = descope_client.mgmt.descoper.create( + descopers=[ + DescoperCreate( + login_id="user@example.com", + attributes=DescoperAttributes( + display_name="John Doe", + email="user@example.com", + phone="+1234567890", + ), + send_invite=True, # Send an invitation email + rbac=DescoperRBAC( + is_company_admin=False, + projects=[ + DescoperProjectRole( + project_ids=["project-id-1"], + role=DescoperRole.ADMIN, + ) + ], + ), + ) + ] +) +descopers = resp["descopers"] +total = resp["total"] + +# Load a Descoper by ID +resp = descope_client.mgmt.descoper.load("descoper-id") +descoper = resp["descoper"] + +# Update a Descoper's attributes and/or RBAC +# Note: All fields that are set will override existing values +resp = descope_client.mgmt.descoper.update( + id="descoper-id", + attributes=DescoperAttributes( + display_name="Updated Name", + ), + rbac=DescoperRBAC( + is_company_admin=True, + ), +) +updated_descoper = resp["descoper"] + +# List all Descopers +resp = descope_client.mgmt.descoper.list() +descopers = resp["descopers"] +total = resp["total"] +for descoper in descopers: + # Do something + +# Delete a Descoper +# Descoper deletion cannot be undone. Use carefully. +descope_client.mgmt.descoper.delete("descoper-id") +``` + +### Manage Management Keys + +You can create, update, delete, load or search management keys: + +```python +from descope import ( + MgmtKeyReBac, + MgmtKeyProjectRole, + MgmtKeyTagRole, + MgmtKeyStatus, +) + +# Create a new management key with RBAC configuration +# The rebac parameter defines the key's access permissions +rebac = MgmtKeyReBac( + company_roles=["company-full-access"], # Company-level roles + project_roles=[ # Project-specific roles + MgmtKeyProjectRole( + project_ids=["project-id-1", "project-id-2"], + roles=["project-admin"] + ) + ], + tag_roles=[ # Tag-based roles + MgmtKeyTagRole( + tags=["production"], + roles=["read-only"] + ) + ], +) + +create_resp = descope_client.mgmt.management_key.create( + name="My Management Key", + rebac=rebac, + description="Optional description for the management key", + expires_in=0, # Expiration time in seconds (0 for no expiration) + permitted_ips=["10.0.0.1", "192.168.1.0/24"], # Optional IP allowlist +) +key = create_resp["key"] +cleartext = create_resp["cleartext"] # Save this securely - it will not be returned again! + +# Load a specific management key by ID +load_resp = descope_client.mgmt.management_key.load("key-id") +loaded_key = load_resp["key"] + +# Search all management keys +search_resp = descope_client.mgmt.management_key.search() +keys = search_resp["keys"] +for key in keys: + # Do something + +# Update a management key +# IMPORTANT: All parameters will override existing values. Use carefully. +update_resp = descope_client.mgmt.management_key.update( + id="key-id", + name="Updated Key Name", + description="Updated description", + permitted_ips=["10.0.0.2"], + status=MgmtKeyStatus.ACTIVE, # Can be ACTIVE or INACTIVE +) +updated_key = update_resp["key"] + +# Delete management keys +# IMPORTANT: This action is irreversible. Use carefully. +delete_resp = descope_client.mgmt.management_key.delete(["key-id-1", "key-id-2"]) +total_deleted = delete_resp["total"] +``` + ### Utils for your end to end (e2e) tests and integration tests To ease your e2e tests, we exposed dedicated management methods, diff --git a/descope/__init__.py b/descope/__init__.py index 85090f525..29e3f4171 100644 --- a/descope/__init__.py +++ b/descope/__init__.py @@ -20,6 +20,16 @@ from descope.http_client import DescopeResponse from descope.management.common import ( AssociatedTenant, + DescoperAttributes, + DescoperCreate, + DescoperProjectRole, + DescoperRBAC, + DescoperRole, + DescoperTagRole, + MgmtKeyProjectRole, + MgmtKeyReBac, + MgmtKeyStatus, + MgmtKeyTagRole, SAMLIDPAttributeMappingInfo, SAMLIDPGroupsMappingInfo, SAMLIDPRoleGroupMappingInfo, diff --git a/descope/http_client.py b/descope/http_client.py index 648e8179a..351eb855c 100644 --- a/descope/http_client.py +++ b/descope/http_client.py @@ -219,6 +219,26 @@ def post( self._raise_from_response(response) return response + def put( + self, + uri: str, + *, + body: dict | list[dict] | list[str] | None = None, + params=None, + pswd: str | None = None, + ) -> requests.Response: + response = requests.put( + f"{self.base_url}{uri}", + headers=self._get_default_headers(pswd), + json=body, + allow_redirects=False, + verify=self.secure, + params=params, + timeout=self.timeout_seconds, + ) + self._raise_from_response(response) + return response + def patch( self, uri: str, @@ -245,12 +265,14 @@ def delete( self, uri: str, *, + body: dict | list[dict] | list[str] | None = None, params=None, pswd: str | None = None, ) -> requests.Response: response = requests.delete( f"{self.base_url}{uri}", params=params, + json=body, headers=self._get_default_headers(pswd), allow_redirects=False, verify=self.secure, diff --git a/descope/management/common.py b/descope/management/common.py index 886bf7c90..577b783c3 100644 --- a/descope/management/common.py +++ b/descope/management/common.py @@ -258,6 +258,20 @@ class MgmtV1: project_import = "/v1/mgmt/project/import" project_list_projects = "/v1/mgmt/projects/list" + # Descoper + descoper_create_path = "/v1/mgmt/descoper" + descoper_update_path = "/v1/mgmt/descoper" + descoper_load_path = "/v1/mgmt/descoper" + descoper_delete_path = "/v1/mgmt/descoper" + descoper_list_path = "/v1/mgmt/descoper/list" + + # management key + mgmt_key_create_path = "/v1/mgmt/managementkey" + mgmt_key_update_path = "/v1/mgmt/managementkey" + mgmt_key_load_path = "/v1/mgmt/managementkey" + mgmt_key_delete_path = "/v1/mgmt/managementkey/delete" + mgmt_key_search_path = "/v1/mgmt/managementkey/search" + class MgmtSignUpOptions: def __init__( @@ -468,3 +482,179 @@ def sort_to_dict(sort: List[Sort]) -> list: } ) return sort_list + + +class DescoperRole(Enum): + """Represents a Descoper role.""" + + ADMIN = "admin" + DEVELOPER = "developer" + SUPPORT = "support" + AUDITOR = "auditor" + + +class DescoperAttributes: + """ + Represents Descoper attributes, such as name and email/phone. + """ + + def __init__( + self, + display_name: Optional[str] = None, + email: Optional[str] = None, + phone: Optional[str] = None, + ): + self.display_name = display_name + self.email = email + self.phone = phone + + def to_dict(self) -> dict: + return { + "displayName": self.display_name, + "email": self.email, + "phone": self.phone, + } + + +class DescoperTagRole: + """ + Represents a Descoper tags to role mapping. + """ + + def __init__( + self, + tags: Optional[List[str]] = None, + role: Optional[DescoperRole] = None, + ): + self.tags = tags if tags is not None else [] + self.role = role + + def to_dict(self) -> dict: + return { + "tags": self.tags, + "role": self.role.value if self.role else None, + } + + +class DescoperProjectRole: + """ + Represents a Descoper projects to role mapping. + """ + + def __init__( + self, + project_ids: Optional[List[str]] = None, + role: Optional[DescoperRole] = None, + ): + self.project_ids = project_ids if project_ids is not None else [] + self.role = role + + def to_dict(self) -> dict: + return { + "projectIds": self.project_ids, + "role": self.role.value if self.role else None, + } + + +class DescoperRBAC: + """ + Represents Descoper RBAC configuration. + """ + + def __init__( + self, + is_company_admin: bool = False, + tags: Optional[List[DescoperTagRole]] = None, + projects: Optional[List[DescoperProjectRole]] = None, + ): + self.is_company_admin = is_company_admin + self.tags = tags if tags is not None else [] + self.projects = projects if projects is not None else [] + + def to_dict(self) -> dict: + return { + "isCompanyAdmin": self.is_company_admin, + "tags": [t.to_dict() for t in self.tags], + "projects": [p.to_dict() for p in self.projects], + } + + +class DescoperCreate: + """ + Represents a Descoper to be created. + """ + + def __init__( + self, + login_id: str, + attributes: Optional[DescoperAttributes] = None, + send_invite: bool = False, + rbac: Optional[DescoperRBAC] = None, + ): + self.login_id = login_id + self.attributes = attributes + self.send_invite = send_invite + self.rbac = rbac + + def to_dict(self) -> dict: + return { + "loginId": self.login_id, + "attributes": self.attributes.to_dict() if self.attributes else None, + "sendInvite": self.send_invite, + "rbac": self.rbac.to_dict() if self.rbac else None, + } + + +def descopers_to_dict(descopers: List[DescoperCreate]) -> list: + return [d.to_dict() for d in descopers] + + +class MgmtKeyStatus(Enum): + ACTIVE = "active" + INACTIVE = "inactive" + + +class MgmtKeyProjectRole: + def __init__(self, project_ids: List[str], roles: List[str]): + self.project_ids = project_ids + self.roles = roles + + def to_dict(self) -> dict: + return { + "projectIds": self.project_ids, + "roles": self.roles, + } + + +class MgmtKeyTagRole: + def __init__(self, tags: List[str], roles: List[str]): + self.tags = tags + self.roles = roles + + def to_dict(self) -> dict: + return { + "tags": self.tags, + "roles": self.roles, + } + + +class MgmtKeyReBac: + def __init__( + self, + company_roles: Optional[List[str]] = None, + project_roles: Optional[List[MgmtKeyProjectRole]] = None, + tag_roles: Optional[List[MgmtKeyTagRole]] = None, + ): + self.company_roles = company_roles + self.project_roles = project_roles + self.tag_roles = tag_roles + + def to_dict(self) -> dict: + res: dict = {} + if self.company_roles is not None: + res["companyRoles"] = self.company_roles + if self.project_roles is not None: + res["projectRoles"] = [pr.to_dict() for pr in self.project_roles] + if self.tag_roles is not None: + res["tagRoles"] = [tr.to_dict() for tr in self.tag_roles] + return res diff --git a/descope/management/descoper.py b/descope/management/descoper.py new file mode 100644 index 000000000..4396b2333 --- /dev/null +++ b/descope/management/descoper.py @@ -0,0 +1,153 @@ +from typing import List, Optional, Any + +from descope._http_base import HTTPBase +from descope.management.common import ( + DescoperAttributes, + DescoperCreate, + DescoperRBAC, + MgmtV1, + descopers_to_dict, +) + + +class Descoper(HTTPBase): + def create( + self, + descopers: List[DescoperCreate], + ) -> dict: + """ + Create new Descopers. + + Args: + descopers (List[DescoperCreate]): List of Descopers to create. + Note that tags are referred to by name, without the company ID prefix. + + Return value (dict): + Return dict in the format + { + "descopers": [...], + "total": + } + + Raise: + AuthException: raised if create operation fails + """ + if not descopers: + raise ValueError("descopers list cannot be empty") + + response = self._http.put( + MgmtV1.descoper_create_path, + body={"descopers": descopers_to_dict(descopers)}, + ) + return response.json() + + def update( + self, + id: str, + attributes: Optional[DescoperAttributes] = None, + rbac: Optional[DescoperRBAC] = None, + ) -> dict: + """ + Update an existing Descoper's RBAC and/or Attributes. + + IMPORTANT: All parameter *fields*, if set, will override whatever values are currently set + in the existing Descoper. Use carefully. + + Args: + id (str): The id of the Descoper to update. + attributes (DescoperAttributes): Optional attributes to update. + rbac (DescoperRBAC): Optional RBAC configuration to update. + + Return value (dict): + Return dict in the format + {"descoper": {...}} + Containing the updated Descoper information. + + Raise: + AuthException: raised if update operation fails + """ + if not id: + raise ValueError("id cannot be empty") + + body: dict[str, Any] = {"id": id} + if attributes is not None: + body["attributes"] = attributes.to_dict() + if rbac is not None: + body["rbac"] = rbac.to_dict() + + response = self._http.patch( + MgmtV1.descoper_update_path, + body=body, + ) + return response.json() + + def load( + self, + id: str, + ) -> dict: + """ + Load an existing Descoper by ID. + + Args: + id (str): The id of the Descoper to load. + + Return value (dict): + Return dict in the format + {"descoper": {...}} + Containing the loaded Descoper information. + + Raise: + AuthException: raised if load operation fails + """ + if not id: + raise ValueError("id cannot be empty") + + response = self._http.get( + uri=MgmtV1.descoper_load_path, + params={"id": id}, + ) + return response.json() + + def delete( + self, + id: str, + ): + """ + Delete an existing Descoper. IMPORTANT: This action is irreversible. Use carefully. + + Args: + id (str): The id of the Descoper to delete. + + Raise: + AuthException: raised if delete operation fails + """ + if not id: + raise ValueError("id cannot be empty") + + self._http.delete( + uri=MgmtV1.descoper_delete_path, + params={"id": id}, + ) + + def list( + self, + ) -> dict: + """ + List all Descopers. + + Return value (dict): + Return dict in the format + { + "descopers": [...], + "total": + } + Containing all Descopers and the total count. + + Raise: + AuthException: raised if list operation fails + """ + response = self._http.post( + MgmtV1.descoper_list_path, + body={}, + ) + return response.json() diff --git a/descope/management/management_key.py b/descope/management/management_key.py new file mode 100644 index 000000000..4b3cf5272 --- /dev/null +++ b/descope/management/management_key.py @@ -0,0 +1,180 @@ +from typing import List, Optional, Any + +from descope._http_base import HTTPBase +from descope.management.common import ( + MgmtKeyReBac, + MgmtKeyStatus, + MgmtV1, +) + + +class ManagementKey(HTTPBase): + def create( + self, + name: str, + rebac: MgmtKeyReBac, + description: Optional[str] = None, + expires_in: int = 0, + permitted_ips: Optional[List[str]] = None, + ) -> dict: + """ + Create a new management key. + + Args: + name (str): The name of the management key. + rebac (MgmtKeyReBac): RBAC configuration for the key. + description (str): Optional description for the management key. + expires_in (int): Expiration time in seconds (0 for no expiration). + permitted_ips (List[str]): Optional list of IP addresses or CIDR ranges that are allowed to use this key. + + Return value (dict): + Return dict in the format + { + "key": {...}, + "cleartext": "..." + } + + Raise: + AuthException: raised if create operation fails + """ + if not name: + raise ValueError("name cannot be empty") + if rebac is None: + raise ValueError("rebac cannot be empty") + + body: dict[str, Any] = { + "name": name, + "description": description, + "expiresIn": expires_in, + "permittedIps": permitted_ips if permitted_ips is not None else [], + "reBac": rebac.to_dict(), + } + + response = self._http.put( + MgmtV1.mgmt_key_create_path, + body=body, + ) + return response.json() + + def update( + self, + id: str, + name: str, + description: str, + permitted_ips: List[str], + status: MgmtKeyStatus, + ) -> dict: + """ + Update an existing management key. + + IMPORTANT: All parameters will override whatever values are currently set + in the existing management key. Use carefully. + + Args: + id (str): The id of the management key to update. + name (str): The updated name. + description (str): Updated description. + permitted_ips (List[str]): Updated list of IP addresses or CIDR ranges. + status (MgmtKeyStatus): Updated status. + + Return value (dict): + Return dict in the format + {"key": {...}} + Containing the updated management key information. + + Raise: + AuthException: raised if update operation fails + """ + if not id: + raise ValueError("id cannot be empty") + if not name: + raise ValueError("name cannot be empty") + if status is None: + raise ValueError("status cannot be empty") + + body: dict[str, Any] = { + "id": id, + "name": name, + "description": description, + "permittedIps": permitted_ips if permitted_ips is not None else [], + "status": status.value, + } + + response = self._http.patch( + MgmtV1.mgmt_key_update_path, + body=body, + ) + return response.json() + + def load( + self, + id: str, + ) -> dict: + """ + Get a management key by ID. + + Args: + id (str): The id of the management key to load. + + Return value (dict): + Return dict in the format + {"key": {...}} + Containing the loaded management key information. + + Raise: + AuthException: raised if load operation fails + """ + if not id: + raise ValueError("id cannot be empty") + + response = self._http.get( + uri=MgmtV1.mgmt_key_load_path, + params={"id": id}, + ) + return response.json() + + def delete( + self, + ids: List[str], + ) -> dict: + """ + Delete existing management keys. IMPORTANT: This action is irreversible. Use carefully. + + Args: + ids (List[str]): The ids of the management keys to delete. + + Return value (dict): + Return dict in the format + {"total": } + Containing the number of keys deleted. + + Raise: + AuthException: raised if delete operation fails + """ + if not ids: + raise ValueError("ids list cannot be empty") + + response = self._http.post( + uri=MgmtV1.mgmt_key_delete_path, + body={"ids": ids}, + ) + return response.json() + + def search(self) -> dict: + """ + Search for management keys. + + Return value (dict): + Return dict in the format + { + "keys": [...] + } + Containing the found management keys. + + Raise: + AuthException: raised if search operation fails + """ + response = self._http.get( + MgmtV1.mgmt_key_search_path, + ) + return response.json() diff --git a/descope/mgmt.py b/descope/mgmt.py index b1c36088c..c89641b3a 100644 --- a/descope/mgmt.py +++ b/descope/mgmt.py @@ -6,10 +6,12 @@ from descope.management.access_key import AccessKey from descope.management.audit import Audit from descope.management.authz import Authz +from descope.management.descoper import Descoper from descope.management.fga import FGA from descope.management.flow import Flow from descope.management.group import Group from descope.management.jwt import JWT +from descope.management.management_key import ManagementKey from descope.management.outbound_application import ( OutboundApplication, OutboundApplicationByToken, @@ -40,10 +42,12 @@ def __init__( self._access_key = AccessKey(http_client) self._audit = Audit(http_client) self._authz = Authz(http_client) + self._descoper = Descoper(http_client) self._fga = FGA(http_client, fga_cache_url=fga_cache_url) self._flow = Flow(http_client) self._group = Group(http_client) self._jwt = JWT(http_client, auth=auth) + self._management_key = ManagementKey(http_client) self._outbound_application = OutboundApplication(http_client) self._outbound_application_by_token = OutboundApplicationByToken(http_client) self._permission = Permission(http_client) @@ -141,3 +145,13 @@ def outbound_application(self): def outbound_application_by_token(self): # No management key check for outbound_app_token (as authentication for those methods is done by inbound app token) return self._outbound_application_by_token + + @property + def descoper(self): + self._ensure_management_key("descoper") + return self._descoper + + @property + def management_key(self): + self._ensure_management_key("management_key") + return self._management_key diff --git a/tests/management/test_descoper.py b/tests/management/test_descoper.py new file mode 100644 index 000000000..13e5435f6 --- /dev/null +++ b/tests/management/test_descoper.py @@ -0,0 +1,431 @@ +import json +from unittest import mock +from unittest.mock import patch + +from descope import ( + AuthException, + DescoperAttributes, + DescoperCreate, + DescoperProjectRole, + DescoperRBAC, + DescoperRole, + DescopeClient, +) +from descope.common import DEFAULT_TIMEOUT_SECONDS +from descope.management.common import MgmtV1 + +from .. import common + + +class TestDescoper(common.DescopeTest): + def setUp(self) -> None: + super().setUp() + self.dummy_project_id = "dummy" + self.dummy_management_key = "key" + + def test_create(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test failed flows + with patch("requests.put") as mock_put: + mock_put.return_value.ok = False + self.assertRaises( + AuthException, + client.mgmt.descoper.create, + [ + DescoperCreate( + login_id="user1@example.com", + ) + ], + ) + + # Test empty descopers + self.assertRaises( + ValueError, + client.mgmt.descoper.create, + [], + ) + + # Test success flow + with patch("requests.put") as mock_put: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = json.loads( + """{ + "descopers": [{ + "id": "U2111111111111111111111111", + "attributes": { + "displayName": "Test User 2", + "email": "user2@example.com", + "phone": "+123456" + }, + "rbac": { + "isCompanyAdmin": false, + "tags": [], + "projects": [{ + "projectIds": ["P2111111111111111111111111"], + "role": "admin" + }] + }, + "status": "invited" + }], + "total": 1 + }""" + ) + mock_put.return_value = network_resp + resp = client.mgmt.descoper.create( + descopers=[ + DescoperCreate( + login_id="user1@example.com", + attributes=DescoperAttributes( + display_name="Test User 2", + phone="+123456", + email="user2@example.com", + ), + rbac=DescoperRBAC( + projects=[ + DescoperProjectRole( + project_ids=["P2111111111111111111111111"], + role=DescoperRole.ADMIN, + ) + ], + ), + ) + ], + ) + descopers = resp["descopers"] + self.assertEqual(len(descopers), 1) + self.assertEqual(descopers[0]["id"], "U2111111111111111111111111") + self.assertEqual(resp["total"], 1) + mock_put.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.descoper_create_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params=None, + json={ + "descopers": [ + { + "loginId": "user1@example.com", + "attributes": { + "displayName": "Test User 2", + "email": "user2@example.com", + "phone": "+123456", + }, + "sendInvite": False, + "rbac": { + "isCompanyAdmin": False, + "tags": [], + "projects": [ + { + "projectIds": ["P2111111111111111111111111"], + "role": "admin", + } + ], + }, + } + ] + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_load(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test failed flows + with patch("requests.get") as mock_get: + mock_get.return_value.ok = False + self.assertRaises( + AuthException, + client.mgmt.descoper.load, + "descoper-id", + ) + + # Test empty id + self.assertRaises( + ValueError, + client.mgmt.descoper.load, + "", + ) + + # Test success flow + with patch("requests.get") as mock_get: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = json.loads( + """{ + "descoper": { + "id": "U2222222222222222222222222", + "attributes": { + "displayName": "Test User 2", + "email": "user2@example.com", + "phone": "+123456" + }, + "rbac": { + "isCompanyAdmin": false, + "tags": [], + "projects": [{ + "projectIds": ["P2111111111111111111111111"], + "role": "admin" + }] + }, + "status": "invited" + } + }""" + ) + mock_get.return_value = network_resp + resp = client.mgmt.descoper.load("U2222222222222222222222222") + descoper = resp["descoper"] + self.assertEqual(descoper["id"], "U2222222222222222222222222") + mock_get.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.descoper_load_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params={"id": "U2222222222222222222222222"}, + allow_redirects=True, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_update(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test failed flows + with patch("requests.patch") as mock_patch: + mock_patch.return_value.ok = False + self.assertRaises( + AuthException, + client.mgmt.descoper.update, + "descoper-id", + None, + DescoperRBAC(is_company_admin=True), + ) + + # Test empty id + self.assertRaises( + ValueError, + client.mgmt.descoper.update, + "", + ) + + # Test success flow + with patch("requests.patch") as mock_patch: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = json.loads( + """{ + "descoper": { + "id": "U2333333333333333333333333", + "attributes": { + "displayName": "Updated User", + "email": "user4@example.com", + "phone": "+1234358730" + }, + "rbac": { + "isCompanyAdmin": true, + "tags": [], + "projects": [] + }, + "status": "invited" + } + }""" + ) + mock_patch.return_value = network_resp + resp = client.mgmt.descoper.update( + "U2333333333333333333333333", + None, + DescoperRBAC(is_company_admin=True), + ) + descoper = resp["descoper"] + self.assertEqual(descoper["id"], "U2333333333333333333333333") + self.assertTrue(descoper["rbac"]["isCompanyAdmin"]) + mock_patch.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.descoper_update_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params=None, + json={ + "id": "U2333333333333333333333333", + "rbac": { + "isCompanyAdmin": True, + "tags": [], + "projects": [], + }, + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_delete(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test failed flows + with patch("requests.delete") as mock_delete: + mock_delete.return_value.ok = False + self.assertRaises( + AuthException, + client.mgmt.descoper.delete, + "descoper-id", + ) + + # Test empty id + self.assertRaises( + ValueError, + client.mgmt.descoper.delete, + "", + ) + + # Test success flow + with patch("requests.delete") as mock_delete: + mock_delete.return_value.ok = True + self.assertIsNone(client.mgmt.descoper.delete("U2111111111111111111111111")) + mock_delete.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.descoper_delete_path}", + params={"id": "U2111111111111111111111111"}, + json=None, + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_list(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test failed flows + with patch("requests.post") as mock_post: + mock_post.return_value.ok = False + self.assertRaises( + AuthException, + client.mgmt.descoper.list, + ) + + # Test success flow + with patch("requests.post") as mock_post: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = json.loads( + """{ + "descopers": [ + { + "id": "U2444444444444444444444444", + "attributes": { + "displayName": "Admin User", + "email": "admin@example.com", + "phone": "" + }, + "rbac": { + "isCompanyAdmin": true, + "tags": [], + "projects": [] + }, + "status": "enabled" + }, + { + "id": "U2555555555555555555555555", + "attributes": { + "displayName": "Another User", + "email": "user3@example.com", + "phone": "+123456" + }, + "rbac": { + "isCompanyAdmin": false, + "tags": [], + "projects": [] + }, + "status": "invited" + }, + { + "id": "U2666666666666666666666666", + "attributes": { + "displayName": "Test User 1", + "email": "user2@example.com", + "phone": "+123456" + }, + "rbac": { + "isCompanyAdmin": false, + "tags": [], + "projects": [{ + "projectIds": ["P2222222222222222222222222"], + "role": "admin" + }] + }, + "status": "invited" + } + ], + "total": 3 + }""" + ) + mock_post.return_value = network_resp + resp = client.mgmt.descoper.list() + descopers = resp["descopers"] + self.assertEqual(len(descopers), 3) + self.assertEqual(resp["total"], 3) + + # First descoper - company admin + self.assertEqual(descopers[0]["id"], "U2444444444444444444444444") + self.assertEqual(descopers[0]["attributes"]["displayName"], "Admin User") + self.assertTrue(descopers[0]["rbac"]["isCompanyAdmin"]) + self.assertEqual(descopers[0]["status"], "enabled") + + # Second descoper + self.assertEqual(descopers[1]["id"], "U2555555555555555555555555") + self.assertFalse(descopers[1]["rbac"]["isCompanyAdmin"]) + + # Third descoper - with project role + self.assertEqual(descopers[2]["id"], "U2666666666666666666666666") + self.assertEqual(len(descopers[2]["rbac"]["projects"]), 1) + + mock_post.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.descoper_list_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params=None, + json={}, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) diff --git a/tests/management/test_mgmtkey.py b/tests/management/test_mgmtkey.py new file mode 100644 index 000000000..b58afffa4 --- /dev/null +++ b/tests/management/test_mgmtkey.py @@ -0,0 +1,309 @@ +import json +from unittest import mock +from unittest.mock import patch + +from descope import ( + DescopeClient, + MgmtKeyProjectRole, + MgmtKeyReBac, + MgmtKeyStatus, +) +from descope.common import DEFAULT_TIMEOUT_SECONDS +from descope.management.common import MgmtV1 + +from .. import common + + +class TestManagementKey(common.DescopeTest): + def setUp(self) -> None: + super().setUp() + self.dummy_project_id = "dummy" + self.dummy_management_key = "key" + + def test_create(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test success flow + with patch("requests.put") as mock_put: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = { + "cleartext": "cleartext-secret", + "key": { + "id": "mk1", + "name": "test-key", + "description": "test key", + "permittedIps": ["10.0.0.1"], + "status": "active", + "createdTime": 1764849768, + "expireTime": 3600, + "reBac": { + "companyRoles": ["role1"], + "projectRoles": [], + "tagRoles": [], + }, + "version": 1, + "authzVersion": 1, + }, + } + mock_put.return_value = network_resp + resp = client.mgmt.management_key.create( + name="test-key", + rebac=MgmtKeyReBac(company_roles=["role1"]), + description="test key", + expires_in=3600, + permitted_ips=["10.0.0.1"], + ) + self.assertEqual(resp["cleartext"], "cleartext-secret") + key = resp["key"] + self.assertEqual(key["name"], "test-key") + self.assertEqual(key["description"], "test key") + self.assertEqual(len(key["permittedIps"]), 1) + self.assertEqual(key["permittedIps"][0], "10.0.0.1") + self.assertEqual(key["expireTime"], 3600) + self.assertIsNotNone(key["reBac"]) + self.assertEqual(len(key["reBac"]["companyRoles"]), 1) + self.assertEqual(key["reBac"]["companyRoles"][0], "role1") + mock_put.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_key_create_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params=None, + json={ + "name": "test-key", + "description": "test key", + "expiresIn": 3600, + "permittedIps": ["10.0.0.1"], + "reBac": { + "companyRoles": ["role1"], + }, + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_update(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test success flow + with patch("requests.patch") as mock_patch: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = { + "key": { + "id": "mk1", + "name": "updated-key", + "description": "updated key", + "permittedIps": ["1.2.3.4"], + "status": "inactive", + "createdTime": 1764673442, + "expireTime": 0, + "reBac": { + "companyRoles": [], + "projectRoles": [], + "tagRoles": [], + }, + "version": 22, + "authzVersion": 1, + }, + } + mock_patch.return_value = network_resp + resp = client.mgmt.management_key.update( + id="mk1", + name="updated-key", + description="updated key", + permitted_ips=["1.2.3.4"], + status=MgmtKeyStatus.INACTIVE, + ) + key = resp["key"] + self.assertEqual(key["id"], "mk1") + self.assertEqual(key["name"], "updated-key") + self.assertEqual(key["description"], "updated key") + self.assertEqual(len(key["permittedIps"]), 1) + self.assertEqual(key["permittedIps"][0], "1.2.3.4") + self.assertEqual(key["status"], "inactive") + mock_patch.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_key_update_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params=None, + json={ + "id": "mk1", + "name": "updated-key", + "description": "updated key", + "permittedIps": ["1.2.3.4"], + "status": "inactive", + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_load(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test success flow + with patch("requests.get") as mock_get: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = { + "key": { + "id": "mk1", + "name": "test-key", + "description": "a key description", + "status": "active", + "createdTime": 1764677065, + "expireTime": 0, + "permittedIps": [], + "reBac": { + "companyRoles": [], + "projectRoles": [], + "tagRoles": [], + }, + "version": 1, + "authzVersion": 1, + }, + } + mock_get.return_value = network_resp + resp = client.mgmt.management_key.load("mk1") + key = resp["key"] + self.assertIsNotNone(key) + self.assertEqual(key["name"], "test-key") + self.assertEqual(key["description"], "a key description") + self.assertEqual(key["status"], "active") + mock_get.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_key_load_path}", + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + params={"id": "mk1"}, + allow_redirects=True, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_delete(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test success flow + with patch("requests.post") as mock_post: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = {"total": 2} + mock_post.return_value = network_resp + resp = client.mgmt.management_key.delete(["mk1", "mk2"]) + self.assertEqual(resp["total"], 2) + mock_post.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_key_delete_path}", + params=None, + json={"ids": ["mk1", "mk2"]}, + headers={ + **common.default_headers, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + "x-descope-project-id": self.dummy_project_id, + }, + allow_redirects=False, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + def test_search(self): + client = DescopeClient( + self.dummy_project_id, + None, + False, + self.dummy_management_key, + ) + + # Test success flow + with patch("requests.get") as mock_get: + network_resp = mock.Mock() + network_resp.ok = True + network_resp.json.return_value = { + "keys": [ + { + "id": "mk1", + "name": "key1", + "description": "", + "status": "active", + "createdTime": 1764677065, + "expireTime": 0, + "permittedIps": [], + "reBac": { + "companyRoles": [], + "projectRoles": [], + "tagRoles": [], + }, + "version": 1, + "authzVersion": 1, + }, + { + "id": "mk2", + "name": "key2", + "description": "", + "status": "inactive", + "createdTime": 1764773205, + "expireTime": 1234, + "permittedIps": [], + "reBac": { + "companyRoles": [], + "projectRoles": [], + "tagRoles": [], + }, + "version": 1, + "authzVersion": 1, + }, + ], + } + mock_get.return_value = network_resp + resp = client.mgmt.management_key.search() + keys = resp["keys"] + self.assertIsNotNone(keys) + self.assertEqual(len(keys), 2) + self.assertEqual(keys[0]["id"], "mk1") + self.assertEqual(keys[0]["name"], "key1") + self.assertEqual(keys[0]["status"], "active") + self.assertEqual(keys[1]["id"], "mk2") + self.assertEqual(keys[1]["name"], "key2") + self.assertEqual(keys[1]["status"], "inactive") + mock_get.assert_called_with( + f"{common.DEFAULT_BASE_URL}{MgmtV1.mgmt_key_search_path}", + headers={ + **common.default_headers, + "x-descope-project-id": self.dummy_project_id, + "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", + }, + params=None, + allow_redirects=True, + verify=True, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) diff --git a/tests/management/test_sso_settings.py b/tests/management/test_sso_settings.py index f4d0764c3..329704b60 100644 --- a/tests/management/test_sso_settings.py +++ b/tests/management/test_sso_settings.py @@ -59,6 +59,7 @@ def test_delete_settings(self): mock_delete.assert_called_with( f"{common.DEFAULT_BASE_URL}{MgmtV1.sso_settings_path}", params={"tenantId": "tenant-id"}, + json=None, headers={ **common.default_headers, "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", diff --git a/tests/management/test_user.py b/tests/management/test_user.py index e8ff7aa00..ead0c3158 100644 --- a/tests/management/test_user.py +++ b/tests/management/test_user.py @@ -939,6 +939,7 @@ def test_delete_all_test_users(self): mock_delete.assert_called_with( f"{common.DEFAULT_BASE_URL}{MgmtV1.user_delete_all_test_users_path}", params=None, + json=None, headers={ **common.default_headers, "Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", diff --git a/tests/test_auth.py b/tests/test_auth.py index a3d64ddd7..d45ec3288 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -781,6 +781,7 @@ def test_api_rate_limit_exception(self): mock_delete.assert_called_with( "http://127.0.0.1/a/b", params={"key": "value"}, + json=None, headers={ **common.default_headers, "Authorization": f"Bearer {self.dummy_project_id}:{'pswd'}",