feat: SQS System Attributes and Custom Attributes Propagation#3095
Merged
feat: SQS System Attributes and Custom Attributes Propagation#3095
Conversation
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3095 +/- ##
==========================================
+ Coverage 80.14% 80.19% +0.05%
==========================================
Files 295 295
Lines 66977 67224 +247
==========================================
+ Hits 53679 53912 +233
- Misses 12748 12762 +14
Partials 550 550 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
…aproj/numaflow into dev-nightly-feat/sqs-headers
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
…aproj/numaflow into dev-nightly-feat/sqs-headers
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
KeranYang
reviewed
Jan 7, 2026
vigith
reviewed
Jan 7, 2026
Comment on lines
-48
to
+50
| #[error("Failed with SQS error - {0}")] | ||
| Sqs(#[from] aws_sdk_sqs::Error), | ||
| #[error("{0}")] | ||
| Sqs(String), |
Contributor
Author
There was a problem hiding this comment.
We use Sqs(String) for simpler error propagation since we don't branch on specific AWS error types—we just log and propagate them to the user. This is especially important with metadata propagation, where SQS errors can surface at runtime (not just during initialization), so clear, user-readable error messages are critical.
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
…aproj/numaflow into dev-nightly-feat/sqs-headers
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
…aproj/numaflow into dev-nightly-feat/sqs-headers
yhl25
reviewed
Jan 21, 2026
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com>
vigith
approved these changes
Jan 23, 2026
yhl25
approved these changes
Jan 23, 2026
14 tasks
yhl25
added a commit
that referenced
this pull request
Feb 11, 2026
Signed-off-by: Shrivardhan Rao <shrivardhan92@gmail.com> Signed-off-by: Yashash <yashashhl25@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Yashash <yashashhl25@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Enables propagation of SQS attributes through Numaflow pipelines, supporting FIFO queue semantics (
MessageGroupId,MessageDeduplicationId) and user-defined message attributes.Changes
SQS Source: Attribute extraction
Renamed
SqsMessagefields tosystem_attributes(SQS system attrs likeSentTimestamp,MessageGroupId) andcustom_attributes(user-defined message attributes). System attributes become message headers; custom attributes are stored inmetadata.user_metadata["sqs"].SQS Sink: FIFO queue support
Added
headersfield toSqsSinkMessage. The sink readsMessageGroupId,MessageDeduplicationId, andDelaySecondsfrom headers and applies them to the SQSSendMessageBatchrequest.numaflow-core integration
Source maps
system_attributes→Message.headersandcustom_attributes→Message.metadata. Sink merges headers withmetadata.user_metadata["sqs"], allowing UDFs/transformers to set or override FIFO attributes.Improved error messages
Changed
Error::Sqsto store extracted error details instead of opaque AWS SDK error. Errors now display as"ErrorCode: Error message"instead of"unhandled error (CODE)".Documentation
Added sink documentation (
docs/user-guide/sinks/sqs.md), updated source docs with attribute configuration examples, and added SQS example to message headers concept page.