diff --git a/src/everos/memory/strategies/extract_agent_skill.py b/src/everos/memory/strategies/extract_agent_skill.py index 07b3b5fa6..a1b86dc0f 100644 --- a/src/everos/memory/strategies/extract_agent_skill.py +++ b/src/everos/memory/strategies/extract_agent_skill.py @@ -34,6 +34,8 @@ from __future__ import annotations +import asyncio + from everalgo.agent_memory import AgentSkillExtractor from everalgo.types import AgentCase as AlgoAgentCase from everalgo.types import AgentSkill as AlgoAgentSkill @@ -87,6 +89,15 @@ Ranking is ``(quality_score desc, timestamp desc)`` — same ordering opensource ``AgentSkillExtractor._load_case_history`` applies.""" +CASE_VISIBILITY_ATTEMPTS = 6 +CASE_VISIBILITY_POLL_SECONDS = 0.2 +"""Bounded wait for cascade to surface the fresh AgentCase in LanceDB. + +``SkillClusterUpdated`` can arrive before the md→LanceDB write has +become visible. A short local poll absorbs the normal watcher/indexer +lag without consuming the whole OME retry budget in a tight loop. +""" + class _ClusterMissingError(RuntimeError): """Race with the cluster strategy; OME retry will catch up.""" @@ -198,16 +209,21 @@ async def _load_target_case( app_id: str, project_id: str, ) -> LanceAgentCase: - """Pull the target case row, raising a retry-class error on cascade lag.""" - target = await agent_case_repo.find_by_owner_entry( - agent_id, case_entry_id, app_id=app_id, project_id=project_id - ) - if target is None: - # Cascade hasn't indexed the freshly-written md yet. - raise _CaseNotYetIndexedError( - f"AgentCase entry_id={case_entry_id} not in LanceDB yet; retrying" + """Pull the target case row, allowing a short wait for cascade lag.""" + for attempt in range(CASE_VISIBILITY_ATTEMPTS): + target = await agent_case_repo.find_by_owner_entry( + agent_id, case_entry_id, app_id=app_id, project_id=project_id ) - return target + if target is not None: + return target + if attempt < CASE_VISIBILITY_ATTEMPTS - 1: + await asyncio.sleep(CASE_VISIBILITY_POLL_SECONDS) + + raise _CaseNotYetIndexedError( + "AgentCase entry_id=" + f"{case_entry_id} not in LanceDB after " + f"{CASE_VISIBILITY_ATTEMPTS} visibility checks; retrying" + ) async def _select_existing_skills( diff --git a/tests/unit/test_memory/test_strategies/test_extract_agent_skill.py b/tests/unit/test_memory/test_strategies/test_extract_agent_skill.py index 7b584f4f2..972e215fd 100644 --- a/tests/unit/test_memory/test_strategies/test_extract_agent_skill.py +++ b/tests/unit/test_memory/test_strategies/test_extract_agent_skill.py @@ -38,6 +38,7 @@ _CaseNotYetIndexedError, _ClusterMissingError, _collect_supporting_entry_ids, + _load_target_case, _resolve_query_vector, _select_existing_skills, _select_supporting_cases, @@ -161,6 +162,7 @@ async def test_raises_when_target_case_not_yet_in_lancedb() -> None: patch( "everos.memory.strategies.extract_agent_skill.agent_case_repo" ) as mock_case_repo, + patch("asyncio.sleep", new_callable=AsyncMock), ): mock_cluster_repo.get_with_members = AsyncMock(return_value=_algo_cluster()) mock_case_repo.find_by_owner_entry = AsyncMock(return_value=None) @@ -168,6 +170,32 @@ async def test_raises_when_target_case_not_yet_in_lancedb() -> None: await extract_agent_skill(_event(), FakeStrategyContext()) +async def test_load_target_case_waits_for_lancedb_visibility() -> None: + """Short cascade lag should not force an immediate retry/dead-letter.""" + target = _lance_case("ac_20260517_0001") + + with ( + patch( + "everos.memory.strategies.extract_agent_skill.agent_case_repo" + ) as mock_case_repo, + patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep, + ): + mock_case_repo.find_by_owner_entry = AsyncMock( + side_effect=[None, None, target] + ) + + got = await _load_target_case( + "agent_42", + "ac_20260517_0001", + app_id="default", + project_id="default", + ) + + assert got is target + assert mock_case_repo.find_by_owner_entry.await_count == 3 + assert mock_sleep.await_count == 2 + + # ── end-to-end orchestration (mocked) ────────────────────────────────────