Skip to content

feat: SQS System Attributes and Custom Attributes Propagation#3095

Merged
yhl25 merged 28 commits intomainfrom
dev-nightly-feat/sqs-headers
Jan 23, 2026
Merged

feat: SQS System Attributes and Custom Attributes Propagation#3095
yhl25 merged 28 commits intomainfrom
dev-nightly-feat/sqs-headers

Conversation

@cosmic-chichu
Copy link
Copy Markdown
Contributor

@cosmic-chichu cosmic-chichu commented Dec 8, 2025

Summary

Enables propagation of SQS attributes through Numaflow pipelines, supporting FIFO queue semantics (MessageGroupId, MessageDeduplicationId) and user-defined message attributes.

Changes

  • SQS Source: Attribute extraction
    Renamed SqsMessage fields to system_attributes (SQS system attrs like SentTimestamp, MessageGroupId) and custom_attributes (user-defined message attributes). System attributes become message headers; custom attributes are stored in metadata.user_metadata["sqs"].

  • SQS Sink: FIFO queue support
    Added headers field to SqsSinkMessage. The sink reads MessageGroupId, MessageDeduplicationId, and DelaySeconds from headers and applies them to the SQS SendMessageBatch request.

  • numaflow-core integration
    Source maps system_attributesMessage.headers and custom_attributesMessage.metadata. Sink merges headers with metadata.user_metadata["sqs"], allowing UDFs/transformers to set or override FIFO attributes.

  • Improved error messages
    Changed Error::Sqs to store extracted error details instead of opaque AWS SDK error. Errors now display as "ErrorCode: Error message" instead of "unhandled error (CODE)".

  • Documentation
    Added sink documentation (docs/user-guide/sinks/sqs.md), updated source docs with attribute configuration examples, and added SQS example to message headers concept page.

Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Dec 8, 2025

Codecov Report

❌ Patch coverage is 96.94915% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.19%. Comparing base (09d60db) to head (0c00955).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rust/extns/numaflow-sqs/src/sink.rs 95.00% 3 Missing ⚠️
rust/extns/numaflow-sqs/src/lib.rs 88.88% 2 Missing ⚠️
rust/extns/numaflow-sqs/src/source.rs 90.00% 2 Missing ⚠️
rust/numaflow-core/src/sinker/sink/sqs.rs 98.54% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3095      +/-   ##
==========================================
+ Coverage   80.14%   80.19%   +0.05%     
==========================================
  Files         295      295              
  Lines       66977    67224     +247     
==========================================
+ Hits        53679    53912     +233     
- Misses      12748    12762      +14     
  Partials      550      550              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
@cosmic-chichu cosmic-chichu marked this pull request as ready for review January 7, 2026 00:39
@cosmic-chichu cosmic-chichu changed the title feat: add control headers for sqs feat: SQS System Attributes and Custom Attributes Propagation Jan 7, 2026
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Comment thread docs/core-concepts/message-headers.md
Comment thread docs/user-guide/sinks/sqs.md
Comment thread rust/numaflow-core/src/sinker/sink/sqs.rs Outdated
Comment thread rust/extns/numaflow-sqs/src/sink.rs Outdated
Comment thread rust/extns/numaflow-sqs/src/source.rs Outdated
Comment thread rust/extns/numaflow-sqs/src/lib.rs Outdated
Comment thread docs/user-guide/sources/sqs.md Outdated
Comment thread docs/user-guide/sinks/sqs.md
Comment on lines -48 to +50
#[error("Failed with SQS error - {0}")]
Sqs(#[from] aws_sdk_sqs::Error),
#[error("{0}")]
Sqs(String),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use Sqs(String) for simpler error propagation since we don't branch on specific AWS error types—we just log and propagate them to the user. This is especially important with metadata propagation, where SQS errors can surface at runtime (not just during initialization), so clear, user-readable error messages are critical.

Comment thread rust/extns/numaflow-sqs/src/sink.rs Outdated
cosmic-chichu and others added 4 commits January 7, 2026 09:25
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
@cosmic-chichu cosmic-chichu requested a review from vigith January 21, 2026 23:00
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Comment thread rust/extns/numaflow-sqs/src/sink.rs Outdated
Comment thread rust/extns/numaflow-sqs/src/sink.rs Outdated
yhl25 and others added 4 commits January 21, 2026 15:45
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
@cosmic-chichu cosmic-chichu requested a review from yhl25 January 22, 2026 23:39
@yhl25 yhl25 enabled auto-merge (squash) January 23, 2026 01:48
@yhl25 yhl25 merged commit 0fd8e0e into main Jan 23, 2026
27 checks passed
@yhl25 yhl25 deleted the dev-nightly-feat/sqs-headers branch January 23, 2026 01:58
@cosmic-chichu cosmic-chichu restored the dev-nightly-feat/sqs-headers branch January 26, 2026 18:02
@cosmic-chichu cosmic-chichu deleted the dev-nightly-feat/sqs-headers branch January 26, 2026 18:02
@cosmic-chichu cosmic-chichu mentioned this pull request Feb 10, 2026
14 tasks
yhl25 added a commit that referenced this pull request Feb 11, 2026
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Yashash <yashashhl25@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants