
feat(log-ingestor): Use S3ObjectMetadataInputConfig for compression job submission (resolves #2018). #2087

Merged
jonathan-imanu merged 19 commits into y-scope:main from jonathan-imanu:log_ingestor_eliminate_s3_trip on Mar 27, 2026
Merged

feat(log-ingestor): Use S3ObjectMetadataInputConfig for compression job submission (resolves #2018).#2087
jonathan-imanu merged 19 commits intoy-scope:mainfrom
jonathan-imanu:log_ingestor_eliminate_s3_trip

Conversation

@jonathan-imanu
Contributor

@jonathan-imanu jonathan-imanu commented Mar 9, 2026

Description

This PR builds off of #2082 and completes the proposal in #2018.

In #2082, we added support for a new input config containing the ingestion_job_id and metadata_ids so the scheduler can query the ingested_s3_object_metadata table for the metadata directly. That input was unused when #2082 was merged, since the log-ingestor was still submitting S3InputConfig.

This PR updates the log-ingestor to submit the new S3ObjectMetadataInputConfig. To do so, it:

  • updates the buffer to hold a list of metadata IDs rather than complete metadata objects
  • introduces a new type, CompressionBufferEntry, to associate an object's size with its metadata_id, and updates the listener and submitter interfaces to use this type
  • updates the log-ingestor's unit tests
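For context, the new entry and buffer shapes can be sketched roughly as follows. This is a simplified, hypothetical sketch; the real definitions live in components/log-ingestor/src/compression/buffer.rs and use the crate's own ID types and error handling:

```rust
// Hypothetical sketch of the new buffer shape; details may differ from the
// actual implementation in components/log-ingestor/src/compression/buffer.rs.
type S3ObjectMetadataId = u64;

/// Associates an ingested object's size with its metadata row ID.
#[derive(Debug, Clone, Copy)]
pub struct CompressionBufferEntry {
    pub id: S3ObjectMetadataId,
    pub size: u64,
}

/// Buffers only metadata IDs; sizes are folded into a running total that the
/// flush threshold can be checked against.
#[derive(Default)]
pub struct Buffer {
    pub metadata_ids: Vec<S3ObjectMetadataId>,
    pub total_size: u64,
}

impl Buffer {
    pub fn add(&mut self, entries: Vec<CompressionBufferEntry>) {
        for entry in entries {
            // Track cumulative size while storing only the IDs.
            self.total_size = self.total_size.saturating_add(entry.size);
            self.metadata_ids.push(entry.id);
        }
    }
}

fn main() {
    let mut buf = Buffer::default();
    buf.add(vec![
        CompressionBufferEntry { id: 1, size: 100 },
        CompressionBufferEntry { id: 2, size: 250 },
    ]);
    // The submitter later receives only the metadata IDs, not full objects.
    println!("ids={:?} total_size={}", buf.metadata_ids, buf.total_size);
}
```

The point of the change is visible in the struct: the buffer no longer holds full ObjectMetadata records, so the scheduler can resolve them from the DB by ID without a round trip to S3.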

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and it has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • task tests:rust-all
  • Workflows
  • Verification of full flow & Compression Scheduler perf. comparisons against mainline (see below).

Testing

Results

The following performance changes are not gains from this PR itself; they come from #2082. I'm documenting them here because testing is easier on this branch.

Before

| job_id | scheduler_discovery_to_scheduling_seconds | full compression duration (s) |
|--------|-------------------------------------------|-------------------------------|
| 1 | 0.742398 | 155.92 |
| 2 | 0.275263 | 165.578 |
| 3 | 0.849841 | 166.084 |
| 4 | 0.258035 | 127.227 |
| 5 | 1.874552 | 212.654 |
| 6 | 1.511318 | 112.755 |
| 7 | 0.746475 | 112.168 |
| 8 | 0.358103 | 112.835 |
| 9 | 0.783230 | 205.532 |
| 10 | 0.774677 | 205.212 |
| 11 | 0.447144 | 236.192 |
  • Average scheduler_discovery_to_scheduling_seconds: 0.7837305455
  • Average full compression duration (seconds): 164.7415454545

After

| job_id | scheduler_discovery_to_scheduling_seconds | full compression duration (s) |
|--------|-------------------------------------------|-------------------------------|
| 1 | 0.600464 | 118.73 |
| 2 | 0.083286 | 119.076 |
| 3 | 0.066629 | 165.608 |
| 4 | 0.141759 | 122.536 |
| 5 | 0.108384 | 241.382 |
| 6 | 0.183196 | 125.094 |
| 7 | 0.449663 | 265.933 |
| 8 | 1.558069 | 345.997 |
| 9 | 0.091492 | 425.42 |
| 10 | 0.074876 | 438.922 |
| 11 | 0.174953 | 421.748 |
  • Average scheduler_discovery_to_scheduling_seconds: 0.321161
  • Average full compression duration (seconds): 253.6769090909

The drop in scheduler_discovery_to_scheduling_seconds makes sense: we no longer make a trip to S3 per job.
The increase in full compression duration (s) is strange, but it is certainly not a result of this PR. This metric comes from the duration column in compression_jobs.
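As a sanity check, the reported averages can be recomputed from the two tables (values copied verbatim):

```rust
// Recompute the averages of scheduler_discovery_to_scheduling_seconds
// from the before/after tables above.
fn main() {
    let before = [
        0.742398, 0.275263, 0.849841, 0.258035, 1.874552, 1.511318, 0.746475,
        0.358103, 0.783230, 0.774677, 0.447144,
    ];
    let after = [
        0.600464, 0.083286, 0.066629, 0.141759, 0.108384, 0.183196, 0.449663,
        1.558069, 0.091492, 0.074876, 0.174953,
    ];
    let avg = |xs: &[f64]| xs.iter().sum::<f64>() / xs.len() as f64;
    // Should match the quoted 0.7837305455 and 0.321161 — roughly a 2.4x
    // reduction in scheduling latency.
    println!("before avg = {:.10}", avg(&before));
    println!("after avg  = {:.6}", avg(&after));
}
```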

Methodology

  1. 11 copies of the elasticsearch structured dataset were loaded into a MinIO bucket called elasticsearch.
  2. On both branches, I added the following lines to components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:

On line 311:

t0 = time.perf_counter()

On line 418:

duration_sec = time.perf_counter() - t0
logger.info(
    "scheduler_discovery_to_scheduling_seconds=%.6f job_id=%s input_type=%s",
    duration_sec,
    job_id,
    input_config.type,
)

These lines were added because our main focus is measuring the time from when the compression job starts to when all file metadata is resolved.

Setup

Prereqs: To follow the commands below, you MUST have MinIO installed and >90 GB of free space.

  • Download the elasticsearch structured dataset
  • Create a folder called test-data
  • Using the MinIO web UI, create a bucket called elasticsearch/
  • Put the uncompressed dataset in this bucket
  • Make copies:
    • for i in $(seq 1 10); do cp -r elasticsearch.log "elasticsearch${i}.log"; done
    • The command above assumes your dataset is also called elasticsearch.log and is a folder (this should be the case if you uploaded through the web UI).

If you haven't done this already:

# in a separate terminal in the parent of the test-data folder
minio server ./test-data
  • Create a CLP config that enables S3, sets up the log-ingestor, and has your credentials updated with the MinIO creds!

Verifying

task
# IMPORTANT: update your credentials and CLP config with the MinIO creds,
# and make sure the log-ingestor is enabled!
build/clp-package/sbin/start-clp.sh
# alternatively:
# python -m clp_package_utils.scripts.start_clp --config "PATH TO YOUR CONFIG"

Create a S3 Scanner job (replace <minio-host>):

curl -X POST "http://127.0.0.1:3002/s3_scanner" \
  -H "Content-Type: application/json" \
  -d  '{"bucket_name":"elasticsearch","key_prefix":"elasticsearch","endpoint_url":"http://<minio-host>:9000","scanning_interval_sec":30}'

Verify it worked:

# see table entry
docker exec -it $(docker ps --filter name=database --format '{{.Names}}' | head -1) \
  mysql -u clp-user -p clp-db -e "SELECT id, status, status_msg, creation_time, start_time, update_time, duration, num_tasks, num_tasks_completed FROM compression_jobs;"

# see worker logs
docker logs $(docker ps -a --filter name=compression-scheduler --format '{{.Names}}' | head -1) 2>&1 | tail -80

Summary by CodeRabbit

  • New Features

    • Added a structured S3 object‑metadata input config and a public ingestion‑job ID type.
  • Refactor

    • Compression/ingestion now use lightweight buffer entries (id+size) and metadata IDs instead of full object records; submission APIs updated accordingly.
  • Tests

    • Updated tests to use compression buffer entries and metadata‑ID workflows, and adjusted expected serialized payloads.

@coderabbitai
Contributor

coderabbitai Bot commented Mar 9, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Added an s3_object_metadata InputConfig and S3ObjectMetadataInputConfig; removed id from ObjectMetadata; introduced CompressionBufferEntry { id, size }; refactored buffer/listener/submitter/ingestion/CLP flows and tests to buffer and submit S3ObjectMetadataId-based payloads.

Changes

  • CLP utils / core types — components/clp-rust-utils/src/job_config/clp_io_config.rs, components/clp-rust-utils/src/job_config/ingestion.rs, components/clp-rust-utils/src/s3.rs
    Added S3ObjectMetadataInputConfig and the new serde InputConfig::S3ObjectMetadataInputConfig variant (s3_object_metadata); added pub type JobId = u64 and re-exported it as IngestionJobId; removed id: Option<S3ObjectMetadataId> from ObjectMetadata.
  • Compression buffer & submitter — components/log-ingestor/src/compression/buffer.rs, components/log-ingestor/src/compression/compression_job_submitter.rs
    Introduced CompressionBufferEntry { id, size }; Buffer now stores Vec<S3ObjectMetadataId> and add accepts entries; BufferSubmitter::submit now takes &[S3ObjectMetadataId]; CLP submission uses ClpIoConfig with s3_object_metadata_ids populated.
  • Listener & channel API — components/log-ingestor/src/compression/listener.rs
    Changed listener channel payloads and public APIs from Vec<ObjectMetadata> to Vec<CompressionBufferEntry>; updated spawn/get_new_sender signatures and docs to use buffer entries.
  • Ingestion & managers — components/log-ingestor/src/ingestion_job.rs, components/log-ingestor/src/ingestion_job/s3_scanner.rs, components/log-ingestor/src/ingestion_job/sqs_listener.rs, components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
    Re-exported IngestionJobId from clp-rust-utils; removed explicit construction of id: None for ObjectMetadata; ingestion persists metadata then sends CompressionBufferEntry(id, size); ClpCompressionState::submit_for_compression now accepts a full ClpIoConfig and uses s3_object_metadata_ids.
  • Tests — components/clp-rust-utils/tests/clp_config_test.rs, components/log-ingestor/tests/test_compression_listener.rs, components/log-ingestor/tests/test_ingestion_job.rs
    Updated fixtures, expectations, and helpers to produce/use CompressionBufferEntry and S3ObjectMetadataId; updated BufferSubmitter test mocks and assertions to verify submitted ID lists; removed explicit id: None in a couple of tests.

Sequence Diagram(s)

sequenceDiagram
    participant Scanner as S3 Scanner
    participant SQS as SQS Listener
    participant Ingest as Ingestion Manager
    participant Listener as Compression Listener
    participant Buffer as Compression Buffer
    participant Submitter as CompressionJobSubmitter
    participant CLP as CLP API

    Scanner->>Ingest: discover objects (bucket,key,size)
    SQS->>Ingest: notify (bucket,key,size)
    Ingest->>Ingest: persist metadata -> assign id
    Ingest->>Listener: send Vec{CompressionBufferEntry(id,size)}
    Listener->>Buffer: forward entries
    Buffer->>Submitter: on threshold/timeout send &[S3ObjectMetadataId]
    Submitter->>CLP: submit_for_compression(io_config with s3_object_metadata_ids)
    CLP-->>Submitter: job response/status
    Submitter-->>Buffer: ack / completion

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • hoophalab
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
| Title check | ✅ Passed | The title clearly and specifically summarizes the main change: updating the log-ingestor to use S3ObjectMetadataInputConfig for compression job submission, which aligns with the primary objective of the changeset. |


@jonathan-imanu changed the title from "feat(log-ingestor): Submit compression jobs by metadata IDs" to "feat(log-ingestor): Submit compression jobs by metadata IDs (addresses #2018)" on Mar 9, 2026
@jonathan-imanu force-pushed the log_ingestor_eliminate_s3_trip branch from 3c67276 to 66a97b6 on March 15, 2026 01:37
@jonathan-imanu force-pushed the log_ingestor_eliminate_s3_trip branch from 66a97b6 to b267b52 on March 17, 2026 03:12
@jonathan-imanu changed the title from "feat(log-ingestor): Submit compression jobs by metadata IDs (addresses #2018)" to "feat(log-ingestor): Submit S3ObjectMetadataInputConfig to compression scheduler (addresses #2018)" on Mar 17, 2026
@jonathan-imanu jonathan-imanu marked this pull request as ready for review March 17, 2026 05:31
@jonathan-imanu jonathan-imanu requested a review from a team as a code owner March 17, 2026 05:31
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/compression/compression_job_submitter.rs (1)

55-71: 🛠️ Refactor suggestion | 🟠 Major

Remove the second ingestion_job_id source of truth.

ClpCompressionState already carries the ingestion job ID, but this constructor accepts another value and writes that one into S3ObjectMetadataInputConfig. A mismatch here would update one job's DB rows while serializing another job ID into the compression config. Derive the field from clp_compression_state.get_ingestion_job_id() and drop the extra parameter.

Suggested fix
 pub fn new(
     clp_compression_state: ClpCompressionState,
     aws_authentication: AwsAuthentication,
     archive_output_config: &ArchiveOutput,
-    ingestion_job_config: &BaseConfig,
-    ingestion_job_id: IngestionJobId,
+    ingestion_job_config: &BaseConfig,
 ) -> Self {
+    let ingestion_job_id = clp_compression_state.get_ingestion_job_id();
     let s3_object_metadata_input_config = S3ObjectMetadataInputConfig {
         s3_config: S3Config {
             bucket: ingestion_job_config.bucket_name.clone(),
             region_code: ingestion_job_config.region.clone(),
             key_prefix: ingestion_job_config.key_prefix.clone(),

Then remove the extra argument at the call site in components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/compression/compression_job_submitter.rs` around
lines 55 - 71, The constructor for the compression job submitter accepts an
extra ingestion_job_id that duplicates the ID already held in
ClpCompressionState; remove the ingestion_job_id parameter from the constructor
signature and inside new build S3ObjectMetadataInputConfig.ingestion_job_id by
calling clp_compression_state.get_ingestion_job_id(), then update all call sites
(the one in the CLP ingestion manager) to stop passing the extra
ingestion_job_id argument. Ensure the S3ObjectMetadataInputConfig,
ClpCompressionState::get_ingestion_job_id(), and the constructor/new method are
adjusted consistently so only the ID from ClpCompressionState is used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/clp-rust-utils/tests/clp_config_test.rs`:
- Around line 24-30: Add a second test case in clp_config_test.rs that
constructs a ClpIoConfig using InputConfig::S3ObjectMetadataInputConfig with
s3_object_metadata_ids set to a small populated Some(vec![...]) (e.g.
Some(vec![1u64]) or appropriate ID type), serialize it the same way as the
existing test, and assert the serialized payload contains the populated array
value; update the corresponding assertion near the other case (also mirrored
around the other test block) so the test covers both None and Some(...)
scenarios for s3_object_metadata_ids to match real scheduler payloads.

In `@components/log-ingestor/src/compression/compression_job_submitter.rs`:
- Around line 35-42: In submit(&self, buffer: &[S3ObjectMetadataId]) validate
that buffer is not empty before spawning the background task: if
buffer.is_empty() then raise the same programming-error guard used elsewhere
(panic/assert with a clear message) so callers do not observe Ok(()) on an
invalid input; add this check at the start of submit and keep the existing
tokio::spawn(submit_clp_compression_job_and_wait_for_completion(...)) behavior
unchanged.

---

Outside diff comments:
In `@components/log-ingestor/src/compression/compression_job_submitter.rs`:
- Around line 55-71: The constructor for the compression job submitter accepts
an extra ingestion_job_id that duplicates the ID already held in
ClpCompressionState; remove the ingestion_job_id parameter from the constructor
signature and inside new build S3ObjectMetadataInputConfig.ingestion_job_id by
calling clp_compression_state.get_ingestion_job_id(), then update all call sites
(the one in the CLP ingestion manager) to stop passing the extra
ingestion_job_id argument. Ensure the S3ObjectMetadataInputConfig,
ClpCompressionState::get_ingestion_job_id(), and the constructor/new method are
adjusted consistently so only the ID from ClpCompressionState is used.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a40b36a7-a957-47b3-aa39-596ea1ca9822

📥 Commits

Reviewing files that changed from the base of the PR and between 3c51803 and e124836.

📒 Files selected for processing (12)
  • components/clp-rust-utils/src/job_config/clp_io_config.rs
  • components/clp-rust-utils/src/s3.rs
  • components/clp-rust-utils/tests/clp_config_test.rs
  • components/log-ingestor/src/compression/buffer.rs
  • components/log-ingestor/src/compression/compression_job_submitter.rs
  • components/log-ingestor/src/compression/listener.rs
  • components/log-ingestor/src/ingestion_job.rs
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
  • components/log-ingestor/tests/test_compression_listener.rs
  • components/log-ingestor/tests/test_ingestion_job.rs
💤 Files with no reviewable changes (4)
  • components/clp-rust-utils/src/s3.rs
  • components/log-ingestor/tests/test_ingestion_job.rs
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs

Comment thread components/clp-rust-utils/tests/clp_config_test.rs
@jonathan-imanu changed the title "feat(log-ingestor): Submit S3ObjectMetadataInputConfig to compression scheduler (addresses #2018)" on Mar 17, 2026
Member

@LinZhihao-723 LinZhihao-723 left a comment


Reviewed:

  • clp_io_config (clp-rust-utils)
  • s3 (clp-rust-utils)
  • clp_config_test (clp-rust-utils)
  • buffer (log-ingestor)
  • compression_job_submitter (log-ingestor)

Comment on lines +13 to +14
/// Represents the unique identifier for an ingestion job in CLP DB.
pub type IngestionJobId = u64;
Member


I don't think this ID belongs in this module.
Let's do the following:

  • Move and rename this ID to clp_rust_utils::job_config::ingestion::JobId
  • In log_ingestor::ingestion_job, add pub use clp_rust_utils::job_config::ingestion::JobId as IngestionJobId
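A minimal, self-contained illustration of the suggested layout (the module paths here only loosely mirror the real crates; this is a sketch of the re-export pattern, not the actual crate structure):

```rust
// Hypothetical module layout: the alias is defined once in
// clp_rust_utils::job_config::ingestion and re-exported by the log-ingestor
// under the name its callers already use.
mod clp_rust_utils {
    pub mod job_config {
        pub mod ingestion {
            /// Unique identifier for an ingestion job in the CLP DB.
            pub type JobId = u64;
        }
    }
}

mod log_ingestor {
    pub mod ingestion_job {
        // Callers keep using `IngestionJobId`; only its definition moves.
        pub use crate::clp_rust_utils::job_config::ingestion::JobId as IngestionJobId;
    }
}

fn main() {
    let id: log_ingestor::ingestion_job::IngestionJobId = 42;
    println!("ingestion job id = {id}");
}
```

With this shape, the ID lives next to the other job-config types, and the log-ingestor's public surface is unchanged.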

Comment thread components/clp-rust-utils/src/job_config/clp_io_config.rs
Comment thread components/clp-rust-utils/tests/clp_config_test.rs Outdated
pub s3_config: S3Config,

pub ingestion_job_id: IngestionJobId,
pub s3_object_metadata_ids: Option<Vec<S3ObjectMetadataId>>,
Member


Why would we allow this field to be optional?
Notice that in the Python definition, this field is not nullable:

Comment thread components/log-ingestor/src/compression/buffer.rs Outdated
Comment thread components/log-ingestor/src/compression/buffer.rs Outdated
},
keys: None,
ingestion_job_id,
s3_object_metadata_ids: None,
Member


Since we don't want to make this field optional, we should probably leave it as an empty vector.


let compression_job_id = match state
.submit_for_compression(io_config_template, id_and_key_pairs)
.submit_for_compression(io_config_template, &object_metadata_ids)
Member


I don't think we need to pass the object metadata IDs by reference; it's not efficient:

  • They're copied from a slice into a vector when calling submit_clp_compression_job_and_wait_for_completion.
  • The reference here is passed all the way down to ClpCompressionState::submit_for_compression to reconstruct a vector.
    • It looks like you could just pass ownership all the way down to the compression state instead.


let compression_job_id = match state
.submit_for_compression(io_config_template, id_and_key_pairs)
.submit_for_compression(io_config_template, &object_metadata_ids)
Member


And in fact, why not construct the io config here (outside of the CLP compression state)?
Originally, we put the logic inside ClpCompressionState::submit_for_compression because the object metadata -> ID mapping behavior wasn't well-formed.
Now, I think we could create the io config by embedding object_metadata_ids directly into the field (this can even happen before this function, so the call only takes io_config: ClpIoConfig and submits it).
Then what we pass into ClpCompressionState will be a well-formed config in the first place, and it can decide how to handle different types of input configs. For example, if we later need to add an FS compression config, the same method should be able to handle it.
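A rough sketch of that shape (all type definitions here are simplified stand-ins for the real ClpIoConfig types, and the error type is a plain String rather than anyhow):

```rust
// Hypothetical stand-ins for the real config types: the caller builds a
// fully-formed ClpIoConfig up front, and submit_for_compression only
// validates and dispatches on it.
type S3ObjectMetadataId = u64;

struct S3ObjectMetadataInputConfig {
    ingestion_job_id: u64,
    s3_object_metadata_ids: Vec<S3ObjectMetadataId>,
}

enum InputConfig {
    S3ObjectMetadata(S3ObjectMetadataInputConfig),
}

struct ClpIoConfig {
    input: InputConfig,
}

fn submit_for_compression(io_config: ClpIoConfig) -> Result<(), String> {
    // No ID-list reconstruction here; the config arrives well-formed, and
    // this function just decides how to handle each input variant.
    match io_config.input {
        InputConfig::S3ObjectMetadata(cfg) if cfg.s3_object_metadata_ids.is_empty() => {
            Err("at least one metadata ID is required".into())
        }
        InputConfig::S3ObjectMetadata(_) => Ok(()),
    }
}

fn main() {
    let cfg = ClpIoConfig {
        input: InputConfig::S3ObjectMetadata(S3ObjectMetadataInputConfig {
            ingestion_job_id: 1,
            s3_object_metadata_ids: vec![10, 11],
        }),
    };
    assert!(submit_for_compression(cfg).is_ok());
    println!("well-formed config accepted");
}
```

Adding a future FS input config would then just be another enum variant and another match arm, without changing the submission entry point.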

jonathan-imanu and others added 2 commits March 20, 2026 22:07
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/compression/buffer.rs`:
- Line 73: Guard the u64 addition of self.total_size += entry.size in the buffer
accumulation path (the Buffer struct where total_size is updated) by using
checked_add (or saturating_add) instead of unchecked addition; on overflow
handle it deterministically — e.g., set total_size to u64::MAX (or trigger an
immediate flush/schedule submission and log an error) so the threshold/flush
logic still fires reliably; update the code around the place that updates
self.total_size (referencing self.total_size and entry.size) to use
checked_add/saturating_add and add the overflow-handling branch.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: cd7fe55a-929b-47e7-9499-b34240c41f55

📥 Commits

Reviewing files that changed from the base of the PR and between 2eaba80 and f4c7c92.

📒 Files selected for processing (3)
  • components/clp-rust-utils/src/job_config/clp_io_config.rs
  • components/clp-rust-utils/tests/clp_config_test.rs
  • components/log-ingestor/src/compression/buffer.rs

object_metadata_to_ingest: Vec<CompressionBufferEntry>,
) -> Result<()> {
for entry in object_metadata_to_ingest {
self.total_size += entry.size;
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read-only verification: find similar unchecked accumulator patterns in Rust code.
rg -nP --type=rust '\b[a-zA-Z_][a-zA-Z0-9_]*\s*\+=\s*[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)?\s*;'

Repository: y-scope/clp

Length of output: 150


🏁 Script executed:

# Read the buffer.rs file to understand the context of line 73
cat -n components/log-ingestor/src/compression/buffer.rs | head -100

Repository: y-scope/clp

Length of output: 3435


🏁 Script executed:

# Search for where Buffer is used and what size limits might exist
rg -n "total_size\|BufferSubmitter\|Buffer::add" components/log-ingestor/src/compression/buffer.rs -A 2 -B 2

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

# Check for any size/capacity limits in the broader compression module
fd . components/log-ingestor/src/compression --type f -name "*.rs" | xargs grep -l "total_size\|MAX.*SIZE\|CAPACITY"

Repository: y-scope/clp

Length of output: 287


🏁 Script executed:

# Search for configuration or constants that might limit buffer growth
rg "MAX_BUFFER_SIZE|BUFFER_CAPACITY|MAX_METADATA|max.*size" components/log-ingestor/src/ -B 2 -A 2

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

# Look for how size_threshold is configured and typical values
rg "size_threshold|Buffer::new" components/log-ingestor/src/ -B 2 -A 2 | head -50

Repository: y-scope/clp

Length of output: 2119


🏁 Script executed:

# Check error handling patterns in the compression module for similar operations
rg "checked_\|overflow\|saturating_" components/log-ingestor/src/compression/ -B 1 -A 1

Repository: y-scope/clp

Length of output: 37


🏁 Script executed:

# Examine the rest of buffer.rs for error handling consistency
cat -n components/log-ingestor/src/compression/buffer.rs | tail -20

Repository: y-scope/clp

Length of output: 636


🏁 Script executed:

# Search for any configuration constants or typical buffer/object sizes in the codebase
rg "BUFFER\|threshold\|SIZE|size.*=" components/log-ingestor/src/compression/ -i | grep -i "const\|let.*=" | head -20

Repository: y-scope/clp

Length of output: 37


Guard total_size accumulation against overflow.

Line 73 uses unchecked u64 addition. In release builds, overflow silently wraps, breaking threshold-triggered submission logic. In debug builds, overflow panics. This causes incorrect buffer flushing behaviour and growth without proper scheduling.

Proposed fix
-use anyhow::Result;
+use anyhow::{anyhow, Result};
@@
-            self.total_size += entry.size;
+            self.total_size = self
+                .total_size
+                .checked_add(entry.size)
+                .ok_or_else(|| anyhow!("total_size overflow while buffering metadata IDs"))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

    self.total_size += entry.size;

After:

    use anyhow::{anyhow, Result};
    // ... (other code) ...
    pub fn add(&mut self, entries: Vec<CompressionBufferEntry>) -> Result<()> {
        for entry in entries {
            self.total_size = self
                .total_size
                .checked_add(entry.size)
                .ok_or_else(|| anyhow!("total_size overflow while buffering metadata IDs"))?;
        }
        // ... (rest of function) ...
    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/compression/buffer.rs` at line 73, Guard the u64
addition of self.total_size += entry.size in the buffer accumulation path (the
Buffer struct where total_size is updated) by using checked_add (or
saturating_add) instead of unchecked addition; on overflow handle it
deterministically — e.g., set total_size to u64::MAX (or trigger an immediate
flush/schedule submission and log an error) so the threshold/flush logic still
fires reliably; update the code around the place that updates self.total_size
(referencing self.total_size and entry.size) to use checked_add/saturating_add
and add the overflow-handling branch.
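Distilled into a runnable form (an assumed signature; the actual code returns anyhow::Result rather than String errors), the guard behaves like this:

```rust
// Minimal demonstration of the overflow guard suggested above.
fn checked_accumulate(total_size: u64, entry_size: u64) -> Result<u64, String> {
    total_size
        .checked_add(entry_size)
        .ok_or_else(|| "total_size overflow while buffering metadata IDs".to_string())
}

fn main() {
    // Normal accumulation succeeds.
    assert_eq!(checked_accumulate(100, 50), Ok(150));
    // Near u64::MAX, unchecked `+=` would wrap in release builds and panic in
    // debug builds; checked_add surfaces the condition as an error instead.
    assert!(checked_accumulate(u64::MAX, 1).is_err());
    println!("overflow guard ok");
}
```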

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 890-902: The loop currently uses objects.chunks_mut(chunk_size)
and chunk.iter_mut() even though objects are not mutated; change
objects.chunks_mut(chunk_size) to objects.chunks(chunk_size) and
chunk.iter_mut() to chunk.iter() in the block that builds buffer_entries (the
loop that enumerates chunks and zips with last_inserted_ids), keeping the use of
last_inserted_ids.get(chunk_id).expect(...) and the CompressionBufferEntry{id:
next_metadata_id, size: object.size} construction intact before calling
self.sender.send(buffer_entries).await?.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5f9a06ce-133b-4997-b00e-0bb5e2bfa169

📥 Commits

Reviewing files that changed from the base of the PR and between f4c7c92 and f6267f5.

📒 Files selected for processing (6)
  • components/clp-rust-utils/src/job_config/clp_io_config.rs
  • components/clp-rust-utils/src/job_config/ingestion.rs
  • components/clp-rust-utils/tests/clp_config_test.rs
  • components/log-ingestor/src/compression/compression_job_submitter.rs
  • components/log-ingestor/src/ingestion_job.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs (1)

1051-1086: ⚠️ Potential issue | 🟠 Major

Validate the new ClpIoConfig contract instead of panicking.

This public method now accepts any ClpIoConfig, but it only supports S3ObjectMetadataInputConfig. unreachable!() turns a caller mistake into a runtime panic, and an empty s3_object_metadata_ids list currently inserts a compression job row without linking any buffered metadata. Please reject invalid or empty inputs here, or narrow the parameter type, and update the doc comment to match the new contract.

Suggested direction
-        let object_metadata_ids: &[S3ObjectMetadataId] = match &io_config.input {
-            InputConfig::S3ObjectMetadataInputConfig { config } => &config.s3_object_metadata_ids,
-            InputConfig::S3InputConfig { .. } => {
-                unreachable!("compression submission only supports `S3ObjectMetadataInputConfig`")
-            }
-        };
+        let object_metadata_ids: &[S3ObjectMetadataId] = match &io_config.input {
+            InputConfig::S3ObjectMetadataInputConfig { config }
+                if !config.s3_object_metadata_ids.is_empty() =>
+            {
+                &config.s3_object_metadata_ids
+            }
+            InputConfig::S3ObjectMetadataInputConfig { .. } => {
+                return Err(anyhow::anyhow!(
+                    "Compression submission requires at least one S3 object metadata ID."
+                ));
+            }
+            InputConfig::S3InputConfig { .. } => {
+                return Err(anyhow::anyhow!(
+                    "Compression submission requires `S3ObjectMetadataInputConfig`."
+                ));
+            }
+        };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs` around
lines 1051 - 1086, The submit_for_compression method currently panics via
unreachable!() for non-S3ObjectMetadataInputConfig and allows empty
s3_object_metadata_ids; change it to validate the ClpIoConfig contract: in
submit_for_compression detect if io_config.input is not
InputConfig::S3ObjectMetadataInputConfig and return an anyhow::Error (or
Result::Err) indicating unsupported input type, and if it is
S3ObjectMetadataInputConfig but config.s3_object_metadata_ids is empty return an
error indicating empty input; update the function doc comment to state it only
accepts S3ObjectMetadataInputConfig with a non-empty s3_object_metadata_ids list
instead of claiming it panics. Ensure you reference the symbols
submit_for_compression, ClpIoConfig, InputConfig::S3ObjectMetadataInputConfig,
and s3_object_metadata_ids when making the changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0f9d7ecd-bb0a-485f-8b4e-0536f97a0f4d

📥 Commits

Reviewing files that changed from the base of the PR and between f6267f5 and 7ddade4.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs

Comment on lines -905 to +901
-        for (chunk_id, chunk) in objects.chunks_mut(chunk_size).enumerate() {
+        let mut buffer_entries = Vec::with_capacity(objects.len());
+        for (chunk_id, chunk) in objects.chunks(chunk_size).enumerate() {
             for (next_metadata_id, object) in
-                (*last_inserted_ids.get(chunk_id).expect("invalid chunk ID")..)
-                    .zip(chunk.iter_mut())
+                (*last_inserted_ids.get(chunk_id).expect("invalid chunk ID")..).zip(chunk.iter())
             {
-                object.id = Some(next_metadata_id);
+                buffer_entries.push(CompressionBufferEntry {
+                    id: next_metadata_id,
+                    size: object.size,
+                });
             }
         }
-        self.sender.send(objects).await?;
+        self.sender.send(buffer_entries).await?;
This piece of code doesn't make sense anymore with the changes in this PR:

  • We had to do it this way because, previously, letting ingest_s3_object_metadata reconstruct a vector of object metadata would have been too inefficient.
  • However, it now makes more sense in terms of efficiency and ownership to let ingest_s3_object_metadata take ownership of objects and return a vector of CompressionBufferEntry. There's no need to know the chunk size outside of ingest_s3_object_metadata anymore.

Comment thread components/log-ingestor/src/compression/listener.rs

-/// Represents a listener task that buffers and submits S3 object metadata.
+/// Represents a listener task that buffers incoming [`CompressionBufferEntry`] values and submits
+/// when full or on timeout.
Suggested change
-/// when full or on timeout.
+/// when a certain size threshold is reached or on timeout.

Comment on lines +85 to +96
+        let entries1 = create_test_buffer_entries(1, 100);
+        let entries2 = create_test_buffer_entries(101, 100);
+        let entries3 = create_test_buffer_entries(201, 100);
+
-        // Spawn three tasks that send into the listener concurrently:
-        // 1) send all of `bucket_0_objects`
-        // 2) send the first 100 objects of `bucket_1_objects`
-        // 3) send the last 100 objects of `bucket_1_objects`
+        // Spawn three tasks that send into the listener concurrently
         let sender1 = listener.get_new_sender();
         let sender2 = listener.get_new_sender();
         let sender3 = listener.get_new_sender();

-        let objs1 = bucket_0_objects.clone();
-        let objs2 = bucket_1_objects[..100].to_vec();
-        let objs3 = bucket_1_objects[100..].to_vec();
-
-        let h1 = tokio::spawn(async move { send_to_listener(objs1, sender1).await });
-        let h2 = tokio::spawn(async move { send_to_listener(objs2, sender2).await });
-        let h3 = tokio::spawn(async move { send_to_listener(objs3, sender3).await });
+        let h1 = tokio::spawn(async move { send_to_listener(entries1, sender1).await });
+        let h2 = tokio::spawn(async move { send_to_listener(entries2, sender2).await });
+        let h3 = tokio::spawn(async move { send_to_listener(entries3, sender3).await });
Might be a better way to refactor this:

Suggested change
-        let entries1 = create_test_buffer_entries(1, 100);
-        let entries2 = create_test_buffer_entries(101, 100);
-        let entries3 = create_test_buffer_entries(201, 100);
-
-        // Spawn three tasks that send into the listener concurrently
-        let sender1 = listener.get_new_sender();
-        let sender2 = listener.get_new_sender();
-        let sender3 = listener.get_new_sender();
-
-        let h1 = tokio::spawn(async move { send_to_listener(entries1, sender1).await });
-        let h2 = tokio::spawn(async move { send_to_listener(entries2, sender2).await });
-        let h3 = tokio::spawn(async move { send_to_listener(entries3, sender3).await });
+        let expected_ids: Vec<S3ObjectMetadataId> = (1..=300).collect();
+        let mut handlers = Vec::new();
+        for ids in expected_ids.chunks(100) {
+            let entries = create_test_buffer_entries(ids);
+            let sender = listener.get_new_sender();
+            let h = tokio::spawn(async move { send_to_listener(entries, sender).await });
+            handlers.push(h);
+        }
+
+        // Wait for all sender tasks to finish
+        for handler in handlers {
+            handler.await.expect("sender task panicked");
+        }
  • We move expected_ids to the front.
  • We change create_test_buffer_entries to the following:
    fn create_test_buffer_entries(
        ids: &[S3ObjectMetadataId],
    ) -> Vec<CompressionBufferEntry> {
        ids.iter()
            .map(|id| CompressionBufferEntry {
                id: *id,
                size: TEST_OBJECT_SIZE,
            })
            .collect()
    }

This file needs a bit more refactoring (I should have said this in the original problem statement for #2018, but I didn't make it clear). These test cases were written this way because we used to have no "state" concept for an ingestion job, so the ingestion flow was: ingestion jobs -> channel senders. When we later added the state, we didn't update these test cases in time, so the flow now looks like: ingestion jobs -> state -> channel senders, with all the checks still done at the channel receiver end. I think this is inefficient and confusing, and we should change it to use the state directly as a sort of shared buffer for checking the ingested files. A possible implementation would be to implement SqsListenerState and S3ScannerState in this test file to hold a shared buffer protected by a mutex, and use these state implementations to replace the current use of ZeroFaultToleranceIngestionJobState.
That said, I do realize this is out of scope for the current PR. But I think we should do this ASAP before we all forget about it, lol. Can you do a follow-up PR after this one to fix it?
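A minimal standalone sketch of the idea, using a hypothetical TestListenerState (the name, methods, and plain-thread concurrency are assumptions for illustration) that holds a mutex-protected buffer shared across concurrent ingest calls:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical test-local state: `TestListenerState`, `ingest`, and `snapshot`
// are illustrative names, not the crate's actual API.
#[derive(Clone, Default)]
struct TestListenerState {
    ingested: Arc<Mutex<Vec<u64>>>,
}

impl TestListenerState {
    /// Records one ingested metadata ID into the shared buffer.
    fn ingest(&self, metadata_id: u64) {
        self.ingested.lock().expect("lock poisoned").push(metadata_id);
    }

    /// Copies the shared buffer so the test can assert on it after all
    /// ingestion tasks have finished.
    fn snapshot(&self) -> Vec<u64> {
        self.ingested.lock().expect("lock poisoned").clone()
    }
}

fn main() {
    let state = TestListenerState::default();

    // Concurrent ingestion from several tasks, all writing into the same state.
    let handles: Vec<_> = (0..4u64)
        .map(|i| {
            let state = state.clone();
            std::thread::spawn(move || state.ingest(i))
        })
        .collect();
    for h in handles {
        h.join().expect("ingest task panicked");
    }

    // The test asserts directly on the state, without a channel receiver.
    let mut ids = state.snapshot();
    ids.sort_unstable();
    assert_eq!(ids, vec![0, 1, 2, 3]);
}
```

With this shape, the assertions read straight from the state that the ingestion job mutates, so the channel receiver no longer has to double as the test oracle.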

@coderabbitai Can you create an issue to track this?

Contributor
🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
gh issue create \
  --repo y-scope/clp \
  --title "refactor(log-ingestor): Refactor test_ingestion_job.rs to use state-based shared buffers" \
  --body "## Overview

The tests in \`components/log-ingestor/tests/test_ingestion_job.rs\` were originally written when there was no \"state\" concept for an ingestion job. The original flow was:

\`\`\`
ingestion jobs → channel senders
\`\`\`

When the state concept was later added, the tests were not updated accordingly, resulting in the current (confusing) flow:

\`\`\`
ingestion jobs → state → channel senders
\`\`\`

This makes the tests inefficient and harder to understand.

## Proposed Refactor

- Implement \`SqsListenerState\` and \`S3ScannerState\` directly in the test file.
- Each state implementation should hold a shared buffer protected by a \`Mutex\`.
- Replace the current use of \`ZeroFaultToleranceIngestionJobState\` with these new test-local state implementations.
- Use the state as a shared buffer to check ingested files, instead of routing through a channel receiver.

## References

- Raised in PR `#2087` (comment: https://github.com/y-scope/clp/pull/2087#discussion_r2970205586)
- Requested by `@LinZhihao-723`" \
  --assignee LinZhihao-723

Length of output: 42


@LinZhihao-723, I've created issue #2114 to track the refactoring of test_ingestion_job.rs to use SqsListenerState and S3ScannerState with mutex-protected shared buffers instead of ZeroFaultToleranceIngestionJobState. It's been assigned to you.


🧠 Learnings used
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2014
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:173-176
Timestamp: 2026-02-23T19:05:25.285Z
Learning: In `components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`, the `ingest_s3_object_metadata` method intentionally uses `assert!(!objects.is_empty())` to panic on empty input, as empty objects is considered a programming error that should never occur in practice.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2035
File: components/log-ingestor/src/ingestion_job_manager.rs:174-223
Timestamp: 2026-02-26T05:56:57.349Z
Learning: In components/log-ingestor/src/ingestion_job_manager.rs, the shutdown_and_remove_job_instance method is intentionally scoped to focus on removing the job instance and shutting it down, returning promptly once removal is complete. It does not include detailed status or failure messages in its return value, as status querying should be done separately if needed by callers.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2038
File: components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs:220-272
Timestamp: 2026-02-26T03:05:57.890Z
Learning: In components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs, the update_job_status method intentionally preserves the original status_msg when handling idempotent calls (when the job is already in the target status). This design ensures that only the first error message is stored, capturing the root cause rather than subsequent failures. This behavior is documented in the fail method's doc comment.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 2001
File: components/log-ingestor/src/ingestion_job/s3_scanner.rs:132-148
Timestamp: 2026-02-17T03:03:57.933Z
Learning: In the log-ingestor's S3 scanner implementation, it is acceptable to call `state.ingest()` and update `start_after` before awaiting `sender.send()` in the ingestion flow. This ordering (where state is committed before channel send completes) is consistent with the system's failure model.

Learnt from: junhaoliao
Repo: y-scope/clp PR: 939
File: components/package-template/src/etc/clp-config.yml:64-64
Timestamp: 2025-06-24T20:13:46.758Z
Learning: When users ask CodeRabbit to create an issue after providing suggestions, they want a GitHub issue created with the high-level requirements and context, not specific code implementations.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR `#1152` discussion.

Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR `#1125` development.

Learnt from: SharafMohamed
Repo: y-scope/clp PR: 2080
File: components/core/src/clp/Profiler.hpp:95-113
Timestamp: 2026-03-07T10:51:24.315Z
Learning: In y-scope/clp (PR `#2080`), `Profiler::m_runtime_measurements` and all `PROFILE_SCOPE`/`ProfilerReporter` usages are strictly single-threaded. `FileCompressor::compress_file` is called in a sequential for-loop with no thread pool. Do not flag thread-safety issues on `Profiler` runtime measurement APIs unless new multi-threaded callers are introduced.

Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Member

@LinZhihao-723 LinZhihao-723 left a comment
Will do some package-level testing before approving.

Comment thread components/log-ingestor/tests/test_compression_listener.rs Outdated
Member

@LinZhihao-723 LinZhihao-723 left a comment

Verified that for both the S3 scanner and the SQS listener, the ingested objects can be submitted for compression, and the compression results will be reflected properly after the job finishes.
However, there's an issue in the WebUI where the compression jobs are not showing properly, because the new S3 object metadata compression job config is not added into the WebUI component. Already located the root cause and have a quick fix with @junhaoliao. We need to decide whether to submit the WebUI changes as a separate PR or as part of this PR.

Member

@LinZhihao-723 LinZhihao-723 left a comment
Pushed WebUI changes directly. @junhaoliao Can you help check?

Member

@junhaoliao junhaoliao left a comment
now that we have created this new job type, do you think we can remove the keys[] field in S3InputConfig?

});

/**
* Matching `S3ObjectMetadataInputConfig` in `job_orchestration.scheduler.job_config`.
(just a note for awareness - no code changes are needed in this PR)

The "matching" comment is misleading because what we have here is only a subset of the job_orchestration.scheduler.job_config fields. That said, I do realize this has been a preexisting issue for the other types as well.

Currently this Type object is only used to derive a TypeScript type for type hints, so we shouldn't run into any issues. However, once we add the input form for S3, the other type objects will be used to validate inputs, and validation errors will be raised on additional fields that aren't listed. Anyway, ClpIoS3ObjectMetadataInputConfig is an internal input config and won't be used in any input-form validation in the foreseeable future, so no code changes are needed.

I agree. We should probably create a minimum schema that contains the fields needed.
Shall we ask rabbit to create an issue to track this renaming?

Member

@junhaoliao junhaoliao left a comment
the webui changes lgtm

Member

LinZhihao-723 commented Mar 26, 2026

now that we have created this new job type, do you think we can remove the keys[] field in S3InputConfig?

The keys field is actually used for key ingestion path: https://docs.yscope.com/clp/main/user-docs/guides-using-object-storage/aws-s3/using-clp-with-aws-s3.html#s3-object-compression-mode. I think we can drop it once we move all S3 ingestion into log-ingestor.

Member

@LinZhihao-723 LinZhihao-723 left a comment
For the PR title, how about:

feat(log-ingestor): Use `S3ObjectMetadataInputConfig` for compression job submission (resolves #2018).

@jonathan-imanu jonathan-imanu changed the title feat(log-ingestor): Submit S3ObjectMetadataInputConfig to compression scheduler (addresses #2018) feat(log-ingestor): Use S3ObjectMetadataInputConfig for compression job submission (resolves #2018). Mar 27, 2026
@jonathan-imanu jonathan-imanu merged commit 6d524a7 into y-scope:main Mar 27, 2026
23 checks passed

3 participants