feat(log-ingestor): Use `S3ObjectMetadataInputConfig` for compression job submission (resolves #2018) #2087
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
Walkthrough

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Scanner as S3 Scanner
    participant SQS as SQS Listener
    participant Ingest as Ingestion Manager
    participant Listener as Compression Listener
    participant Buffer as Compression Buffer
    participant Submitter as CompressionJobSubmitter
    participant CLP as CLP API
    Scanner->>Ingest: discover objects (bucket, key, size)
    SQS->>Ingest: notify (bucket, key, size)
    Ingest->>Ingest: persist metadata -> assign id
    Ingest->>Listener: send Vec{CompressionBufferEntry(id, size)}
    Listener->>Buffer: forward entries
    Buffer->>Submitter: on threshold/timeout send &[S3ObjectMetadataId]
    Submitter->>CLP: submit_for_compression(io_config with s3_object_metadata_ids)
    CLP-->>Submitter: job response/status
    Submitter-->>Buffer: ack / completion
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
3c67276 to 66a97b6 (compare)
66a97b6 to b267b52 (compare)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/compression/compression_job_submitter.rs (1)
55-71: 🛠️ Refactor suggestion | 🟠 Major

Remove the second `ingestion_job_id` source of truth.

`ClpCompressionState` already carries the ingestion job ID, but this constructor accepts another value and writes that one into `S3ObjectMetadataInputConfig`. A mismatch here would update one job's DB rows while serializing another job ID into the compression config. Derive the field from `clp_compression_state.get_ingestion_job_id()` and drop the extra parameter.

Suggested fix
```diff
 pub fn new(
     clp_compression_state: ClpCompressionState,
     aws_authentication: AwsAuthentication,
     archive_output_config: &ArchiveOutput,
-    ingestion_job_config: &BaseConfig,
-    ingestion_job_id: IngestionJobId,
+    ingestion_job_config: &BaseConfig,
 ) -> Self {
+    let ingestion_job_id = clp_compression_state.get_ingestion_job_id();
     let s3_object_metadata_input_config = S3ObjectMetadataInputConfig {
         s3_config: S3Config {
             bucket: ingestion_job_config.bucket_name.clone(),
             region_code: ingestion_job_config.region.clone(),
             key_prefix: ingestion_job_config.key_prefix.clone(),
```

Then remove the extra argument at the call site in `components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/compression/compression_job_submitter.rs` around lines 55 - 71, The constructor for the compression job submitter accepts an extra ingestion_job_id that duplicates the ID already held in ClpCompressionState; remove the ingestion_job_id parameter from the constructor signature and inside new build S3ObjectMetadataInputConfig.ingestion_job_id by calling clp_compression_state.get_ingestion_job_id(), then update all call sites (the one in the CLP ingestion manager) to stop passing the extra ingestion_job_id argument. Ensure the S3ObjectMetadataInputConfig, ClpCompressionState::get_ingestion_job_id(), and the constructor/new method are adjusted consistently so only the ID from ClpCompressionState is used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/clp-rust-utils/tests/clp_config_test.rs`:
- Around line 24-30: Add a second test case in clp_config_test.rs that
constructs a ClpIoConfig using InputConfig::S3ObjectMetadataInputConfig with
s3_object_metadata_ids set to a small populated Some(vec![...]) (e.g.
Some(vec![1u64]) or appropriate ID type), serialize it the same way as the
existing test, and assert the serialized payload contains the populated array
value; update the corresponding assertion near the other case (also mirrored
around the other test block) so the test covers both None and Some(...)
scenarios for s3_object_metadata_ids to match real scheduler payloads.
In `@components/log-ingestor/src/compression/compression_job_submitter.rs`:
- Around line 35-42: In submit(&self, buffer: &[S3ObjectMetadataId]) validate
that buffer is not empty before spawning the background task: if
buffer.is_empty() then raise the same programming-error guard used elsewhere
(panic/assert with a clear message) so callers do not observe Ok(()) on an
invalid input; add this check at the start of submit and keep the existing
tokio::spawn(submit_clp_compression_job_and_wait_for_completion(...)) behavior
unchanged.
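A minimal sketch of that guard, with hypothetical stand-in types (the real `submit` spawns a background Tokio task, omitted here):

```rust
type S3ObjectMetadataId = u64;

// Hypothetical stand-in for CompressionJobSubmitter::submit: an empty buffer
// is a programming error, so fail fast with assert! instead of returning Ok.
fn submit(buffer: &[S3ObjectMetadataId]) -> Result<(), String> {
    assert!(
        !buffer.is_empty(),
        "submit() called with an empty metadata-ID buffer"
    );
    // ... the real method would spawn the background compression task here ...
    Ok(())
}

fn main() {
    assert!(submit(&[1, 2, 3]).is_ok());
    // submit(&[]) would panic with the message above.
}
```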
---
Outside diff comments:
In `@components/log-ingestor/src/compression/compression_job_submitter.rs`:
- Around line 55-71: The constructor for the compression job submitter accepts
an extra ingestion_job_id that duplicates the ID already held in
ClpCompressionState; remove the ingestion_job_id parameter from the constructor
signature and inside new build S3ObjectMetadataInputConfig.ingestion_job_id by
calling clp_compression_state.get_ingestion_job_id(), then update all call sites
(the one in the CLP ingestion manager) to stop passing the extra
ingestion_job_id argument. Ensure the S3ObjectMetadataInputConfig,
ClpCompressionState::get_ingestion_job_id(), and the constructor/new method are
adjusted consistently so only the ID from ClpCompressionState is used.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a40b36a7-a957-47b3-aa39-596ea1ca9822
📒 Files selected for processing (12)
components/clp-rust-utils/src/job_config/clp_io_config.rscomponents/clp-rust-utils/src/s3.rscomponents/clp-rust-utils/tests/clp_config_test.rscomponents/log-ingestor/src/compression/buffer.rscomponents/log-ingestor/src/compression/compression_job_submitter.rscomponents/log-ingestor/src/compression/listener.rscomponents/log-ingestor/src/ingestion_job.rscomponents/log-ingestor/src/ingestion_job/s3_scanner.rscomponents/log-ingestor/src/ingestion_job/sqs_listener.rscomponents/log-ingestor/src/ingestion_job_manager/clp_ingestion.rscomponents/log-ingestor/tests/test_compression_listener.rscomponents/log-ingestor/tests/test_ingestion_job.rs
💤 Files with no reviewable changes (4)
- components/clp-rust-utils/src/s3.rs
- components/log-ingestor/tests/test_ingestion_job.rs
- components/log-ingestor/src/ingestion_job/sqs_listener.rs
- components/log-ingestor/src/ingestion_job/s3_scanner.rs
…ssionJobSubmitter
S3ObjectMetadataInputConfig to compression scheduler (addresses #2018)
LinZhihao-723
left a comment
Reviewed:
- clp_io_config (clp-rust-utils)
- s3 (clp-rust-utils)
- clp_config_test (clp-rust-utils)
- buffer (log-ingestor)
- compression_job_submitter (log-ingestor)
```rust
/// Represents the unique identifier for an ingestion job in CLP DB.
pub type IngestionJobId = u64;
```
I don't think this ID should belong to this mod.
Let's do the following:
- Move and rename this ID to `clp_rust_utils::job_config::ingestion::JobId`
- In `log_ingestor::ingestion_job`, add `pub use clp_rust_utils::job_config::ingestion::JobId as IngestionJobId`
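A single-file sketch of the suggested move and re-export, with the crate layout flattened into modules purely for illustration:

```rust
// Crate layout flattened into modules for illustration only; the real code
// lives in the clp-rust-utils and log-ingestor crates.
mod clp_rust_utils {
    pub mod job_config {
        pub mod ingestion {
            /// Unique identifier for an ingestion job in the CLP DB.
            pub type JobId = u64;
        }
    }
}

mod log_ingestor {
    pub mod ingestion_job {
        // Re-export under the old name so existing call sites keep compiling.
        pub use crate::clp_rust_utils::job_config::ingestion::JobId as IngestionJobId;
    }
}

fn main() {
    let id: log_ingestor::ingestion_job::IngestionJobId = 42;
    assert_eq!(id, 42u64);
}
```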
```rust
pub s3_config: S3Config,
pub ingestion_job_id: IngestionJobId,
pub s3_object_metadata_ids: Option<Vec<S3ObjectMetadataId>>,
```
Why would we allow this field to be optional?
Notice that in the Python definition, this field is not nullable:
```rust
},
keys: None,
ingestion_job_id,
s3_object_metadata_ids: None,
```
Since we don't want to make this field optional, probably leave it as an empty vector.
```diff
 let compression_job_id = match state
-    .submit_for_compression(io_config_template, id_and_key_pairs)
+    .submit_for_compression(io_config_template, &object_metadata_ids)
```
I don't think we need to pass object metadata IDs as a reference: it's not efficient.
- They've already been copied from a slice into a vector when calling `submit_clp_compression_job_and_wait_for_completion`.
- The reference here is passed all the way down to `ClpCompressionState::submit_for_compression` to reconstruct a vector.
- Looks like you could just pass ownership all the way down to the compression state instead.
```diff
 let compression_job_id = match state
-    .submit_for_compression(io_config_template, id_and_key_pairs)
+    .submit_for_compression(io_config_template, &object_metadata_ids)
```
And in fact, why not construct the io config here (outside of the CLP compression state)?
Originally, we put the logic inside ClpCompressionState::submit_for_compression because the object metadata -> ID mapping behavior wasn't well-formed.
Now, I think we could create the io-config by embedding object_metadata_ids directly into the field (it can happen even before this function, so this call only takes io_config: ClpIoConfig and submits it).
And what we pass into `ClpCompressionState` will be a well-formed config in the first place, and it can decide how to handle different types of input configs. For example, if we need to add an Fs compression config, the same method should also be able to handle it.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/compression/buffer.rs`:
- Line 73: Guard the u64 addition of self.total_size += entry.size in the buffer
accumulation path (the Buffer struct where total_size is updated) by using
checked_add (or saturating_add) instead of unchecked addition; on overflow
handle it deterministically — e.g., set total_size to u64::MAX (or trigger an
immediate flush/schedule submission and log an error) so the threshold/flush
logic still fires reliably; update the code around the place that updates
self.total_size (referencing self.total_size and entry.size) to use
checked_add/saturating_add and add the overflow-handling branch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: cd7fe55a-929b-47e7-9499-b34240c41f55
📒 Files selected for processing (3)
components/clp-rust-utils/src/job_config/clp_io_config.rscomponents/clp-rust-utils/tests/clp_config_test.rscomponents/log-ingestor/src/compression/buffer.rs
```rust
    object_metadata_to_ingest: Vec<CompressionBufferEntry>,
) -> Result<()> {
    for entry in object_metadata_to_ingest {
        self.total_size += entry.size;
```
Guard total_size accumulation against overflow.
Line 73 uses unchecked u64 addition. In release builds, overflow silently wraps, breaking threshold-triggered submission logic. In debug builds, overflow panics. This causes incorrect buffer flushing behaviour and growth without proper scheduling.
Proposed fix

```diff
-use anyhow::Result;
+use anyhow::{anyhow, Result};
@@
-        self.total_size += entry.size;
+        self.total_size = self
+            .total_size
+            .checked_add(entry.size)
+            .ok_or_else(|| anyhow!("total_size overflow while buffering metadata IDs"))?;
```

📝 Committable suggestion
```rust
use anyhow::{anyhow, Result};
// ... (other code) ...
pub fn add(&mut self, entries: Vec<CompressionBufferEntry>) -> Result<()> {
    for entry in entries {
        self.total_size = self
            .total_size
            .checked_add(entry.size)
            .ok_or_else(|| anyhow!("total_size overflow while buffering metadata IDs"))?;
    }
    // ... (rest of function) ...
}
```
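The overflow behaviour described above can be seen in a small self-contained sketch (names here are illustrative stand-ins, not the PR's actual types):

```rust
// Release builds wrap u64 addition silently, so a threshold check such as
// `total_size >= threshold` can stop firing after an overflow. checked_add
// turns that silent wrap into an explicit error.
fn accumulate(total: u64, size: u64) -> Result<u64, String> {
    total
        .checked_add(size)
        .ok_or_else(|| "total_size overflow while buffering metadata IDs".to_string())
}

fn main() {
    assert_eq!(accumulate(100, 28), Ok(128));
    assert!(accumulate(u64::MAX, 1).is_err());
    // What unchecked release-mode addition would silently produce:
    assert_eq!(u64::MAX.wrapping_add(1), 0);
}
```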
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/log-ingestor/src/compression/buffer.rs` at line 73, Guard the u64
addition of self.total_size += entry.size in the buffer accumulation path (the
Buffer struct where total_size is updated) by using checked_add (or
saturating_add) instead of unchecked addition; on overflow handle it
deterministically — e.g., set total_size to u64::MAX (or trigger an immediate
flush/schedule submission and log an error) so the threshold/flush logic still
fires reliably; update the code around the place that updates self.total_size
(referencing self.total_size and entry.size) to use checked_add/saturating_add
and add the overflow-handling branch.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 890-902: The loop currently uses objects.chunks_mut(chunk_size)
and chunk.iter_mut() even though objects are not mutated; change
objects.chunks_mut(chunk_size) to objects.chunks(chunk_size) and
chunk.iter_mut() to chunk.iter() in the block that builds buffer_entries (the
loop that enumerates chunks and zips with last_inserted_ids), keeping the use of
last_inserted_ids.get(chunk_id).expect(...) and the CompressionBufferEntry{id:
next_metadata_id, size: object.size} construction intact before calling
self.sender.send(buffer_entries).await?.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5f9a06ce-133b-4997-b00e-0bb5e2bfa169
📒 Files selected for processing (6)
components/clp-rust-utils/src/job_config/clp_io_config.rscomponents/clp-rust-utils/src/job_config/ingestion.rscomponents/clp-rust-utils/tests/clp_config_test.rscomponents/log-ingestor/src/compression/compression_job_submitter.rscomponents/log-ingestor/src/ingestion_job.rscomponents/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)
1051-1086: ⚠️ Potential issue | 🟠 Major

Validate the new `ClpIoConfig` contract instead of panicking.

This public method now accepts any `ClpIoConfig`, but it only supports `S3ObjectMetadataInputConfig`. `unreachable!()` turns a caller mistake into a runtime panic, and an empty `s3_object_metadata_ids` list currently inserts a compression job row without linking any buffered metadata. Please reject invalid or empty inputs here, or narrow the parameter type, and update the doc comment to match the new contract.

Suggested direction
```diff
-        let object_metadata_ids: &[S3ObjectMetadataId] = match &io_config.input {
-            InputConfig::S3ObjectMetadataInputConfig { config } => &config.s3_object_metadata_ids,
-            InputConfig::S3InputConfig { .. } => {
-                unreachable!("compression submission only supports `S3ObjectMetadataInputConfig`")
-            }
-        };
+        let object_metadata_ids: &[S3ObjectMetadataId] = match &io_config.input {
+            InputConfig::S3ObjectMetadataInputConfig { config }
+                if !config.s3_object_metadata_ids.is_empty() =>
+            {
+                &config.s3_object_metadata_ids
+            }
+            InputConfig::S3ObjectMetadataInputConfig { .. } => {
+                return Err(anyhow::anyhow!(
+                    "Compression submission requires at least one S3 object metadata ID."
+                ));
+            }
+            InputConfig::S3InputConfig { .. } => {
+                return Err(anyhow::anyhow!(
+                    "Compression submission requires `S3ObjectMetadataInputConfig`."
+                ));
+            }
+        };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around lines 1051 - 1086, The submit_for_compression method currently panics via unreachable!() for non-S3ObjectMetadataInputConfig and allows empty s3_object_metadata_ids; change it to validate the ClpIoConfig contract: in submit_for_compression detect if io_config.input is not InputConfig::S3ObjectMetadataInputConfig and return an anyhow::Error (or Result::Err) indicating unsupported input type, and if it is S3ObjectMetadataInputConfig but config.s3_object_metadata_ids is empty return an error indicating empty input; update the function doc comment to state it only accepts S3ObjectMetadataInputConfig with a non-empty s3_object_metadata_ids list instead of claiming it panics. Ensure you reference the symbols submit_for_compression, ClpIoConfig, InputConfig::S3ObjectMetadataInputConfig, and s3_object_metadata_ids when making the changes.
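A self-contained sketch of the validate-instead-of-panic pattern suggested above (the enum and function are simplified stand-ins, not the PR's real types):

```rust
type S3ObjectMetadataId = u64;

// Simplified stand-ins for the PR's input-config variants.
enum InputConfig {
    S3ObjectMetadata { ids: Vec<S3ObjectMetadataId> },
    S3,
}

// Reject unsupported or empty inputs with an error instead of unreachable!().
fn object_metadata_ids(input: &InputConfig) -> Result<&[S3ObjectMetadataId], String> {
    match input {
        InputConfig::S3ObjectMetadata { ids } if !ids.is_empty() => Ok(ids),
        InputConfig::S3ObjectMetadata { .. } => {
            Err("compression submission requires at least one S3 object metadata ID".into())
        }
        InputConfig::S3 => {
            Err("compression submission requires `S3ObjectMetadataInputConfig`".into())
        }
    }
}

fn main() {
    let good = InputConfig::S3ObjectMetadata { ids: vec![7, 8] };
    assert_eq!(object_metadata_ids(&good), Ok(&[7u64, 8][..]));
    assert!(object_metadata_ids(&InputConfig::S3ObjectMetadata { ids: vec![] }).is_err());
    assert!(object_metadata_ids(&InputConfig::S3).is_err());
}
```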
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 1051-1086: The submit_for_compression method currently panics via
unreachable!() for non-S3ObjectMetadataInputConfig and allows empty
s3_object_metadata_ids; change it to validate the ClpIoConfig contract: in
submit_for_compression detect if io_config.input is not
InputConfig::S3ObjectMetadataInputConfig and return an anyhow::Error (or
Result::Err) indicating unsupported input type, and if it is
S3ObjectMetadataInputConfig but config.s3_object_metadata_ids is empty return an
error indicating empty input; update the function doc comment to state it only
accepts S3ObjectMetadataInputConfig with a non-empty s3_object_metadata_ids list
instead of claiming it panics. Ensure you reference the symbols
submit_for_compression, ClpIoConfig, InputConfig::S3ObjectMetadataInputConfig,
and s3_object_metadata_ids when making the changes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0f9d7ecd-bb0a-485f-8b4e-0536f97a0f4d
📒 Files selected for processing (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
```diff
-    for (chunk_id, chunk) in objects.chunks_mut(chunk_size).enumerate() {
+    let mut buffer_entries = Vec::with_capacity(objects.len());
+    for (chunk_id, chunk) in objects.chunks(chunk_size).enumerate() {
         for (next_metadata_id, object) in
-            (*last_inserted_ids.get(chunk_id).expect("invalid chunk ID")..)
-                .zip(chunk.iter_mut())
+            (*last_inserted_ids.get(chunk_id).expect("invalid chunk ID")..).zip(chunk.iter())
         {
-            object.id = Some(next_metadata_id);
+            buffer_entries.push(CompressionBufferEntry {
+                id: next_metadata_id,
+                size: object.size,
+            });
         }
     }
-    self.sender.send(objects).await?;
+    self.sender.send(buffer_entries).await?;
```
This piece of code doesn't make sense anymore with the changes in this PR:
- We had to do it this way because previously, if we let `ingest_s3_object_metadata` reconstruct a vector of object metadata, it would have been too inefficient.
- However, for now it makes more sense in terms of efficiency and ownership to let `ingest_s3_object_metadata` take ownership of `object` and return a vector of `CompressionBufferEntry`. There's no need to know the chunk size outside of `ingest_s3_object_metadata` anymore.
```diff
-/// Represents a listener task that buffers and submits S3 object metadata.
+/// Represents a listener task that buffers incoming [`CompressionBufferEntry`] values and submits
+/// when full or on timeout.
```

Suggested change:

```diff
-/// when full or on timeout.
+/// when a certain size threshold is reached or on timeout.
```
```diff
-    // Spawn three tasks that send into the listener concurrently:
-    // 1) send all of `bucket_0_objects`
-    // 2) send the first 100 objects of `bucket_1_objects`
-    // 3) send the last 100 objects of `bucket_1_objects`
+    let entries1 = create_test_buffer_entries(1, 100);
+    let entries2 = create_test_buffer_entries(101, 100);
+    let entries3 = create_test_buffer_entries(201, 100);
+
+    // Spawn three tasks that send into the listener concurrently
     let sender1 = listener.get_new_sender();
     let sender2 = listener.get_new_sender();
     let sender3 = listener.get_new_sender();

-    let objs1 = bucket_0_objects.clone();
-    let objs2 = bucket_1_objects[..100].to_vec();
-    let objs3 = bucket_1_objects[100..].to_vec();
-
-    let h1 = tokio::spawn(async move { send_to_listener(objs1, sender1).await });
-    let h2 = tokio::spawn(async move { send_to_listener(objs2, sender2).await });
-    let h3 = tokio::spawn(async move { send_to_listener(objs3, sender3).await });
+    let h1 = tokio::spawn(async move { send_to_listener(entries1, sender1).await });
+    let h2 = tokio::spawn(async move { send_to_listener(entries2, sender2).await });
+    let h3 = tokio::spawn(async move { send_to_listener(entries3, sender3).await });
```
Might be a better way to refactor this:

```rust
let expected_ids: Vec<S3ObjectMetadataId> = (1..=300).collect();
let mut handlers = Vec::new();
for ids in expected_ids.chunks(100) {
    let entries = create_test_buffer_entries(ids);
    let sender = listener.get_new_sender();
    let h = tokio::spawn(async move { send_to_listener(entries, sender).await });
    handlers.push(h);
}
// Wait for all sender tasks to finish
for handler in handlers {
    handler.await.expect("sender task panicked");
}
```
- We move `expected_ids` to the front.
- We change `create_test_buffer_entries` to the following:

```rust
fn create_test_buffer_entries(ids: &[S3ObjectMetadataId]) -> Vec<CompressionBufferEntry> {
    ids.iter()
        .map(|id| CompressionBufferEntry {
            id: *id,
            size: TEST_OBJECT_SIZE,
        })
        .collect()
}
```
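Under those assumptions, the refactored helper and the chunked driver can be exercised in a self-contained sketch (`TEST_OBJECT_SIZE` and the types are hypothetical stand-ins for the test file's definitions):

```rust
type S3ObjectMetadataId = u64;

// Hypothetical stand-ins for the test file's types and constants.
const TEST_OBJECT_SIZE: u64 = 1;

#[derive(Debug, Clone, PartialEq)]
struct CompressionBufferEntry {
    id: S3ObjectMetadataId,
    size: u64,
}

fn create_test_buffer_entries(ids: &[S3ObjectMetadataId]) -> Vec<CompressionBufferEntry> {
    ids.iter()
        .map(|id| CompressionBufferEntry {
            id: *id,
            size: TEST_OBJECT_SIZE,
        })
        .collect()
}

fn main() {
    // Mirrors the suggested test shape: 300 IDs sent in chunks of 100.
    let expected_ids: Vec<S3ObjectMetadataId> = (1..=300).collect();
    let mut seen = Vec::new();
    for ids in expected_ids.chunks(100) {
        seen.extend(create_test_buffer_entries(ids).into_iter().map(|e| e.id));
    }
    assert_eq!(seen, expected_ids);
}
```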
This file needs a bit more refactoring (I should have said this in the original problem statement for #2018, but I didn't make it clear). These test cases were written this way because we used to have no "state" concept for an ingestion job. When they were written, the ingestion flow was: ingestion jobs -> channel senders. Later, when we added the state, we didn't update these test cases in time, so the flow now looks like: ingestion jobs -> state -> channel senders, yet we still do all the checks at the channel receiver end. I think this is inefficient and confusing, and we should change it to directly use the state as a sort of shared buffer for checking the ingested files. A possible implementation would be to implement SqsListenerState and S3ScannerState in this test file to hold a shared buffer protected by a mutex, and use this version of the state implementation to replace the current use of ZeroFaultToleranceIngestionJobState.

That said, I do realize this is out of scope for the current PR. But I think we should do this ASAP before we all forget about it, lol. Can you do a follow-up PR after this one to fix it?
@coderabbitai Can you create an issue to track this?
@LinZhihao-723, I've created issue #2114 to track the refactoring of test_ingestion_job.rs to use SqsListenerState and S3ScannerState with mutex-protected shared buffers instead of ZeroFaultToleranceIngestionJobState. It's been assigned to you.
🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2014
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:173-176
Timestamp: 2026-02-23T19:05:25.285Z
Learning: In `components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`, the `ingest_s3_object_metadata` method intentionally uses `assert!(!objects.is_empty())` to panic on empty input, as empty objects is considered a programming error that should never occur in practice.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2035
File: components/log-ingestor/src/ingestion_job_manager.rs:174-223
Timestamp: 2026-02-26T05:56:57.349Z
Learning: In components/log-ingestor/src/ingestion_job_manager.rs, the shutdown_and_remove_job_instance method is intentionally scoped to focus on removing the job instance and shutting it down, returning promptly once removal is complete. It does not include detailed status or failure messages in its return value, as status querying should be done separately if needed by callers.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2038
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:220-272
Timestamp: 2026-02-26T03:05:57.890Z
Learning: In components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs, the update_job_status method intentionally preserves the original status_msg when handling idempotent calls (when the job is already in the target status). This design ensures that only the first error message is stored, capturing the root cause rather than subsequent failures. This behavior is documented in the fail method's doc comment.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/s3_scanner.rs:132-148
Timestamp: 2026-02-17T03:03:57.933Z
Learning: In the log-ingestor's S3 scanner implementation, it is acceptable to call `state.ingest()` and update `start_after` before awaiting `sender.send()` in the ingestion flow. This ordering (where state is committed before channel send completes) is consistent with the system's failure model.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 939
File: components/package-template/src/etc/clp-config.yml:64-64
Timestamp: 2025-06-24T20:13:46.758Z
Learning: When users ask CodeRabbit to create an issue after providing suggestions, they want a GitHub issue created with the high-level requirements and context, not specific code implementations.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR `#1152` discussion.
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR `#1125` development.
Learnt from: SharafMohamed
Repo: y-scope/clp PR: 2080
File: components/core/src/clp/Profiler.hpp:95-113
Timestamp: 2026-03-07T10:51:24.315Z
Learning: In y-scope/clp (PR `#2080`), `Profiler::m_runtime_measurements` and all `PROFILE_SCOPE`/`ProfilerReporter` usages are strictly single-threaded. `FileCompressor::compress_file` is called in a sequential for-loop with no thread pool. Do not flag thread-safety issues on `Profiler` runtime measurement APIs unless new multi-threaded callers are introduced.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
LinZhihao-723
left a comment
Will do some package-level testing before approving.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
LinZhihao-723
left a comment
Verified that for both the S3 scanner and the SQS listener, the ingested objects can be submitted for compression, and the compression results will be reflected properly after the job finishes.
However, there's an issue in the WebUI where the compression jobs are not showing properly. This is because the new S3 object metadata compression job config is not added into the WebUI component. Already located the root cause and had a quick fix with @junhaoliao. Need to decide whether we want to submit WebUI changes as a separate PR or as a part of this PR.
LinZhihao-723
left a comment
Pushed WebUI changes directly. @junhaoliao Can you help check?
```
});

/**
 * Matching `S3ObjectMetadataInputConfig` in `job_orchestration.scheduler.job_config`.
```
(just a note for awareness - no code changes are needed in this PR)

The "matching" comment is misleading because what we have here is only a subset of the `job_orchestration.scheduler.job_config` fields. That said, I do realize that has been a preexisting issue for the other types as well.

Currently this Type object is only used to derive a TypeScript type for type hints, so we shouldn't run into any issues. However, once we add the input form for S3, the other type objects will be used to validate inputs, and validation errors will be raised on additional fields that are not listed. Anyway, `ClpIoS3ObjectMetadataInputConfig` is an internal input config and will not be used in any input-form validation in the foreseeable future, so no code changes are needed.
I agree. We should probably create a minimum schema that contains the fields needed.
Shall we ask rabbit to create an issue to track this renaming?
LinZhihao-723
left a comment
For the PR title, how about:
feat(log-ingestor): Use `S3ObjectMetadataInputConfig` for compression job submission (resolves #2018).
Description

This PR builds off of #2082 and completes the proposal in #2018.

In #2082, we added support for a new input that contains the `ingestor_job_id` & `metadata_ids` so the scheduler can directly query the `ingested_s3_object_metadata` table for the metadata. This input was unused when #2082 was merged since the `log-ingestor` was still submitting `S3InputConfig`.

This PR updates the `log-ingestor` to submit the new `S3ObjectMetadataInputConfig`. To achieve this, this PR:

- Adds `CompressionBufferEntry` to associate object size with `metadata_id`, and updates the interfaces of the listener and submitter in the `log-ingestor` to use this type.

Checklist
breaking change.
Validation performed
`task tests:rust-all`

Testing
Results
The following performance changes are not improvements gained from this PR; they come from #2082. I'm just documenting them here because testing is a little easier.
Before

`scheduler_discovery_to_scheduling_seconds`: 0.7837305455

After

`scheduler_discovery_to_scheduling_seconds`: 0.321161

The drop in `scheduler_discovery_to_scheduling_seconds` makes sense; we no longer make a trip to S3 per job.

The increase in full compression duration (s) is strange but certainly not a result of this PR. This metric comes from the `duration` column in `compression_jobs`.

Methodology
All testing used the `elasticsearch` dataset.

In `components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:

On line 311:

On line 418:

These lines were added since our main focus is measuring the time from when the compression job starts to when all of the file metadata is resolved.
Setup
Prereqs: To follow the commands below, you MUST have `minio` downloaded & have >90 GB of free space.

- Create a `test-data` folder.
- Inside it, create `elasticsearch/`, then duplicate the dataset: `for i in $(seq 1 10); do cp -r elasticsearch.log "elasticsearch${i}.log"; done`
- Make sure the dataset is named `elasticsearch.log` and is a folder (this should be correct if you uploaded through the webui).

If you haven't done this already:
```shell
# in a separate terminal, in the parent of the test-data folder
minio server ./test-data
```

Verifying
Create an S3 Scanner job (replace `<minio-host>`):

Verify it worked:
Summary by CodeRabbit
New Features
Refactor
Tests