
Fix: Memory component - sorting by DESC now retrieves the correct messages#11887

Closed
druellan wants to merge 5 commits intolangflow-ai:mainfrom
druellan:main

Conversation


@druellan druellan commented Feb 24, 2026

The memory.py component fails to return the most recent messages when "Order: Descending" and "Number of Messages" are used together.

  • What is expected: with "Descending" selected as the sort option, the component should return the most recent entry first, followed by the rest in reverse-timestamp order.
  • What happens: depending on how many entries the event history contains, the most recent event may be cropped out of the list.
  • How to reproduce: start with a history of at least 5 entries, set the maximum number of messages to 4, and select DESC order. The most recent event is truncated off the list.
    Why: for external memory, the cropping takes place BEFORE the sorting; since the most recent events are usually last in the list, they are ALWAYS cropped out. For internal memory events, the list is cropped from the bottom up, negating the sorting.
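The internal-memory case can be sketched in a few lines (illustrative list values, not the component code):

```python
# Illustrative sketch of the internal-memory bug: when the backend returns
# messages newest-first (DESC), taking the LAST n entries drops the newest one.
stored = ["msg5", "msg4", "msg3", "msg2", "msg1"]  # msg5 is the most recent
n_messages = 4

buggy = stored[-n_messages:]  # ["msg4", "msg3", "msg2", "msg1"]: msg5 is cropped out
fixed = stored[:n_messages]   # ["msg5", "msg4", "msg3", "msg2"]: newest kept, newest first
```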

Summary by CodeRabbit

  • New Features

    • Added a toggle option for Mustache-style template syntax (double brackets) in Prompt Template components, allowing flexible variable handling between mustache and standard modes.
  • Bug Fixes

    • Updated memory message retrieval logic to correctly select messages based on order and count parameters.
  • Chores

    • Updated external dependency version.

…sages if the history is bigger than the amount to retrieve

coderabbitai Bot commented Feb 24, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Walkthrough

The changes introduce a new use_double_brackets boolean field to Prompt Template components across multiple starter projects, and modify memory message retrieval logic in Memory components to use forward slicing with conditional reversal instead of negative indexing for selecting message subsets.

Changes

Cohort / File(s) Summary
Prompt Template Configuration
src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json, src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json, src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json
Added or relocated the use_double_brackets boolean field to Prompt Template components, enabling Mustache-style {{var}} variable syntax. Field includes updated code logic for template validation and processing when the mode changes.
Memory Component Hash Updates
src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json, src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json, src/lfx/src/lfx/_assets/component_index.json
Updated code_hash values for Memory and MemoryComponent reflecting revised message retrieval logic and internal implementation changes.
Memory Retrieval Logic
src/lfx/src/lfx/components/models_and_agents/memory.py, src/lfx/src/lfx/_assets/component_index.json
Modified message selection from negative indexing (stored[-n_messages:]) to forward slicing with conditional reversal (stored[:n_messages]), altering how N messages are selected for both external and internal memory paths.
Hash Registry and Dependencies
src/lfx/src/lfx/_assets/stable_hash_history.json, src/lfx/src/lfx/_assets/component_index.json
Updated Memory 0.3.0 hash mapping and bumped external dependency version (vlmrun from 0.1.35 to 0.5.4); updated SHA256 hash to reflect new content.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes


Important

Pre-merge checks failed

Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 3 warnings)

  • Test Coverage For New Implementations: ❌ Error. The PR fixes a critical DESC ordering bug in MemoryComponent.retrieve_messages() but lacks corresponding regression tests. Resolution: add dedicated unit tests for MemoryComponent.retrieve_messages() covering DESC ordering with message limits, reproducing the original bug scenario.
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Test Quality And Coverage: ⚠️ Warning. The PR introduces a critical DESC-order retrieval fix but lacks specific test coverage for the fixed behavior. Resolution: add async tests validating DESC/ASC ordering, message limiting, and sender_type filtering, with parametrized scenarios covering edge cases.
  • Test File Naming And Structure: ⚠️ Warning. The PR modifies MemoryComponent.retrieve_messages() to fix the DESC sorting bug with message limits, but no tests were added or updated for this fix. Resolution: create src/backend/tests/unit/components/models_and_agents/test_memory_component.py with tests covering DESC ordering with limits, external memory filtering, and edge cases, verifying that messages are ordered before limiting.
✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title directly describes the main fix (correcting message retrieval when using descending order in the Memory component), which aligns with the PR's primary objective.
  • Excessive Mock Usage Warning: ✅ Passed. No test files were modified in this PR; changes are limited to configuration files, assets, and memory.py, so the assessment is not applicable.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added the community Pull Request from an external contributor label Feb 24, 2026
@druellan druellan marked this pull request as ready for review February 26, 2026 01:28

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lfx/src/lfx/components/models_and_agents/memory.py (1)

217-225: ⚠️ Potential issue | 🟠 Major

Apply sender filtering before truncation in external-memory retrieval.

Line 220 truncates before sender filtering at Line 223, which can return fewer/wrong messages for filtered queries. Also guard aget_messages() with an empty-list fallback before reverse/slice.

Proposed fix
-            stored = await self.memory.aget_messages()
+            stored = await self.memory.aget_messages() or []
             # langchain memories are supposed to return messages in ascending order

             if order == "DESC":
-                stored = stored[::-1]  # Reverse first
+                stored = stored[::-1]

-            if n_messages:
-                stored = stored[:n_messages]  # Then take first N messages
-
             stored = [Message.from_lc_message(m) for m in stored]
             if sender_type:
                 expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER
                 stored = [m for m in stored if m.type == expected_type]
+            if n_messages:
+                stored = stored[:n_messages]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lfx/src/lfx/components/models_and_agents/memory.py` around lines 217 -
225, The retrieval logic reverses and truncates the list in variable stored
before applying sender_type filtering and also assumes aget_messages() returned
a non-empty list; update the code in the external-memory retrieval path (the
block that uses stored, Message.from_lc_message, MESSAGE_SENDER_AI,
MESSAGE_SENDER_USER and n_messages) to first guard against a None/empty result
from aget_messages() (treat as empty list), then convert items via
Message.from_lc_message, then apply the sender_type filter using expected_type
derived from MESSAGE_SENDER_AI / MESSAGE_SENDER_USER, and only after filtering
perform the reverse/slice truncation by n_messages so that sender-filtered
queries return the correct N messages. Ensure you modify the same variables
(stored) and preserve existing behavior when sender_type is not provided.
🧹 Nitpick comments (2)
src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json (1)

2498-2498: Optional: avoid committing volatile score churn in starter JSONs.

This value appears tool-generated and non-functional; excluding/normalizing it would reduce diff noise.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json`
at line 2498, Remove the volatile "score" field from the starter
JSON (e.g., Meeting Summary.json) or normalize it to a stable value (null or 0)
so it no longer changes on tool runs; update the JSON by deleting the "score"
key (or setting it to a consistent value) and verify the file remains valid JSON
and that any loader/consumer code that expects "score" tolerates its absence or
the normalized value.
src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json (1)

991-991: Consider generating embedded component code from a single canonical source.

Inlining full component source into starter-project JSON increases drift risk versus src/lfx/src/lfx/components/models_and_agents/memory.py.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json`
at line 991, The JSON embeds a full copy of the MemoryComponent source which
risks drift; replace the inlined code with a generated snapshot produced from
the canonical MemoryComponent implementation instead of hand-editing the JSON.
Locate the MemoryComponent class (symbols: class MemoryComponent, methods
store_message, retrieve_messages, retrieve_messages_as_text,
update_build_config) in the canonical components module and update the
starter-project generation pipeline to pull that source at build time and inject
a single maintained snapshot into the starter-project JSON (or store a reference
key) so changes to the canonical class automatically propagate; remove the
duplicated literal source from the JSON and ensure the generator preserves the
same exposed fields (inputs, outputs, default_keys, mode_config) when producing
the embedded component.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json`:
- Line 1782: The internal retrieval path in retrieve_messages currently
hardcodes limit=10000 when calling aget_messages which can over-fetch; change
the call in retrieve_messages to set the limit based on n_messages (e.g., use
limit=n_messages if n_messages is truthy, otherwise omit or use a reasonable
default) and then rely on the existing slicing (stored =
stored[:n_messages])/ordering logic so you don't fetch more than needed; update
the aget_messages(...) invocation and any related slicing to make limit derive
from the n_messages variable.

In `@src/lfx/src/lfx/_assets/component_index.json`:
- Line 90858: In retrieve_messages: defensively normalize stored = await
self.memory.aget_messages() to stored = stored or [] before any slicing, apply
Message.from_lc_message conversion and sender_type filtering (use
MESSAGE_SENDER_AI / MESSAGE_SENDER_USER to map expected_type and filter on
m.type) before taking the first n_messages, and ensure the order reversal
(stored = stored[::-1]) happens prior to filtering/slicing so you return the
most recent matching messages; update the external-memory branch around
aget_messages, the stored list handling, and the slicing logic in
retrieve_messages accordingly.

---

Outside diff comments:
In `@src/lfx/src/lfx/components/models_and_agents/memory.py`:
- Around line 217-225: The retrieval logic reverses and truncates the list in
variable stored before applying sender_type filtering and also assumes
aget_messages() returned a non-empty list; update the code in the
external-memory retrieval path (the block that uses stored,
Message.from_lc_message, MESSAGE_SENDER_AI, MESSAGE_SENDER_USER and n_messages)
to first guard against a None/empty result from aget_messages() (treat as empty
list), then convert items via Message.from_lc_message, then apply the
sender_type filter using expected_type derived from MESSAGE_SENDER_AI /
MESSAGE_SENDER_USER, and only after filtering perform the reverse/slice
truncation by n_messages so that sender-filtered queries return the correct N
messages. Ensure you modify the same variables (stored) and preserve existing
behavior when sender_type is not provided.

---

Nitpick comments:
In `@src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json`:
- Line 2498: Remove the volatile "score" field from the starter JSON (e.g.,
Meeting Summary.json) or normalize it to a stable value (null or 0) so it no
longer changes on tool runs; update the JSON by deleting the "score" key (or
setting it to a consistent value) and verify the file remains valid JSON and
that any loader/consumer code that expects "score" tolerates its absence or the
normalized value.

In `@src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json`:
- Line 991: The JSON embeds a full copy of the MemoryComponent source which
risks drift; replace the inlined code with a generated snapshot produced from
the canonical MemoryComponent implementation instead of hand-editing the JSON.
Locate the MemoryComponent class (symbols: class MemoryComponent, methods
store_message, retrieve_messages, retrieve_messages_as_text,
update_build_config) in the canonical components module and update the
starter-project generation pipeline to pull that source at build time and inject
a single maintained snapshot into the starter-project JSON (or store a reference
key) so changes to the canonical class automatically propagate; remove the
duplicated literal source from the JSON and ensure the generator preserves the
same exposed fields (inputs, outputs, default_keys, mode_config) when producing
the embedded component.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e346812 and e1ea618.

📒 Files selected for processing (6)
  • src/backend/base/langflow/initial_setup/starter_projects/Custom Component Generator.json
  • src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json
  • src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json
  • src/lfx/src/lfx/_assets/component_index.json
  • src/lfx/src/lfx/_assets/stable_hash_history.json
  • src/lfx/src/lfx/components/models_and_agents/memory.py

Comment thread src/backend/base/langflow/initial_setup/starter_projects/Meeting Summary.json Outdated
"title_case": false,
"type": "code",
"value": "from typing import Any, cast\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.helpers.data import data_to_text\nfrom lfx.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom lfx.memory import aget_messages, astore_message\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.dotdict import dotdict\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\nfrom lfx.utils.component_utils import set_current_fields, set_field_display\nfrom lfx.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\", \"session_id\", \"context_id\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\", \"context_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\", \"session_id\", \"context_id\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. 
If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"context_id\",\n display_name=\"Context ID\",\n info=\"The context ID of the chat. Adds an extra layer to the local memory.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. 
\"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.context_id = self.context_id or message.context_id\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.context_id = message.context_id\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m 
in stored_messages] if stored_messages else []\n\n if message.sender:\n stored_messages = [m for m in stored_messages if m.sender == message.sender]\n else:\n await astore_message(message, flow_id=self.graph.flow_id)\n stored_messages = (\n await aget_messages(\n session_id=message.session_id,\n context_id=message.context_id,\n sender_name=message.sender_name,\n sender=message.sender,\n )\n or []\n )\n\n if not stored_messages:\n msg = \"No messages were stored. Please ensure that the session ID and sender are properly set.\"\n raise ValueError(msg)\n\n stored_message = stored_messages[0]\n self.status = stored_message\n return stored_message\n\n async def retrieve_messages(self) -> Data:\n sender_type = self.sender_type\n sender_name = self.sender_name\n session_id = self.session_id\n context_id = self.context_id\n n_messages = self.n_messages\n order = \"DESC\" if self.order == \"Descending\" else \"ASC\"\n\n if sender_type == \"Machine and User\":\n sender_type = None\n\n if self.memory and not hasattr(self.memory, \"aget_messages\"):\n memory_name = type(self.memory).__name__\n err_msg = f\"External Memory object ({memory_name}) must have 'aget_messages' method.\"\n raise AttributeError(err_msg)\n # Check if n_messages is None or 0\n if n_messages == 0:\n stored = []\n elif self.memory:\n # override session_id\n self.memory.session_id = session_id\n self.memory.context_id = context_id\n\n stored = await self.memory.aget_messages()\n # langchain memories are supposed to return messages in ascending order\n\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages first\n\n if order == \"DESC\":\n stored = stored[::-1] # Then reverse if needed\n\n stored = [Message.from_lc_message(m) for m in stored]\n if sender_type:\n expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER\n stored = [m for m in stored if m.type == expected_type]\n else:\n # For internal memory, we always fetch the last N messages by ordering 
by DESC\n stored = await aget_messages(\n sender=sender_type,\n sender_name=sender_name,\n session_id=session_id,\n context_id=context_id,\n limit=10000,\n order=order,\n )\n if n_messages:\n stored = stored[-n_messages:] # Get last N messages\n\n # self.status = stored\n return cast(\"Data\", stored)\n\n async def retrieve_messages_as_text(self) -> Message:\n stored_text = data_to_text(self.template, await self.retrieve_messages())\n # self.status = stored_text\n return Message(text=stored_text)\n\n async def retrieve_messages_dataframe(self) -> DataFrame:\n \"\"\"Convert the retrieved messages into a DataFrame.\n\n Returns:\n DataFrame: A DataFrame containing the message data.\n \"\"\"\n messages = await self.retrieve_messages()\n return DataFrame(messages)\n\n def update_build_config(\n self,\n build_config: dotdict,\n field_value: Any, # noqa: ARG002\n field_name: str | None = None, # noqa: ARG002\n ) -> dotdict:\n return set_current_fields(\n build_config=build_config,\n action_fields=self.mode_config,\n selected_action=build_config[\"mode\"][\"value\"],\n default_fields=self.default_keys,\n func=set_field_display,\n )\n"
"value": "from typing import Any, cast\n\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.helpers.data import data_to_text\nfrom lfx.inputs.inputs import DropdownInput, HandleInput, IntInput, MessageTextInput, MultilineInput, TabInput\nfrom lfx.memory import aget_messages, astore_message\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.dotdict import dotdict\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\nfrom lfx.utils.component_utils import set_current_fields, set_field_display\nfrom lfx.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_USER\n\n\nclass MemoryComponent(Component):\n display_name = \"Message History\"\n description = \"Stores or retrieves stored chat messages from Langflow tables or an external memory.\"\n documentation: str = \"https://docs.langflow.org/message-history\"\n icon = \"message-square-more\"\n name = \"Memory\"\n default_keys = [\"mode\", \"memory\", \"session_id\", \"context_id\"]\n mode_config = {\n \"Store\": [\"message\", \"memory\", \"sender\", \"sender_name\", \"session_id\", \"context_id\"],\n \"Retrieve\": [\"n_messages\", \"order\", \"template\", \"memory\", \"session_id\", \"context_id\"],\n }\n\n inputs = [\n TabInput(\n name=\"mode\",\n display_name=\"Mode\",\n options=[\"Retrieve\", \"Store\"],\n value=\"Retrieve\",\n info=\"Operation mode: Store messages or Retrieve messages.\",\n real_time_refresh=True,\n ),\n MessageTextInput(\n name=\"message\",\n display_name=\"Message\",\n info=\"The chat message to be stored.\",\n tool_mode=True,\n dynamic=True,\n show=False,\n ),\n HandleInput(\n name=\"memory\",\n display_name=\"External Memory\",\n input_types=[\"Memory\"],\n info=\"Retrieve messages from an external memory. 
If empty, it will use the Langflow tables.\",\n advanced=True,\n ),\n DropdownInput(\n name=\"sender_type\",\n display_name=\"Sender Type\",\n options=[MESSAGE_SENDER_AI, MESSAGE_SENDER_USER, \"Machine and User\"],\n value=\"Machine and User\",\n info=\"Filter by sender type.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender\",\n display_name=\"Sender\",\n info=\"The sender of the message. Might be Machine or User. \"\n \"If empty, the current sender parameter will be used.\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"sender_name\",\n display_name=\"Sender Name\",\n info=\"Filter by sender name.\",\n advanced=True,\n show=False,\n ),\n IntInput(\n name=\"n_messages\",\n display_name=\"Number of Messages\",\n value=100,\n info=\"Number of messages to retrieve.\",\n advanced=True,\n show=True,\n ),\n MessageTextInput(\n name=\"session_id\",\n display_name=\"Session ID\",\n info=\"The session ID of the chat. If empty, the current session ID parameter will be used.\",\n value=\"\",\n advanced=True,\n ),\n MessageTextInput(\n name=\"context_id\",\n display_name=\"Context ID\",\n info=\"The context ID of the chat. Adds an extra layer to the local memory.\",\n value=\"\",\n advanced=True,\n ),\n DropdownInput(\n name=\"order\",\n display_name=\"Order\",\n options=[\"Ascending\", \"Descending\"],\n value=\"Ascending\",\n info=\"Order of the messages.\",\n advanced=True,\n tool_mode=True,\n required=True,\n ),\n MultilineInput(\n name=\"template\",\n display_name=\"Template\",\n info=\"The template to use for formatting the data. 
\"\n \"It can contain the keys {text}, {sender} or any other key in the message data.\",\n value=\"{sender_name}: {text}\",\n advanced=True,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Message\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True),\n Output(display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True),\n ]\n\n def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict:\n \"\"\"Dynamically show only the relevant output based on the selected output type.\"\"\"\n if field_name == \"mode\":\n # Start with empty outputs\n frontend_node[\"outputs\"] = []\n if field_value == \"Store\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Stored Messages\",\n name=\"stored_messages\",\n method=\"store_message\",\n hidden=True,\n dynamic=True,\n )\n ]\n if field_value == \"Retrieve\":\n frontend_node[\"outputs\"] = [\n Output(\n display_name=\"Messages\", name=\"messages_text\", method=\"retrieve_messages_as_text\", dynamic=True\n ),\n Output(\n display_name=\"Dataframe\", name=\"dataframe\", method=\"retrieve_messages_dataframe\", dynamic=True\n ),\n ]\n return frontend_node\n\n async def store_message(self) -> Message:\n message = Message(text=self.message) if isinstance(self.message, str) else self.message\n\n message.context_id = self.context_id or message.context_id\n message.session_id = self.session_id or message.session_id\n message.sender = self.sender or message.sender or MESSAGE_SENDER_AI\n message.sender_name = self.sender_name or message.sender_name or MESSAGE_SENDER_NAME_AI\n\n stored_messages: list[Message] = []\n\n if self.memory:\n self.memory.context_id = message.context_id\n self.memory.session_id = message.session_id\n lc_message = message.to_lc_message()\n await self.memory.aadd_messages([lc_message])\n\n stored_messages = await self.memory.aget_messages() or []\n\n stored_messages = [Message.from_lc_message(m) for m 
Review context (the `retrieve_messages` logic under discussion; snippet begins mid-statement):

```python
            in stored_messages] if stored_messages else []  # (snippet begins mid-statement)

        if message.sender:
            stored_messages = [m for m in stored_messages if m.sender == message.sender]
        else:
            await astore_message(message, flow_id=self.graph.flow_id)
            stored_messages = (
                await aget_messages(
                    session_id=message.session_id,
                    context_id=message.context_id,
                    sender_name=message.sender_name,
                    sender=message.sender,
                )
                or []
            )

        if not stored_messages:
            msg = "No messages were stored. Please ensure that the session ID and sender are properly set."
            raise ValueError(msg)

        stored_message = stored_messages[0]
        self.status = stored_message
        return stored_message

    async def retrieve_messages(self) -> Data:
        sender_type = self.sender_type
        sender_name = self.sender_name
        session_id = self.session_id
        context_id = self.context_id
        n_messages = self.n_messages
        order = "DESC" if self.order == "Descending" else "ASC"

        if sender_type == "Machine and User":
            sender_type = None

        if self.memory and not hasattr(self.memory, "aget_messages"):
            memory_name = type(self.memory).__name__
            err_msg = f"External Memory object ({memory_name}) must have 'aget_messages' method."
            raise AttributeError(err_msg)
        # Check if n_messages is None or 0
        if n_messages == 0:
            stored = []
        elif self.memory:
            # override session_id
            self.memory.session_id = session_id
            self.memory.context_id = context_id

            stored = await self.memory.aget_messages()
            # langchain memories are supposed to return messages in ascending order

            if order == "DESC":
                stored = stored[::-1]  # Reverse first

            if n_messages:
                stored = stored[:n_messages]  # Then take first N messages

            stored = [Message.from_lc_message(m) for m in stored]
            if sender_type:
                expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER
                stored = [m for m in stored if m.type == expected_type]
        else:
            # For internal memory, we always fetch the last N messages by ordering by DESC
            stored = await aget_messages(
                sender=sender_type,
                sender_name=sender_name,
                session_id=session_id,
                context_id=context_id,
                limit=10000,
                order=order,
            )
            if n_messages:
                stored = stored[:n_messages]  # Take first N messages (already sorted by order)

        # self.status = stored
        return cast("Data", stored)

    async def retrieve_messages_as_text(self) -> Message:
        stored_text = data_to_text(self.template, await self.retrieve_messages())
        # self.status = stored_text
        return Message(text=stored_text)

    async def retrieve_messages_dataframe(self) -> DataFrame:
        """Convert the retrieved messages into a DataFrame.

        Returns:
            DataFrame: A DataFrame containing the message data.
        """
        messages = await self.retrieve_messages()
        return DataFrame(messages)

    def update_build_config(
        self,
        build_config: dotdict,
        field_value: Any,  # noqa: ARG002
        field_name: str | None = None,  # noqa: ARG002
    ) -> dotdict:
        return set_current_fields(
            build_config=build_config,
            action_fields=self.mode_config,
            selected_action=build_config["mode"]["value"],
            default_fields=self.default_keys,
            func=set_field_display,
        )
```
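The DESC bug fixed by this PR comes down to the order of the reverse and the slice in the external-memory branch above. A minimal, self-contained sketch (plain strings stand in for stored messages; the names are illustrative, not Langflow APIs):

```python
# Timestamps stand in for stored messages, oldest first,
# as langchain-style memories return them.
history = ["t1", "t2", "t3", "t4", "t5"]  # "t5" is the most recent
n_messages = 4

# Buggy order: slice first, then reverse — the newest entry is cropped out.
buggy = history[:n_messages][::-1]

# Fixed order (as in this PR): reverse first, then take the first N.
fixed = history[::-1][:n_messages]

print(buggy)  # ['t4', 't3', 't2', 't1'] — "t5" is missing
print(fixed)  # ['t5', 't4', 't3', 't2'] — most recent entry kept
```

With five entries and a limit of four, only the reverse-then-slice order returns the most recent event first, matching the reproduction steps in the PR description.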
Contributor

@coderabbitai coderabbitai Bot Feb 26, 2026


⚠️ Potential issue | 🟠 Major

External-memory retrieval truncates before sender filtering (can drop valid recent matches).

In Line 90858 (retrieve_messages), the external-memory path applies stored[:n_messages] before sender_type filtering. With mixed sender history, this can return fewer messages than requested and skip newer matching entries. Also, aget_messages() should be normalized to [] defensively before slicing.

💡 Suggested fix
```diff
-            stored = await self.memory.aget_messages()
+            stored = await self.memory.aget_messages() or []
             # langchain memories are supposed to return messages in ascending order

             if order == "DESC":
                 stored = stored[::-1]  # Reverse first

-            if n_messages:
-                stored = stored[:n_messages]  # Then take first N messages
-
             stored = [Message.from_lc_message(m) for m in stored]
             if sender_type:
                 expected_type = MESSAGE_SENDER_AI if sender_type == MESSAGE_SENDER_AI else MESSAGE_SENDER_USER
                 stored = [m for m in stored if m.type == expected_type]
+            if n_messages:
+                stored = stored[:n_messages]  # Limit after filtering
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lfx/src/lfx/_assets/component_index.json` at line 90858, In
retrieve_messages: defensively normalize stored = await
self.memory.aget_messages() to stored = stored or [] before any slicing, apply
Message.from_lc_message conversion and sender_type filtering (use
MESSAGE_SENDER_AI / MESSAGE_SENDER_USER to map expected_type and filter on
m.type) before taking the first n_messages, and ensure the order reversal
(stored = stored[::-1]) happens prior to filtering/slicing so you return the
most recent matching messages; update the external-memory branch around
aget_messages, the stored list handling, and the slicing logic in
retrieve_messages accordingly.
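The slice-before-filter issue flagged in this comment can be reproduced the same way. A sketch using hypothetical `(sender, text)` tuples in place of the actual `Message` objects:

```python
# (sender, text) tuples stand in for messages, newest first (already reversed).
stored = [("AI", "a4"), ("User", "u3"), ("AI", "a3"), ("AI", "a2"), ("User", "u2")]
n_messages = 3

# Slice first, then filter by sender: can return fewer matches than requested.
sliced_then_filtered = [m for m in stored[:n_messages] if m[0] == "User"]

# Filter first, then slice (the suggested fix): up to N matching messages.
filtered_then_sliced = [m for m in stored if m[0] == "User"][:n_messages]

print(sliced_then_filtered)  # [('User', 'u3')] — older matches were cut before filtering
print(filtered_then_sliced)  # [('User', 'u3'), ('User', 'u2')]
```

Slicing before the sender filter caps the candidate pool at N mixed-sender messages, so the filtered result can silently come up short even when enough matching messages exist.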

druellan (Author)

This was in the original code, but it might be worth fixing in this PR, since it is part of the sorting code we are fixing. Do you agree, based on this new information?

coderabbitai (Contributor)

@druellan Yes, I agree it makes sense to include this fix in the current PR. Here's why:

  1. Same code area: The fix is in the exact same method (retrieve_messages) and code block (external memory path) that you're already modifying for the DESC sorting bug.

  2. Related functionality: Both issues affect message retrieval correctness. Fixing them together ensures the entire retrieval logic is correct and tested as a unit.

  3. Reduces future complexity: Addressing both issues now avoids having multiple PRs touching the same sensitive code area, which reduces merge conflicts and makes the fix history cleaner.

  4. Logical scope alignment: Since your PR is about "retrieving the correct messages," ensuring that sender filtering returns the requested number of matching messages (not just slicing first then filtering) falls within that scope.

The fix is small and well-contained, so including it won't significantly increase the PR's complexity. It's good practice to fix related correctness issues in the same area together.

@druellan druellan closed this by deleting the head repository Feb 27, 2026
@druellan (Author)

Closing to reopen a clean version of the patch at: #11941


Labels

community Pull Request from an external contributor
