diff --git a/infra/vscode_web/.env b/infra/vscode_web/.env new file mode 100644 index 00000000..14110474 --- /dev/null +++ b/infra/vscode_web/.env @@ -0,0 +1,7 @@ +AZURE_EXISTING_AGENT_ID="<%= agentId %>" +AZURE_ENV_NAME="<%= playgroundName %>" +# AZURE_LOCATION="<%= location %>" +AZURE_SUBSCRIPTION_ID="<%= subscriptionId %>" +AZURE_EXISTING_AIPROJECT_ENDPOINT="<%= endpoint %>" +AZURE_EXISTING_AIPROJECT_RESOURCE_ID="<%= projectResourceId %>" +AZD_ALLOW_NON_EMPTY_FOLDER=true diff --git a/src/ContentProcessor/src/libs/pipeline/queue_handler_base.py b/src/ContentProcessor/src/libs/pipeline/queue_handler_base.py index b739664b..ef474b6a 100644 --- a/src/ContentProcessor/src/libs/pipeline/queue_handler_base.py +++ b/src/ContentProcessor/src/libs/pipeline/queue_handler_base.py @@ -246,36 +246,68 @@ def _get_artifact_type(step_name: str) -> ArtifactType: container_name=self.application_context.configuration.app_cps_processes, ) - ContentProcess( - process_id=self._current_message_context.data_pipeline.process_id, - processed_file_name=self._current_message_context.data_pipeline.files[ - 0 - ].name, - status="Error", - processed_file_mime_type=self._current_message_context.data_pipeline.files[ - 0 - ].mime_type, - last_modified_time=datetime.datetime.now(datetime.UTC), - last_modified_by=step_name, - imported_time=datetime.datetime.strptime( - self._current_message_context.data_pipeline.pipeline_status.creation_time, - "%Y-%m-%dT%H:%M:%S.%fZ", - ), - process_output=[ - Step_Outputs( - step_name=self.handler_name, - step_result=exception_result.result, - ) - ], - ).update_status_to_cosmos( - connection_string=self.application_context.configuration.app_cosmos_connstr, - database_name=self.application_context.configuration.app_cosmos_database, - collection_name=self.application_context.configuration.app_cosmos_container_process, - ) + # Only mark as terminal "Error" when retries are + # exhausted. While retries remain, use "Retrying" + # so the workflow poller keeps waiting instead of + # treating the first transient failure as final. + has_retries_remaining = queue_message.dequeue_count <= 5 + + if has_retries_remaining: + # Lightweight status-only update — avoids + # overwriting the document with null result / + # scores that a previous successful step may + # have written. + ContentProcess( + process_id=self._current_message_context.data_pipeline.process_id, + processed_file_name=self._current_message_context.data_pipeline.files[ + 0 + ].name, + status="Retrying", + processed_file_mime_type=self._current_message_context.data_pipeline.files[ + 0 + ].mime_type, + last_modified_time=datetime.datetime.now(datetime.UTC), + last_modified_by=step_name, + imported_time=datetime.datetime.strptime( + self._current_message_context.data_pipeline.pipeline_status.creation_time, + "%Y-%m-%dT%H:%M:%S.%fZ", + ), + ).update_process_status_to_cosmos( + connection_string=self.application_context.configuration.app_cosmos_connstr, + database_name=self.application_context.configuration.app_cosmos_database, + collection_name=self.application_context.configuration.app_cosmos_container_process, + ) + else: + ContentProcess( + process_id=self._current_message_context.data_pipeline.process_id, + processed_file_name=self._current_message_context.data_pipeline.files[ + 0 + ].name, + status="Error", + processed_file_mime_type=self._current_message_context.data_pipeline.files[ + 0 + ].mime_type, + last_modified_time=datetime.datetime.now(datetime.UTC), + last_modified_by=step_name, + imported_time=datetime.datetime.strptime( + self._current_message_context.data_pipeline.pipeline_status.creation_time, + "%Y-%m-%dT%H:%M:%S.%fZ", + ), + process_output=[ + Step_Outputs( + step_name=self.handler_name, + step_result=exception_result.result, + ) + ], + ).update_status_to_cosmos( + connection_string=self.application_context.configuration.app_cosmos_connstr, + database_name=self.application_context.configuration.app_cosmos_database, + collection_name=self.application_context.configuration.app_cosmos_container_process, + ) process_outputs: list[Step_Outputs] = [] - if queue_message.dequeue_count > 5: + if not has_retries_remaining: logging.info( "Message will be moved to the Dead Letter Queue." ) diff --git a/src/ContentProcessorWorkflow/src/services/content_process_service.py b/src/ContentProcessorWorkflow/src/services/content_process_service.py index 7b1e447f..020e4298 100644 --- a/src/ContentProcessorWorkflow/src/services/content_process_service.py +++ b/src/ContentProcessorWorkflow/src/services/content_process_service.py @@ -267,15 +267,27 @@ async def poll_status( poll_interval_seconds: float = 5.0, timeout_seconds: float = 600.0, on_poll: Callable[[dict], Awaitable[None] | None] | None = None, + error_confirmation_polls: int = 3, ) -> dict: """Poll Cosmos for status until a terminal state or timeout. + When an ``Error`` status is observed, the poller does not return + immediately. Instead it re-polls up to *error_confirmation_polls* + additional times (with the same interval) to confirm the error is + persistent and not a transient state during a ContentProcessor + retry cycle. If the status changes away from ``Error`` (e.g. + back to a step name or ``Retrying``), the normal polling loop + resumes. + Args: process_id: The content process ID to poll. poll_interval_seconds: Delay between poll attempts. timeout_seconds: Maximum elapsed time before giving up. on_poll: Optional callback invoked on each iteration with the current status dict. Accepts sync or async callables. + error_confirmation_polls: Number of additional polls to + perform after first observing ``Error`` before accepting + it as terminal. Defaults to 3. Returns: Final status dict with keys ``status``, ``process_id``, @@ -283,6 +295,7 @@ async def poll_status( """ elapsed = 0.0 result: dict | None = None + consecutive_error_polls = 0 while elapsed < timeout_seconds: result = await self.get_status(process_id) if result is None: @@ -299,10 +312,27 @@ async def poll_status( await poll_handler status = result.get("status", "processing") - if status in ("Completed", "Error"): + if status == "Completed": result["terminal"] = True return result + if status == "Error": + consecutive_error_polls += 1 + if consecutive_error_polls > error_confirmation_polls: + result["terminal"] = True + return result + logger.info( + "Process %s reported Error (confirmation %d/%d), " + "re-polling to confirm retries are exhausted.", + process_id, + consecutive_error_polls, + error_confirmation_polls, + ) + else: + # Status changed away from Error (e.g. retry started), + # reset the confirmation counter. + consecutive_error_polls = 0 + await asyncio.sleep(poll_interval_seconds) elapsed += poll_interval_seconds diff --git a/src/ContentProcessorWorkflow/tests/unit/services/test_content_process_service.py b/src/ContentProcessorWorkflow/tests/unit/services/test_content_process_service.py index 07405691..835fc591 100644 --- a/src/ContentProcessorWorkflow/tests/unit/services/test_content_process_service.py +++ b/src/ContentProcessorWorkflow/tests/unit/services/test_content_process_service.py @@ -142,9 +142,69 @@ async def _run(): record.processed_file_name = "test.pdf" svc._process_repo.get_async.return_value = record - result = await svc.poll_status("p1", poll_interval_seconds=0.01) + result = await svc.poll_status( + "p1", + poll_interval_seconds=0.01, + error_confirmation_polls=3, + ) assert result["status"] == "Error" assert result["terminal"] is True + # Should have been called 1 (initial) + 3 (confirmation) = 4 times + assert svc._process_repo.get_async.call_count == 4 + + asyncio.run(_run()) + + def test_error_recovers_to_retrying(self): + """When Error is seen but status changes to Retrying, polling continues.""" + + async def _run(): + svc = _make_service() + # Error -> Retrying -> extract -> Completed + statuses = iter(["Error", "Retrying", "extract", "Completed"]) + + async def _get_async(pid): + s = next(statuses) + rec = MagicMock() + rec.status = s + rec.processed_file_name = "test.pdf" + return rec + + svc._process_repo.get_async.side_effect = _get_async + + result = await svc.poll_status( + "p1", + poll_interval_seconds=0.01, + error_confirmation_polls=3, + ) + assert result["status"] == "Completed" + assert result["terminal"] is True + + asyncio.run(_run()) + + def test_error_recovers_to_step_name(self): + """When Error is seen but status changes to a step name, polling continues.""" + + async def _run(): + svc = _make_service() + # Error -> map (retry started) -> Completed + statuses = iter(["Error", "map", "Completed"]) + + async def _get_async(pid): + s = next(statuses) + rec = MagicMock() + rec.status = s + rec.processed_file_name = "test.pdf" + return rec + + svc._process_repo.get_async.side_effect = _get_async + + result = await svc.poll_status( + "p1", + poll_interval_seconds=0.01, + error_confirmation_polls=3, + ) + assert result["status"] == "Completed" + assert result["terminal"] is True asyncio.run(_run())