From 3cb5e32e49c71d94d4e2812cfc6f156dca7036d4 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 13 Jan 2026 03:58:22 -0800 Subject: [PATCH 01/14] feat: Secrets --- cloud_pipelines_backend/api_router.py | 24 +++ cloud_pipelines_backend/api_server_sql.py | 151 ++++++++++++++++-- cloud_pipelines_backend/backend_types_sql.py | 11 ++ .../component_structures.py | 18 ++- .../launchers/interfaces.py | 1 + cloud_pipelines_backend/orchestrator_sql.py | 29 ++++ 6 files changed, 223 insertions(+), 11 deletions(-) diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index 6652637..f2c0dac 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -390,6 +390,30 @@ def get_current_user( permissions=permissions, ) + ### Secrets routes + secrets_service = api_server_sql.SecretsApiService() + + router.get("/api/secrets/", tags=["secrets"], **default_config)( + inject_session_dependency( + inject_user_name(secrets_service.list_secrets, parameter_name="user_id") + ) + ) + router.post("/api/secrets/", tags=["secrets"], **default_config)( + inject_session_dependency( + inject_user_name(secrets_service.create_secret, parameter_name="user_id") + ) + ) + router.put("/api/secrets/{secret_id}", tags=["secrets"], **default_config)( + inject_session_dependency( + inject_user_name(secrets_service.update_secret, parameter_name="user_id") + ) + ) + router.delete("/api/secrets/{secret_id}", tags=["secrets"], **default_config)( + inject_session_dependency( + inject_user_name(secrets_service.delete_secret, parameter_name="user_id") + ) + ) + ### Component library routes component_service = components_api.ComponentService() diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e8e0624..9fb8307 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -994,6 +994,116 @@ def get_signed_artifact_url( return GetArtifactSignedUrlResponse(signed_url=signed_url) +# === Secrets Service +@dataclasses.dataclass(kw_only=True) +class SecretInfoResponse: + secret_id: str + + +@dataclasses.dataclass(kw_only=True) +class ListSecretsResponse: + secrets: list[SecretInfoResponse] + + +class SecretsApiService: + + def create_secret( + self, + *, + session: orm.Session, + user_id: str, + secret_id: str, + secret_value: str, + ): + return self._set_secret_value( + session=session, + user_id=user_id, + secret_id=secret_id, + secret_value=secret_value, + raise_if_exists=True, + ) + + def update_secret( + self, + *, + session: orm.Session, + user_id: str, + secret_id: str, + secret_value: str, + ): + return self._set_secret_value( + session=session, + user_id=user_id, + secret_id=secret_id, + secret_value=secret_value, + raise_if_not_exists=True, + ) + + def _set_secret_value( + self, + *, + session: orm.Session, + user_id: str, + secret_id: str, + secret_value: str, + raise_if_not_exists: bool = False, + raise_if_exists: bool = False, + ): + current_time = _get_current_time() + secret = session.get(bts.Secret, (user_id, secret_id)) + if secret: + if not raise_if_exists: + raise errors.ItemAlreadyExistsError( + f"Secret with id '{secret_id}' already exists." + ) + secret.secret_value = secret_value + secret.updated_at = current_time + else: + if raise_if_not_exists: + raise errors.ItemNotFoundError( + f"Secret with id '{secret_id}' does not exist." + ) + secret = bts.Secret( + user_id=user_id, + secret_id=secret_id, + secret_value=secret_value, + created_at=current_time, + updated_at=current_time, + ) + session.add(secret) + session.commit() + + def delete_secret( + self, + *, + session: orm.Session, + user_id: str, + secret_id: str, + ): + secret = session.get(bts.Secret, (user_id, secret_id)) + if not secret: + raise errors.ItemNotFoundError( + f"Secret with id '{secret_id}' does not exist." + ) + session.delete(secret) + session.commit() + + def list_secrets( + self, + *, + session: orm.Session, + user_id: str, + ) -> ListSecretsResponse: + secrets = session.scalars( + sql.select(bts.Secret).where(bts.Secret.user_id == user_id) + ).all() + return ListSecretsResponse( + secrets=[ + SecretInfoResponse(secret_id=secret.secret_id) for secret in secrets + ] + ) + + # ============ # Idea for how to add deep nested graph: @@ -1005,11 +1115,14 @@ def get_signed_artifact_url( # No. Decided to first do topological sort and then 1-stage generation. +_ArtifactNodeOrSecretType = typing.Union[bts.ArtifactNode, structures.SecretReference] + + def _recursively_create_all_executions_and_artifacts_root( session: orm.Session, root_task_spec: structures.TaskSpec, ) -> bts.ExecutionNode: - input_artifact_nodes: dict[str, bts.ArtifactNode] = {} + input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType] = {} root_component_spec = root_task_spec.component_ref.spec if not root_component_spec: @@ -1035,12 +1148,8 @@ def _recursively_create_all_executions_and_artifacts_root( raise ApiServiceError( f"root task arguments can only be constants, but got {input_name}={input_argument}. {root_task_spec=}" ) - elif not isinstance(input_argument, str): - raise ApiServiceError( - f"root task constant argument must be a string, but got {input_name}={input_argument}. {root_task_spec=}" - ) # TODO: Support constant input artifacts (artifact IDs) - if input_argument is not None: + elif isinstance(input_argument, str): input_artifact_nodes[input_name] = ( # _construct_constant_artifact_node_and_add_to_session( # session=session, value=input_argument, artifact_type=input_spec.type @@ -1052,6 +1161,12 @@ def _recursively_create_all_executions_and_artifacts_root( # This constant artifact won't be added to the DB # TODO: Actually, they will be added... # We don't need to link this input artifact here. It will be handled downstream. + elif isinstance(input_argument, structures.SecretArgument): + input_artifact_nodes[input_name] = input_argument.secret + else: + raise ApiServiceError( + f"root task constant argument must be a string, but got {input_name}={input_argument}. {root_task_spec=}" + ) root_execution_node = _recursively_create_all_executions_and_artifacts( session=session, @@ -1065,7 +1180,7 @@ def _recursively_create_all_executions_and_artifacts_root( def _recursively_create_all_executions_and_artifacts( session: orm.Session, root_task_spec: structures.TaskSpec, - input_artifact_nodes: dict[str, bts.ArtifactNode], + input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType], ancestors: list[bts.ExecutionNode], ) -> bts.ExecutionNode: root_component_spec = root_task_spec.component_ref.spec @@ -1098,6 +1213,18 @@ def _recursively_create_all_executions_and_artifacts( input_artifact_nodes = dict(input_artifact_nodes) for input_spec in root_component_spec.inputs or []: input_artifact_node = input_artifact_nodes.get(input_spec.name) + if isinstance(input_artifact_node, structures.SecretReference): + # We don't use these secret arguments, but adding them just in case. + extra_data = root_execution_node.extra_data or {} + secret_reference_arguments = extra_data.setdefault( + bts.EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY, {} + ) + secret_reference_arguments[input_spec.name] = ( + input_artifact_node.to_json_dict() + ) + root_execution_node.extra_data = extra_data + # Not adding any artifact link for secret inputs + continue if input_artifact_node is None and not input_spec.optional: if input_spec.default: input_artifact_node = ( @@ -1163,7 +1290,8 @@ def _recursively_create_all_executions_and_artifacts( root_execution_node.container_execution_status = ( bts.ContainerExecutionStatus.QUEUED if all( - artifact_node.artifact_data + not isinstance(artifact_node, bts.ArtifactNode) + or artifact_node.artifact_data for artifact_node in input_artifact_nodes.values() ) else bts.ContainerExecutionStatus.WAITING_FOR_UPSTREAM @@ -1190,10 +1318,10 @@ def _recursively_create_all_executions_and_artifacts( raise ApiServiceError( f"child_task_spec.component_ref.spec is empty. {child_task_spec=}" ) - child_task_input_artifact_nodes: dict[str, bts.ArtifactNode] = {} + child_task_input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType] = {} for input_spec in child_component_spec.inputs or []: input_argument = (child_task_spec.arguments or {}).get(input_spec.name) - input_artifact_node: bts.ArtifactNode | None = None + input_artifact_node: _ArtifactNodeOrSecretType | None = None if input_argument is None and not input_spec.optional: # Not failing on unconnected required input if there is a default value if input_spec.default is None: @@ -1233,6 +1361,9 @@ def _recursively_create_all_executions_and_artifacts( # artifact_type=input_spec.type, # ) # ) + elif isinstance(input_argument, structures.SecretArgument): + # We'll deal with secrets when launching the container. + input_artifact_node = input_argument.secret else: raise ApiServiceError( f"Unexpected task argument: {input_spec.name}={input_argument}. {child_task_spec=}" diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index af16b3c..65f4a53 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -406,6 +406,7 @@ class ExecutionNode(_TableBase): EXECUTION_NODE_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY = ( "orchestration_error_message" ) +EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY = "secret_reference_arguments" CONTAINER_EXECUTION_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY = ( "orchestration_error_message" ) @@ -476,3 +477,13 @@ class PipelineRunAnnotation(_TableBase): pipeline_run: orm.Mapped[PipelineRun] = orm.relationship(repr=False, init=False) key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True) value: orm.Mapped[str | None] = orm.mapped_column(default=None) + + +class Secret(_TableBase): + __tablename__ = "secret" + user_id: orm.Mapped[str] = orm.mapped_column(primary_key=True, index=True) + secret_id: orm.Mapped[str] = orm.mapped_column(primary_key=True) + secret_value: orm.Mapped[str] + created_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) + updated_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) + extra_data: orm.Mapped[dict[str, Any] | None] = orm.mapped_column(default=None) diff --git a/cloud_pipelines_backend/component_structures.py b/cloud_pipelines_backend/component_structures.py index 12a26e0..d7c64ed 100644 --- a/cloud_pipelines_backend/component_structures.py +++ b/cloud_pipelines_backend/component_structures.py @@ -317,7 +317,23 @@ class TaskOutputArgument(_BaseModel): # Has additional constructor for convenie task_output: TaskOutputReference -ArgumentType = Union[PrimitiveTypes, GraphInputArgument, TaskOutputArgument] +@dataclasses.dataclass +class SecretReference(_BaseModel): + """References a secret""" + + id: str + + +@dataclasses.dataclass +class SecretArgument(_BaseModel): + """Argument that references a secret""" + + secret: SecretReference + + +ArgumentType = Union[ + PrimitiveTypes, GraphInputArgument, TaskOutputArgument, SecretArgument +] @dataclasses.dataclass diff --git a/cloud_pipelines_backend/launchers/interfaces.py b/cloud_pipelines_backend/launchers/interfaces.py index 1272e60..f5807f9 100644 --- a/cloud_pipelines_backend/launchers/interfaces.py +++ b/cloud_pipelines_backend/launchers/interfaces.py @@ -36,6 +36,7 @@ class InputArgument: value: str | None = None uri: str | None = None staging_uri: str + is_secret: bool = False class ContainerTaskLauncher(typing.Generic[_TLaunchedContainer], abc.ABC): diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index e81f09a..e958e8d 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -436,6 +436,34 @@ def generate_execution_log_uri( for output_spec in component_spec.outputs or [] } + # Handling secrets. + # We read secrets from execution_node.extra_data rather than from task_spec.arguments, + # because some secrets might have been passed from upstream graph inputs. + secret_reference_arguments = (execution.extra_data or {}).get( + bts.EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY, {} + ) + secret_hash = "" + for input_name, secret_reference_dict in secret_reference_arguments.items(): + user_id = pipeline_run.created_by + secret_id = secret_reference_dict["id"] + secret = session.get(bts.Secret, (user_id, secret_id)) + if not secret: + raise OrchestratorError( + f"{execution.id=}: User error: Error resolving a secret argument for {input_name=}: User {user_id} does not have secret {secret_id}." + ) + secret_value = secret.secret_value + input_artifact_data[input_name] = bts.ArtifactData( + total_size=len(secret_value.encode("utf-8")), + is_dir=False, + value=secret_value, + uri=None, + # This hash is not used, so we're using a dummy value here that makes it possible to identify the secret arguments in the following code. + hash=secret_hash, + ) + session.rollback() + + # Preparing the launcher input arguments + input_arguments = { input_name: launcher_interfaces.InputArgument( total_size=artifact_data.total_size, @@ -447,6 +475,7 @@ def generate_execution_log_uri( execution_id=container_execution_uuid, input_name=input_name, ), + is_secret=(artifact_data.hash == secret_hash), ) for input_name, artifact_data in input_artifact_data.items() } From 75a3bea3c86c97215ff303ceb123e93836805eb3 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Mon, 9 Feb 2026 21:13:07 -0800 Subject: [PATCH 02/14] Added tests --- tests/test_secrets.py | 134 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 tests/test_secrets.py diff --git a/tests/test_secrets.py b/tests/test_secrets.py new file mode 100644 index 0000000..871e333 --- /dev/null +++ b/tests/test_secrets.py @@ -0,0 +1,134 @@ +from typing import Callable +from unittest import mock + +from sqlalchemy import orm + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import component_structures +from cloud_pipelines_backend import database_ops +from cloud_pipelines_backend.launchers import interfaces as launcher_interfaces + + +def _initialize_db_and_get_session_factory() -> Callable[[], orm.Session]: + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +def test_running_pipeline_with_secrets(): + user = "user1" + secret_id = "SECRET_1" + secret_value = "SECRET_1_VALUE" + + secret_input_name = "secret_input" + + component_spec = component_structures.ComponentSpec( + inputs=[ + component_structures.InputSpec(name=secret_input_name), + ], + implementation=component_structures.ContainerImplementation( + container=component_structures.ContainerSpec(image="python") + ), + ) + + task_spec1 = component_structures.TaskSpec( + component_ref=component_structures.ComponentReference(spec=component_spec), + arguments={ + secret_input_name: component_structures.SecretArgument( + secret=component_structures.SecretReference( + id=secret_id, + ) + ) + }, + ) + + graph_input_name = "graph_input_1" + task_spec2 = component_structures.TaskSpec( + component_ref=component_structures.ComponentReference(spec=component_spec), + arguments={ + secret_input_name: component_structures.GraphInputArgument( + graph_input=component_structures.GraphInputReference( + input_name=graph_input_name + ) + ) + }, + ) + + for task_spec in [task_spec1, task_spec2]: + pipeline_spec = component_structures.ComponentSpec( + inputs=[ + component_structures.InputSpec(name=graph_input_name), + ], + implementation=component_structures.GraphImplementation( + graph=component_structures.GraphSpec( + tasks={ + "task": task_spec, + } + ) + ), + ) + + root_pipeline_task = component_structures.TaskSpec( + component_ref=component_structures.ComponentReference(spec=pipeline_spec), + arguments={ + graph_input_name: component_structures.SecretArgument( + secret=component_structures.SecretReference(id=secret_id) + ) + }, + ) + + session_factory = _initialize_db_and_get_session_factory() + secrets_service = api_server_sql.SecretsApiService() + pipeline_runs_service = api_server_sql.PipelineRunsApiService_Sql() + + secrets_service.create_secret( + session=session_factory(), + user_id=user, + secret_id=secret_id, + secret_value=secret_value, + ) + + list_secrets_response = secrets_service.list_secrets( + session=session_factory(), + user_id=user, + ) + assert list_secrets_response.secrets + assert list_secrets_response.secrets[0].secret_id == secret_id + + pipeline_runs_service.create( + session=session_factory(), + root_task=root_pipeline_task, + created_by=user, + ) + + storage_provider_mock = mock.MagicMock() + launched_container_mock = mock.MagicMock( + status=launcher_interfaces.ContainerStatus.PENDING, + to_dict=lambda: {"foo": "bar"}, + ) + launch_container_task_mock = mock.MagicMock( + return_value=launched_container_mock + ) + launcher_mock = mock.MagicMock(launch_container_task=launch_container_task_mock) + data_root_uri = "file:///tmp/artifacts" + logs_root_uri = "file:///tmp/logs" + + from cloud_pipelines_backend import orchestrator_sql + + orchestrator = orchestrator_sql.OrchestratorService_Sql( + session_factory=session_factory, + launcher=launcher_mock, + storage_provider=storage_provider_mock, + data_root_uri=data_root_uri, + logs_root_uri=logs_root_uri, + ) + orchestrator.process_each_queue_once() + + launch_container_task_mock.assert_called_once() + input_arguments: dict[str, launcher_interfaces.InputArgument] | None = ( + launch_container_task_mock.call_args.kwargs.get("input_arguments") + ) + assert input_arguments + secret_argument = input_arguments.get(secret_input_name) + assert secret_argument + assert secret_argument.value == secret_value + assert secret_argument.is_secret From 20b6aa9fd674511dfbe3903c2e41270b5b4efb34 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 01:14:18 -0800 Subject: [PATCH 03/14] Fixed bug in _set_secret_value when secret already exists --- cloud_pipelines_backend/api_server_sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 9fb8307..ac9220a 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1052,7 +1052,7 @@ def _set_secret_value( current_time = _get_current_time() secret = session.get(bts.Secret, (user_id, secret_id)) if secret: - if not raise_if_exists: + if raise_if_exists: raise errors.ItemAlreadyExistsError( f"Secret with id '{secret_id}' already exists." ) From b7eaa2d347ec7ca72cb8b3c1e5b8d6abf5aba3af Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 01:18:22 -0800 Subject: [PATCH 04/14] Renamed secret_id to secret_name --- cloud_pipelines_backend/api_router.py | 4 +-- cloud_pipelines_backend/api_server_sql.py | 28 ++++++++++---------- cloud_pipelines_backend/backend_types_sql.py | 2 +- tests/test_secrets.py | 6 ++--- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index f2c0dac..6d39ec2 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -403,12 +403,12 @@ def get_current_user( inject_user_name(secrets_service.create_secret, parameter_name="user_id") ) ) - router.put("/api/secrets/{secret_id}", tags=["secrets"], **default_config)( + router.put("/api/secrets/{secret_name}", tags=["secrets"], **default_config)( inject_session_dependency( inject_user_name(secrets_service.update_secret, parameter_name="user_id") ) ) - router.delete("/api/secrets/{secret_id}", tags=["secrets"], **default_config)( + router.delete("/api/secrets/{secret_name}", tags=["secrets"], **default_config)( inject_session_dependency( inject_user_name(secrets_service.delete_secret, parameter_name="user_id") ) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index ac9220a..f05b77b 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -997,7 +997,7 @@ def get_signed_artifact_url( # === Secrets Service @dataclasses.dataclass(kw_only=True) class SecretInfoResponse: - secret_id: str + secret_name: str @dataclasses.dataclass(kw_only=True) @@ -1012,13 +1012,13 @@ def create_secret( *, session: orm.Session, user_id: str, - secret_id: str, + secret_name: str, secret_value: str, ): return self._set_secret_value( session=session, user_id=user_id, - secret_id=secret_id, + secret_name=secret_name, secret_value=secret_value, raise_if_exists=True, ) @@ -1028,13 +1028,13 @@ def update_secret( *, session: orm.Session, user_id: str, - secret_id: str, + secret_name: str, secret_value: str, ): return self._set_secret_value( session=session, user_id=user_id, - secret_id=secret_id, + secret_name=secret_name, secret_value=secret_value, raise_if_not_exists=True, ) @@ -1044,28 +1044,28 @@ def _set_secret_value( *, session: orm.Session, user_id: str, - secret_id: str, + secret_name: str, secret_value: str, raise_if_not_exists: bool = False, raise_if_exists: bool = False, ): current_time = _get_current_time() - secret = session.get(bts.Secret, (user_id, secret_id)) + secret = session.get(bts.Secret, (user_id, secret_name)) if secret: if raise_if_exists: raise errors.ItemAlreadyExistsError( - f"Secret with id '{secret_id}' already exists." + f"Secret with name '{secret_name}' already exists." ) secret.secret_value = secret_value secret.updated_at = current_time else: if raise_if_not_exists: raise errors.ItemNotFoundError( - f"Secret with id '{secret_id}' does not exist." + f"Secret with name '{secret_name}' does not exist." ) secret = bts.Secret( user_id=user_id, - secret_id=secret_id, + secret_name=secret_name, secret_value=secret_value, created_at=current_time, updated_at=current_time, @@ -1078,12 +1078,12 @@ def delete_secret( *, session: orm.Session, user_id: str, - secret_id: str, + secret_name: str, ): - secret = session.get(bts.Secret, (user_id, secret_id)) + secret = session.get(bts.Secret, (user_id, secret_name)) if not secret: raise errors.ItemNotFoundError( - f"Secret with id '{secret_id}' does not exist." + f"Secret with name '{secret_name}' does not exist." ) session.delete(secret) session.commit() @@ -1099,7 +1099,7 @@ def list_secrets( ).all() return ListSecretsResponse( secrets=[ - SecretInfoResponse(secret_id=secret.secret_id) for secret in secrets + SecretInfoResponse(secret_name=secret.secret_name) for secret in secrets ] ) diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index 65f4a53..19a856d 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -482,7 +482,7 @@ class PipelineRunAnnotation(_TableBase): class Secret(_TableBase): __tablename__ = "secret" user_id: orm.Mapped[str] = orm.mapped_column(primary_key=True, index=True) - secret_id: orm.Mapped[str] = orm.mapped_column(primary_key=True) + secret_name: orm.Mapped[str] = orm.mapped_column(primary_key=True) secret_value: orm.Mapped[str] created_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) updated_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) diff --git a/tests/test_secrets.py b/tests/test_secrets.py index 871e333..7a4e8f1 100644 --- a/tests/test_secrets.py +++ b/tests/test_secrets.py @@ -16,7 +16,7 @@ def _initialize_db_and_get_session_factory() -> Callable[[], orm.Session]: def test_running_pipeline_with_secrets(): user = "user1" - secret_id = "SECRET_1" + secret_name = "SECRET_1" secret_value = "SECRET_1_VALUE" secret_input_name = "secret_input" @@ -83,7 +83,7 @@ def test_running_pipeline_with_secrets(): secrets_service.create_secret( session=session_factory(), user_id=user, - secret_id=secret_id, + secret_name=secret_name, secret_value=secret_value, ) @@ -92,7 +92,7 @@ def test_running_pipeline_with_secrets(): user_id=user, ) assert list_secrets_response.secrets - assert list_secrets_response.secrets[0].secret_id == secret_id + assert list_secrets_response.secrets[0].secret_name == secret_name pipeline_runs_service.create( session=session_factory(), From d866bf143eb39ba914c5d568c74a91223283fbca Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 01:53:04 -0800 Subject: [PATCH 05/14] Returning created/updated secret info from the create_secret/update_secret API methods --- cloud_pipelines_backend/api_server_sql.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index f05b77b..6fa2963 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1014,7 +1014,7 @@ def create_secret( user_id: str, secret_name: str, secret_value: str, - ): + ) -> SecretInfoResponse: return self._set_secret_value( session=session, user_id=user_id, @@ -1030,7 +1030,7 @@ def update_secret( user_id: str, secret_name: str, secret_value: str, - ): + ) -> SecretInfoResponse: return self._set_secret_value( session=session, user_id=user_id, @@ -1048,7 +1048,7 @@ def _set_secret_value( secret_value: str, raise_if_not_exists: bool = False, raise_if_exists: bool = False, - ): + ) -> SecretInfoResponse: current_time = _get_current_time() secret = session.get(bts.Secret, (user_id, secret_name)) if secret: @@ -1079,7 +1079,7 @@ def delete_secret( session: orm.Session, user_id: str, secret_name: str, - ): + ) -> None: secret = session.get(bts.Secret, (user_id, secret_name)) if not secret: raise errors.ItemNotFoundError( From 1afe14d41963adb4e6ef89772171ed7c2917d022 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 02:06:06 -0800 Subject: [PATCH 06/14] API Server - Return the secret creation time and update time --- cloud_pipelines_backend/api_server_sql.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 6fa2963..6c0b3ac 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -998,6 +998,16 @@ def get_signed_artifact_url( @dataclasses.dataclass(kw_only=True) class SecretInfoResponse: secret_name: str + created_at: datetime.datetime | None + updated_at: datetime.datetime | None + + @classmethod + def from_db(cls, secret_row: bts.Secret) -> "SecretInfoResponse": + return SecretInfoResponse( + secret_name=secret_row.secret_name, + created_at=secret_row.created_at, + updated_at=secret_row.updated_at, + ) @dataclasses.dataclass(kw_only=True) @@ -1071,7 +1081,9 @@ def _set_secret_value( updated_at=current_time, ) session.add(secret) + response = SecretInfoResponse.from_db(secret) session.commit() + return response def delete_secret( self, @@ -1098,9 +1110,7 @@ def list_secrets( sql.select(bts.Secret).where(bts.Secret.user_id == user_id) ).all() return ListSecretsResponse( - secrets=[ - SecretInfoResponse(secret_name=secret.secret_name) for secret in secrets - ] + secrets=[SecretInfoResponse.from_db(secret) for secret in secrets] ) From 02495a65d65b13164b4d8ca24c3f14d42b37f515 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 02:41:01 -0800 Subject: [PATCH 07/14] API Server - Moved the secret_value to HTTP request body --- cloud_pipelines_backend/api_router.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index 6d39ec2..74e1bdb 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -399,13 +399,25 @@ def get_current_user( ) ) router.post("/api/secrets/", tags=["secrets"], **default_config)( - inject_session_dependency( - inject_user_name(secrets_service.create_secret, parameter_name="user_id") + add_parameter_annotation_metadata( + inject_session_dependency( + inject_user_name( + secrets_service.create_secret, parameter_name="user_id" + ) + ), + parameter_name="secret_value", + annotation_metadata=fastapi.Body(embed=True), ) ) router.put("/api/secrets/{secret_name}", tags=["secrets"], **default_config)( - inject_session_dependency( - inject_user_name(secrets_service.update_secret, parameter_name="user_id") + add_parameter_annotation_metadata( + inject_session_dependency( + inject_user_name( + secrets_service.update_secret, parameter_name="user_id" + ) + ), + parameter_name="secret_value", + annotation_metadata=fastapi.Body(embed=True), ) ) router.delete("/api/secrets/{secret_name}", tags=["secrets"], **default_config)( From 9bee15df889c285e3db8448fca16f7408e6fcd87 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 02:45:02 -0800 Subject: [PATCH 08/14] API Server - Disallow creating a secret with empty name --- cloud_pipelines_backend/api_server_sql.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 6c0b3ac..5e1f48e 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1025,6 +1025,9 @@ def create_secret( secret_name: str, secret_value: str, ) -> SecretInfoResponse: + secret_name = secret_name.strip() + if not secret_name: + raise ApiServiceError(f"Secret name must not be empty.") return self._set_secret_value( session=session, user_id=user_id, From 1a6b84fae6562db06e499770c675e0f1310f1097 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 10 Feb 2026 01:37:51 -0800 Subject: [PATCH 09/14] Switched secret arguments to use the dynamicData argument type --- cloud_pipelines_backend/api_server_sql.py | 41 +++++++++------- cloud_pipelines_backend/backend_types_sql.py | 2 +- .../component_structures.py | 21 ++++---- cloud_pipelines_backend/orchestrator_sql.py | 48 +++++++++++-------- tests/test_secrets.py | 10 ++-- 5 files changed, 70 insertions(+), 52 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 5e1f48e..7c53b41 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1128,14 +1128,16 @@ def list_secrets( # No. Decided to first do topological sort and then 1-stage generation. -_ArtifactNodeOrSecretType = typing.Union[bts.ArtifactNode, structures.SecretReference] +_ArtifactNodeOrDynamicDataType = typing.Union[ + bts.ArtifactNode, structures.DynamicDataArgument +] def _recursively_create_all_executions_and_artifacts_root( session: orm.Session, root_task_spec: structures.TaskSpec, ) -> bts.ExecutionNode: - input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType] = {} + input_artifact_nodes: dict[str, _ArtifactNodeOrDynamicDataType] = {} root_component_spec = root_task_spec.component_ref.spec if not root_component_spec: @@ -1174,8 +1176,8 @@ def _recursively_create_all_executions_and_artifacts_root( # This constant artifact won't be added to the DB # TODO: Actually, they will be added... # We don't need to link this input artifact here. It will be handled downstream. - elif isinstance(input_argument, structures.SecretArgument): - input_artifact_nodes[input_name] = input_argument.secret + elif isinstance(input_argument, structures.DynamicDataArgument): + input_artifact_nodes[input_name] = input_argument else: raise ApiServiceError( f"root task constant argument must be a string, but got {input_name}={input_argument}. {root_task_spec=}" @@ -1193,7 +1195,7 @@ def _recursively_create_all_executions_and_artifacts_root( def _recursively_create_all_executions_and_artifacts( session: orm.Session, root_task_spec: structures.TaskSpec, - input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType], + input_artifact_nodes: dict[str, _ArtifactNodeOrDynamicDataType], ancestors: list[bts.ExecutionNode], ) -> bts.ExecutionNode: root_component_spec = root_task_spec.component_ref.spec @@ -1226,15 +1228,20 @@ def _recursively_create_all_executions_and_artifacts( input_artifact_nodes = dict(input_artifact_nodes) for input_spec in root_component_spec.inputs or []: input_artifact_node = input_artifact_nodes.get(input_spec.name) - if isinstance(input_artifact_node, structures.SecretReference): + if isinstance(input_artifact_node, structures.DynamicDataArgument): # We don't use these secret arguments, but adding them just in case. extra_data = root_execution_node.extra_data or {} - secret_reference_arguments = extra_data.setdefault( - bts.EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY, {} - ) - secret_reference_arguments[input_spec.name] = ( - input_artifact_node.to_json_dict() + dynamic_data_arguments = extra_data.setdefault( + bts.EXECUTION_NODE_EXTRA_DATA_DYNAMIC_DATA_ARGUMENTS_KEY, {} ) + dynamic_data_arguments[input_spec.name] = input_artifact_node.dynamic_data + if not ( + isinstance(input_artifact_node.dynamic_data, str) + or len(input_artifact_node.dynamic_data) == 1 + ): + raise ApiServiceError( + f"Dynamic data argument must be a string or a dict with a single key set, but got {input_artifact_node.dynamic_data}" + ) root_execution_node.extra_data = extra_data # Not adding any artifact link for secret inputs continue @@ -1331,10 +1338,12 @@ def _recursively_create_all_executions_and_artifacts( raise ApiServiceError( f"child_task_spec.component_ref.spec is empty. {child_task_spec=}" ) - child_task_input_artifact_nodes: dict[str, _ArtifactNodeOrSecretType] = {} + child_task_input_artifact_nodes: dict[ + str, _ArtifactNodeOrDynamicDataType + ] = {} for input_spec in child_component_spec.inputs or []: input_argument = (child_task_spec.arguments or {}).get(input_spec.name) - input_artifact_node: _ArtifactNodeOrSecretType | None = None + input_artifact_node: _ArtifactNodeOrDynamicDataType | None = None if input_argument is None and not input_spec.optional: # Not failing on unconnected required input if there is a default value if input_spec.default is None: @@ -1374,9 +1383,9 @@ def _recursively_create_all_executions_and_artifacts( # artifact_type=input_spec.type, # ) # ) - elif isinstance(input_argument, structures.SecretArgument): - # We'll deal with secrets when launching the container. - input_artifact_node = input_argument.secret + elif isinstance(input_argument, structures.DynamicDataArgument): + # We'll deal with dynamic data (e.g. secrets) when launching the container. + input_artifact_node = input_argument else: raise ApiServiceError( f"Unexpected task argument: {input_spec.name}={input_argument}. {child_task_spec=}" diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index 19a856d..dca54c0 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -406,7 +406,7 @@ class ExecutionNode(_TableBase): EXECUTION_NODE_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY = ( "orchestration_error_message" ) -EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY = "secret_reference_arguments" +EXECUTION_NODE_EXTRA_DATA_DYNAMIC_DATA_ARGUMENTS_KEY = "dynamic_data_arguments" CONTAINER_EXECUTION_EXTRA_DATA_ORCHESTRATION_ERROR_MESSAGE_KEY = ( "orchestration_error_message" ) diff --git a/cloud_pipelines_backend/component_structures.py b/cloud_pipelines_backend/component_structures.py index d7c64ed..1bc4550 100644 --- a/cloud_pipelines_backend/component_structures.py +++ b/cloud_pipelines_backend/component_structures.py @@ -317,22 +317,25 @@ class TaskOutputArgument(_BaseModel): # Has additional constructor for convenie task_output: TaskOutputReference -@dataclasses.dataclass -class SecretReference(_BaseModel): - """References a secret""" - - id: str +DynamicDataReference = str | dict[str, Any] @dataclasses.dataclass -class SecretArgument(_BaseModel): - """Argument that references a secret""" +class DynamicDataArgument(_BaseModel): + """Argument that references data that's dynamically produced by the execution system at runtime. + + Examples of dynamic data: + * Secret value + * Container execution ID + * Pipeline run ID + * Loop index/item + """ - secret: SecretReference + dynamic_data: DynamicDataReference ArgumentType = Union[ - PrimitiveTypes, GraphInputArgument, TaskOutputArgument, SecretArgument + PrimitiveTypes, GraphInputArgument, TaskOutputArgument, DynamicDataArgument ] diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index e958e8d..37ebe18 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -26,6 +26,9 @@ _T = typing.TypeVar("_T") +DYNAMIC_DATA_SECRET_KEY = "secret" +DYNAMIC_DATA_SECRET_NAME_KEY = "name" + class OrchestratorError(RuntimeError): pass @@ -439,31 +442,36 @@ def generate_execution_log_uri( # Handling secrets. # We read secrets from execution_node.extra_data rather than from task_spec.arguments, # because some secrets might have been passed from upstream graph inputs. - secret_reference_arguments = (execution.extra_data or {}).get( - bts.EXECUTION_NODE_EXTRA_DATA_SECRET_REFERENCE_ARGUMENTS_KEY, {} - ) + dynamic_data_arguments: dict[str, dict[str, Any]] = ( + execution.extra_data or {} + ).get(bts.EXECUTION_NODE_EXTRA_DATA_DYNAMIC_DATA_ARGUMENTS_KEY, {}) secret_hash = "" - for input_name, secret_reference_dict in secret_reference_arguments.items(): - user_id = pipeline_run.created_by - secret_id = secret_reference_dict["id"] - secret = session.get(bts.Secret, (user_id, secret_id)) - if not secret: - raise OrchestratorError( - f"{execution.id=}: User error: Error resolving a secret argument for {input_name=}: User {user_id} does not have secret {secret_id}." + for input_name, dynamic_data_argument in dynamic_data_arguments.items(): + if not isinstance(dynamic_data_argument, dict): + continue + dynamic_data_items = list(dynamic_data_argument.items()) + dynamic_data_key, dynamic_data_parameters = dynamic_data_items[0] + if dynamic_data_key == DYNAMIC_DATA_SECRET_KEY: + secret_parameters = dynamic_data_parameters + user_id = pipeline_run.created_by + secret_name = secret_parameters[DYNAMIC_DATA_SECRET_NAME_KEY] + secret = session.get(bts.Secret, (user_id, secret_name)) + if not secret: + raise OrchestratorError( + f"{execution.id=}: User error: Error resolving a secret argument for {input_name=}: User {user_id} does not have secret {secret_name}." + ) + secret_value = secret.secret_value + input_artifact_data[input_name] = bts.ArtifactData( + total_size=len(secret_value.encode("utf-8")), + is_dir=False, + value=secret_value, + uri=None, + # This hash is not used, so we're using a dummy value here that makes it possible to identify the secret arguments in the following code. + hash=secret_hash, ) - secret_value = secret.secret_value - input_artifact_data[input_name] = bts.ArtifactData( - total_size=len(secret_value.encode("utf-8")), - is_dir=False, - value=secret_value, - uri=None, - # This hash is not used, so we're using a dummy value here that makes it possible to identify the secret arguments in the following code. - hash=secret_hash, - ) session.rollback() # Preparing the launcher input arguments - input_arguments = { input_name: launcher_interfaces.InputArgument( total_size=artifact_data.total_size, diff --git a/tests/test_secrets.py b/tests/test_secrets.py index 7a4e8f1..6d4cb67 100644 --- a/tests/test_secrets.py +++ b/tests/test_secrets.py @@ -33,10 +33,8 @@ def test_running_pipeline_with_secrets(): task_spec1 = component_structures.TaskSpec( component_ref=component_structures.ComponentReference(spec=component_spec), arguments={ - secret_input_name: component_structures.SecretArgument( - secret=component_structures.SecretReference( - id=secret_id, - ) + secret_input_name: component_structures.DynamicDataArgument( + dynamic_data={"secret": {"name": secret_name}} ) }, ) @@ -70,8 +68,8 @@ def test_running_pipeline_with_secrets(): root_pipeline_task = component_structures.TaskSpec( component_ref=component_structures.ComponentReference(spec=pipeline_spec), arguments={ - graph_input_name: component_structures.SecretArgument( - secret=component_structures.SecretReference(id=secret_id) + graph_input_name: component_structures.DynamicDataArgument( + dynamic_data={"secret": {"name": secret_name}} ) }, ) From d79aa6a858ef4e069d0cf3e159a027504c2d86aa Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 11 Feb 2026 20:41:13 -0800 Subject: [PATCH 10/14] Made Secret.created_at and updated_at columns required --- cloud_pipelines_backend/api_server_sql.py | 4 ++-- cloud_pipelines_backend/backend_types_sql.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 7c53b41..510a60f 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -998,8 +998,8 @@ def get_signed_artifact_url( @dataclasses.dataclass(kw_only=True) class SecretInfoResponse: secret_name: str - created_at: datetime.datetime | None - updated_at: datetime.datetime | None + created_at: datetime.datetime + updated_at: datetime.datetime @classmethod def from_db(cls, secret_row: bts.Secret) -> "SecretInfoResponse": diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index dca54c0..7eae3c4 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -484,6 +484,6 @@ class Secret(_TableBase): user_id: orm.Mapped[str] = orm.mapped_column(primary_key=True, index=True) secret_name: orm.Mapped[str] = orm.mapped_column(primary_key=True) secret_value: orm.Mapped[str] - created_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) - updated_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) + created_at: orm.Mapped[datetime.datetime] + updated_at: orm.Mapped[datetime.datetime] extra_data: orm.Mapped[dict[str, Any] | None] = orm.mapped_column(default=None) From 428340ea0644136f9f8653b2d6bc00d5f114485a Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 11 Feb 2026 20:54:01 -0800 Subject: [PATCH 11/14] Added Secret description and expires_at attributes --- cloud_pipelines_backend/api_server_sql.py | 24 +++++++++++++++++--- cloud_pipelines_backend/backend_types_sql.py | 2 ++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 510a60f..2061475 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1000,6 +1000,8 @@ class SecretInfoResponse: secret_name: str created_at: datetime.datetime updated_at: datetime.datetime + expires_at: datetime.datetime | None = None + description: str | None = None @classmethod def from_db(cls, secret_row: bts.Secret) -> "SecretInfoResponse": @@ -1007,6 +1009,8 @@ def from_db(cls, secret_row: bts.Secret) -> "SecretInfoResponse": secret_name=secret_row.secret_name, created_at=secret_row.created_at, updated_at=secret_row.updated_at, + expires_at=secret_row.expires_at, + description=secret_row.description, ) @@ -1024,15 +1028,19 @@ def create_secret( user_id: str, secret_name: str, secret_value: str, + description: str | None = None, + expires_at: datetime.datetime | None = None, ) -> SecretInfoResponse: secret_name = secret_name.strip() if not secret_name: raise ApiServiceError(f"Secret name must not be empty.") - return self._set_secret_value( + return self._create_or_update_secret( session=session, user_id=user_id, secret_name=secret_name, secret_value=secret_value, + description=description, + expires_at=expires_at, raise_if_exists=True, ) @@ -1043,22 +1051,28 @@ def update_secret( user_id: str, secret_name: str, secret_value: str, + description: str | None = None, + expires_at: datetime.datetime | None = None, ) -> SecretInfoResponse: - return self._set_secret_value( + return self._create_or_update_secret( session=session, user_id=user_id, secret_name=secret_name, secret_value=secret_value, + description=description, + expires_at=expires_at, raise_if_not_exists=True, ) - def _set_secret_value( + def _create_or_update_secret( self, *, session: orm.Session, user_id: str, secret_name: str, secret_value: str, + description: str | None = None, + expires_at: datetime.datetime | None = None, raise_if_not_exists: bool = False, raise_if_exists: bool = False, ) -> SecretInfoResponse: @@ -1084,6 +1098,10 @@ def _set_secret_value( updated_at=current_time, ) session.add(secret) + if description: + secret.description = description + if expires_at: + secret.expires_at = expires_at response = SecretInfoResponse.from_db(secret) session.commit() return response diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index 7eae3c4..c7b87d6 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -486,4 +486,6 @@ class Secret(_TableBase): secret_value: orm.Mapped[str] created_at: orm.Mapped[datetime.datetime] updated_at: orm.Mapped[datetime.datetime] + expires_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None) + description: orm.Mapped[str | None] = orm.mapped_column(default=None) extra_data: orm.Mapped[dict[str, Any] | None] = orm.mapped_column(default=None) From d1b1db97c7e6e0c73d3e7429d7bd8b9ff5353832 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 11 Feb 2026 20:56:10 -0800 Subject: [PATCH 12/14] Added comment for session.rollback() --- cloud_pipelines_backend/orchestrator_sql.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 37ebe18..5b79678 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -469,6 +469,7 @@ def generate_execution_log_uri( # This hash is not used, so we're using a dummy value here that makes it possible to identify the secret arguments in the following code. hash=secret_hash, ) + # Starting new transaction session.rollback() # Preparing the launcher input arguments From 98ddd399fd1ab3568c7ee9350d3cf486ca760c7d Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 11 Feb 2026 21:04:31 -0800 Subject: [PATCH 13/14] Changed the flow of putting dynamic data arguments into ExecutionNode.extra_data --- cloud_pipelines_backend/api_server_sql.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 2061475..5fc7d51 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1247,19 +1247,22 @@ def _recursively_create_all_executions_and_artifacts( for input_spec in root_component_spec.inputs or []: input_artifact_node = input_artifact_nodes.get(input_spec.name) if isinstance(input_artifact_node, structures.DynamicDataArgument): - # We don't use these secret arguments, but adding them just in case. - extra_data = root_execution_node.extra_data or {} - dynamic_data_arguments = extra_data.setdefault( - bts.EXECUTION_NODE_EXTRA_DATA_DYNAMIC_DATA_ARGUMENTS_KEY, {} - ) - dynamic_data_arguments[input_spec.name] = input_artifact_node.dynamic_data if not ( isinstance(input_artifact_node.dynamic_data, str) - or len(input_artifact_node.dynamic_data) == 1 + or ( + isinstance(input_artifact_node.dynamic_data, dict) + and len(input_artifact_node.dynamic_data) == 1 + ) ): raise ApiServiceError( f"Dynamic data argument must be a string or a dict with a single key set, but got {input_artifact_node.dynamic_data}" ) + # Storing the dynamic data arguments for later use by the orchestrator. + extra_data = root_execution_node.extra_data or {} + extra_data.setdefault( + bts.EXECUTION_NODE_EXTRA_DATA_DYNAMIC_DATA_ARGUMENTS_KEY, {} + )[input_spec.name] = input_artifact_node.dynamic_data + root_execution_node.extra_data = extra_data # Not adding any artifact link for secret inputs continue From c0954366d3df0dbd046f29e04aa41e43b8ae47c9 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 11 Feb 2026 21:27:13 -0800 Subject: [PATCH 14/14] Transforming ItemAlreadyExistsError to HTTP error 409 Conflict --- cloud_pipelines_backend/api_router.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index 74e1bdb..2f5d79a 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -107,6 +107,15 @@ def handle_permission_error(request: fastapi.Request, exc: errors.PermissionErro content={"message": str(exc)}, ) + @app.exception_handler(errors.ItemAlreadyExistsError) + def handle_item_already_exists_error( + request: fastapi.Request, exc: errors.ItemAlreadyExistsError + ): + return fastapi.responses.JSONResponse( + status_code=409, + content={"message": str(exc)}, + ) + get_user_details_dependency = fastapi.Depends(user_details_getter) def get_user_name(