Fix: Memory component - sorting by DESC now retrieves the correct messages #11887
druellan wants to merge 5 commits into langflow-ai:main
Conversation
…sages if the history is bigger than the amount to retrieve
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Important: Pre-merge checks failed. Please resolve all errors before merging; addressing warnings is optional.
❌ Failed checks (1 error, 3 warnings)
✅ Passed checks (3 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lfx/src/lfx/components/models_and_agents/memory.py (1)
217-225: ⚠️ Potential issue | 🟠 Major — Apply sender filtering before truncation in external-memory retrieval.
Line 220 truncates before sender filtering at Line 223, which can return fewer/wrong messages for filtered queries. Also guard `aget_messages()` with an empty-list fallback before the reverse/slice.
Proposed fix:
-        stored = await self.memory.aget_messages()
+        stored = await self.memory.aget_messages() or []
         # langchain memories are supposed to return messages in ascending order
         if order == "DESC":
-            stored = stored[::-1]  # Reverse first
+            stored = stored[::-1]
-        if n_messages:
-            stored = stored[:n_messages]  # Then take first N messages
-
         stored = [Message.from_lc_message(m) for m in stored]
         if sender_type:
             expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER
             stored = [m for m in stored if m.type == expected_type]
+        if n_messages:
+            stored = stored[:n_messages]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/lfx/src/lfx/components/models_and_agents/memory.py` around lines 217 - 225, The retrieval logic reverses and truncates the list in variable stored before applying sender_type filtering and also assumes aget_messages() returned a non-empty list; update the code in the external-memory retrieval path (the block that uses stored, Message.from_lc_message, MESSAGE_SENDER_AI, MESSAGE_SENDER_USER and n_messages) to first guard against a None/empty result from aget_messages() (treat as empty list), then convert items via Message.from_lc_message, then apply the sender_type filter using expected_type derived from MESSAGE_SENDER_AI / MESSAGE_SENDER_USER, and only after filtering perform the reverse/slice truncation by n_messages so that sender-filtered queries return the correct N messages. Ensure you modify the same variables (stored) and preserve existing behavior when sender_type is not provided.
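The ordering issue the comment describes can be reproduced with plain lists. This is a toy sketch, not the Langflow classes: `Msg` and the sender labels here are illustrative stand-ins.

```python
# Toy sketch (not the Langflow classes): why slicing before sender
# filtering returns fewer/older matches than requested.
from dataclasses import dataclass

@dataclass
class Msg:
    type: str  # "Machine" or "User" (illustrative labels)
    text: str

# Ascending history with mixed senders; newest message is last.
history = [Msg("User", "u1"), Msg("Machine", "a1"),
           Msg("User", "u2"), Msg("Machine", "a2"),
           Msg("User", "u3")]

n_messages, sender_type = 2, "User"
stored = history[::-1]  # DESC: newest first

# Buggy order: truncate first, then filter -> only one match survives.
buggy = [m for m in stored[:n_messages] if m.type == sender_type]

# Fixed order: filter first, then truncate -> the two newest matches.
fixed = [m for m in stored if m.type == sender_type][:n_messages]

print([m.text for m in buggy])  # ['u3']
print([m.text for m in fixed])  # ['u3', 'u2']
```

With five mixed messages and a filter for "User", the buggy ordering returns one message when two were requested, which is exactly the behavior the review flags.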
🧹 Nitpick comments (2)
src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json (1)
2498-2498: Optional: avoid committing volatile `score` churn in starter JSONs.
This value appears tool-generated and non-functional; excluding or normalizing it would reduce diff noise.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json` at line 2498: remove the volatile "score" field from the starter JSON or normalize it to a stable value (null or 0) so it no longer changes on tool runs; update the JSON by deleting the "score" key (or setting it to a consistent value) and verify the file remains valid JSON and that any loader/consumer code that expects "score" tolerates its absence or the normalized value.

src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json (1)
991-991: Consider generating embedded component code from a single canonical source.
Inlining full component source into starter-project JSON increases drift risk versus `src/lfx/src/lfx/components/models_and_agents/memory.py`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json` at line 991: The JSON embeds a full copy of the MemoryComponent source which risks drift; replace the inlined code with a generated snapshot produced from the canonical MemoryComponent implementation instead of hand-editing the JSON. Locate the MemoryComponent class (symbols: class MemoryComponent, methods store_message, retrieve_messages, retrieve_messages_as_text, update_build_config) in the canonical components module and update the starter-project generation pipeline to pull that source at build time and inject a single maintained snapshot into the starter-project JSON (or store a reference key) so changes to the canonical class automatically propagate; remove the duplicated literal source from the JSON and ensure the generator preserves the same exposed fields (inputs, outputs, default_keys, mode_config) when producing the embedded component.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json`:
- Line 1782: The internal retrieval path in retrieve_messages currently
hardcodes limit=10000 when calling aget_messages which can over-fetch; change
the call in retrieve_messages to set the limit based on n_messages (e.g., use
limit=n_messages if n_messages is truthy, otherwise omit or use a reasonable
default) and then rely on the existing slicing (stored =
stored[:n_messages])/ordering logic so you don't fetch more than needed; update
the aget_messages(...) invocation and any related slicing to make limit derive
from the n_messages variable.
In `@src/lfx/src/lfx/_assets/component_index.json`:
- Line 90858: In retrieve_messages: defensively normalize stored = await
self.memory.aget_messages() to stored = stored or [] before any slicing, apply
Message.from_lc_message conversion and sender_type filtering (use
MESSAGE_SENDER_AI / MESSAGE_SENDER_USER to map expected_type and filter on
m.type) before taking the first n_messages, and ensure the order reversal
(stored = stored[::-1]) happens prior to filtering/slicing so you return the
most recent matching messages; update the external-memory branch around
aget_messages, the stored list handling, and the slicing logic in
retrieve_messages accordingly.
---
Outside diff comments:
In `@src/lfx/src/lfx/components/models_and_agents/memory.py`:
- Around line 217-225: The retrieval logic reverses and truncates the list in
variable stored before applying sender_type filtering and also assumes
aget_messages() returned a non-empty list; update the code in the
external-memory retrieval path (the block that uses stored,
Message.from_lc_message, MESSAGE_SENDER_AI, MESSAGE_SENDER_USER and n_messages)
to first guard against a None/empty result from aget_messages() (treat as empty
list), then convert items via Message.from_lc_message, then apply the
sender_type filter using expected_type derived from MESSAGE_SENDER_AI /
MESSAGE_SENDER_USER, and only after filtering perform the reverse/slice
truncation by n_messages so that sender-filtered queries return the correct N
messages. Ensure you modify the same variables (stored) and preserve existing
behavior when sender_type is not provided.
---
Nitpick comments:
In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json`:
- Line 2498: Remove the volatile "score" field from the starter JSON (e.g.,
Meeting Summary.json) or normalize it to a stable value (null or 0) so it no
longer changes on tool runs; update the JSON by deleting the "score" key (or
setting it to a consistent value) and verify the file remains valid JSON and
that any loader/consumer code that expects "score" tolerates its absence or the
normalized value.
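One possible shape for the suggested normalization, as a standalone pass over a parsed JSON tree. The recursive key removal and the sample structure are illustrative; the real starter JSONs are much larger.

```python
# Illustrative normalization pass: strip every volatile "score" key from
# a parsed starter-project JSON tree (the sample structure is made up).
import json

def strip_scores(node):
    """Recursively drop "score" keys from dicts; recurse into lists."""
    if isinstance(node, dict):
        return {k: strip_scores(v) for k, v in node.items() if k != "score"}
    if isinstance(node, list):
        return [strip_scores(v) for v in node]
    return node

data = {"name": "Meeting Summary", "nodes": [{"id": 1, "score": 0.93}]}
print(json.dumps(strip_scores(data)))  # {"name": "Meeting Summary", "nodes": [{"id": 1}]}
```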
In `@src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json`:
- Line 991: The JSON embeds a full copy of the MemoryComponent source which
risks drift; replace the inlined code with a generated snapshot produced from
the canonical MemoryComponent implementation instead of hand-editing the JSON.
Locate the MemoryComponent class (symbols: class MemoryComponent, methods
store_message, retrieve_messages, retrieve_messages_as_text,
update_build_config) in the canonical components module and update the
starter-project generation pipeline to pull that source at build time and inject
a single maintained snapshot into the starter-project JSON (or store a reference
key) so changes to the canonical class automatically propagate; remove the
duplicated literal source from the JSON and ensure the generator preserves the
same exposed fields (inputs, outputs, default_keys, mode_config) when producing
the embedded component.
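A minimal sketch of the snapshot-injection idea. The `nodes -> template -> code -> value` layout and the string-based source handoff are assumptions about the starter-project format; a real build-time pipeline would read the canonical `memory.py` (e.g. via `Path(...).read_text()`) rather than use an inline stand-in.

```python
# Sketch: inject one canonical source snapshot into every embedded code
# field so starter projects cannot drift from the canonical component.

def embed_component_source(source: str, project: dict) -> dict:
    """Overwrite each embedded code value with the canonical snapshot."""
    for node in project.get("nodes", []):
        code_field = node.get("template", {}).get("code")
        if code_field is not None:
            code_field["value"] = source
    return project

canonical = "class MemoryComponent: ...\n"  # stand-in for the real source
project = {"nodes": [{"template": {"code": {"value": "stale copy"}}}]}
out = embed_component_source(canonical, project)
print(out["nodes"][0]["template"]["code"]["value"] == canonical)  # True
```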
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json
src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json
src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json
src/lfx/src/lfx/_assets/component_index.json
src/lfx/src/lfx/_assets/stable_hash_history.json
src/lfx/src/lfx/components/models_and_agents/memory.py
The diff hunk edits the embedded MemoryComponent source stored in the starter-project JSON "code" field; the old and new "value" strings are near-identical copies of the full component source. The effective change is the slicing logic in retrieve_messages:

External-memory path, before:

        if n_messages:
            stored = stored[-n_messages:]  # Get last N messages first

        if order == "DESC":
            stored = stored[::-1]  # Then reverse if needed

after:

        if order == "DESC":
            stored = stored[::-1]  # Reverse first

        if n_messages:
            stored = stored[:n_messages]  # Then take first N messages

Internal-memory path, before:

        if n_messages:
            stored = stored[-n_messages:]  # Get last N messages

after:

        if n_messages:
            stored = stored[:n_messages]  # Take first N messages (already sorted by order)
External-memory retrieval truncates before sender filtering (can drop valid recent matches).
In Line 90858 (retrieve_messages), the external-memory path applies stored[:n_messages] before sender_type filtering. With mixed sender history, this can return fewer messages than requested and skip newer matching entries. Also, aget_messages() should be normalized to [] defensively before slicing.
💡 Suggested fix
- stored = await self.memory.aget_messages()
+ stored = await self.memory.aget_messages() or []
# langchain memories are supposed to return messages in ascending order
if order == "DESC":
stored = stored[::-1] # Reverse first
- if n_messages:
- stored = stored[:n_messages] # Then take first N messages
-
stored = [Message.from_lc_message(m) for m in stored]
if sender_type:
expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER
stored = [m for m in stored if m.type == expected_type]
+ if n_messages:
+ stored = stored[:n_messages]  # Limit after filtering
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lfx/src/lfx/_assets/component_index.json` at line 90858, In
retrieve_messages: defensively normalize stored = await
self.memory.aget_messages() to stored = stored or [] before any slicing, apply
Message.from_lc_message conversion and sender_type filtering (use
MESSAGE_SENDER_AI / MESSAGE_SENDER_USER to map expected_type and filter on
m.type) before taking the first n_messages, and ensure the order reversal
(stored = stored[::-1]) happens prior to filtering/slicing so you return the
most recent matching messages; update the external-memory branch around
aget_messages, the stored list handling, and the slicing logic in
retrieve_messages accordingly.
This was in the original code, but it might be worth fixing in this PR, since it is part of the sorting code we are fixing. Do you agree, based on this new information?
@druellan Yes, I agree it makes sense to include this fix in the current PR. Here's why:

- Same code area: The fix is in the exact same method (retrieve_messages) and code block (external memory path) that you're already modifying for the DESC sorting bug.
- Related functionality: Both issues affect message retrieval correctness. Fixing them together ensures the entire retrieval logic is correct and tested as a unit.
- Reduces future complexity: Addressing both issues now avoids having multiple PRs touching the same sensitive code area, which reduces merge conflicts and makes the fix history cleaner.
- Logical scope alignment: Since your PR is about "retrieving the correct messages," ensuring that sender filtering returns the requested number of matching messages (not just slicing first then filtering) falls within that scope.

The fix is small and well-contained, so including it won't significantly increase the PR's complexity. It's good practice to fix related correctness issues in the same area together.
Closing to reopen a clean version of the patch at: #11941
The memory.py component fails to return the most recent events when "order: descending" and "Number of Messages" are used in conjunction.
Why: For external memory, the cropping takes place BEFORE the sorting; since the most recent events are usually the last in the list, those events are ALWAYS cropped out. For internal memory events, the list is cropped from the bottom up, negating the sorting.
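The cropping bug described above can be reproduced with a plain list standing in for a DESC-sorted history (a toy sketch, not the actual memory API):

```python
# Plain-list reproduction of the described bug: with a DESC-sorted
# history, cropping from the bottom keeps the OLDEST entries.
history_desc = ["m5", "m4", "m3", "m2", "m1"]  # newest first (DESC)
n_messages = 2

buggy = history_desc[-n_messages:]  # bottom-up crop -> oldest messages
fixed = history_desc[:n_messages]   # take first N   -> newest messages

print(buggy)  # ['m2', 'm1']
print(fixed)  # ['m5', 'm4']
```

Requesting the two most recent messages under the old slicing returns the two oldest instead, which is the behavior this PR fixes.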
Summary by CodeRabbit
New Features
Bug Fixes
Chores