Skip to content

Add JDBC-based source coordination store plugin#6757

Open
lawofcycles wants to merge 6 commits intoopensearch-project:mainfrom
lawofcycles:feature/jdbc-coordination-store
Open

Add JDBC-based source coordination store plugin#6757
lawofcycles wants to merge 6 commits intoopensearch-project:mainfrom
lawofcycles:feature/jdbc-coordination-store

Conversation

@lawofcycles
Copy link
Copy Markdown
Contributor

Description

Adds a new jdbc source coordination store plugin backed by a relational database.

The plugin uses standard SQL operations to implement the SourceCoordinationStore interface.

Capability Implementation
Conditional create INSERT with duplicate key check
Optimistic locking UPDATE ... WHERE version = ?
Partition acquisition Status-based query with priority ordering
TTL cleanup Scheduled deletion of expired COMPLETED partitions

Configuration example

source_coordination:
 store:
   jdbc:
     url: "jdbc:postgresql://localhost:5432/dataprepper"
     username: "dp_user"
     password: "dp_pass"

Tested with PostgreSQL and MySQL. Integration tests run against PostgreSQL by default and can be configured for MySQL via system properties. Also verified end-to-end with OpenSearch Source and Iceberg Source pipelines on both databases, including TTL-based cleanup of completed partitions.

Issues Resolved

Resolves #6740

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No unit tests for JdbcSourceCoordinationStore itself (only integration tests). The DynamoDB store has unit tests with mocked clients. Add unit tests that mock DataSource/Connection to test error handling paths, version mismatch behavior, and SQL exception translation without requiring a running database.

Copy link
Copy Markdown
Contributor Author

@lawofcycles lawofcycles Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added JdbcSourceCoordinationStoreTest.

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses JUnit assertions (assertEquals, assertFalse, assertTrue). Project standard requires Hamcrest matchers (assertThat with equalTo, is(true), is(false)).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed


final Optional<SourcePartitionStoreItem> first = store.tryAcquireAvailablePartition(
SOURCE_ID, "node-1", Duration.ofSeconds(2));
assertTrue(first.isPresent());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.sleep(3000) in test. Use org.awaitility:awaitility for polling-based assertions instead of fixed sleeps. The DynamoDB store tests already use awaitility as a test dependency.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed


import java.time.Instant;

import static org.junit.jupiter.api.Assertions.assertEquals;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses JUnit assertions instead of Hamcrest matchers

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses JUnit assertions instead of Hamcrest matchers

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

implementation 'com.zaxxer:HikariCP:5.1.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation 'org.postgresql:postgresql:42.7.7'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why both postgresql and mysql included as implementation dependencies?
As they should be runtimeOnly. No compile-time references to driver-specific classes exist in the source as they're loaded via DriverManager at runtime.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed.

implementation 'org.postgresql:postgresql:42.7.7'
implementation 'com.mysql:mysql-connector-j:8.4.0'
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No JaCoCo configuration. New plugins should have jacocoTestCoverageVerification with minimum 1.0 ratio per project standards.
@dlvenable correct me if I am wrong.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added jacocoTestCoverageVerification with minimum 0.99. The threshold varies across plugins (from 0.84 to 1.0), and the current instruction coverage is 99%. The only uncovered line is the HikariDataSource constructor in createDataSource(), which is overridden in tests to inject a mock.

@JsonCreator
public JdbcStoreSettings(
@JsonProperty("url") final String url,
@JsonProperty("username") final String username,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

password is stored as plain String. Consider documenting that this field supports Data-Prepper's secret management integration, or note it as a known limitation for the initial version

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The password field already supports Data Prepper's secret management through VariableExpander. Users can use ${{aws_secrets:secretId:secretKey}} syntax, which is automatically resolved during deserialization via @JsonProperty.
However, there is currently only AwsSecretsPluginConfigValueTranslator, so it would be worth adding another implementation such as HashiCorp Vault for cloud-agnostic use cases in the future.


private final JdbcStoreSettings settings;
private HikariDataSource dataSource;
private ScheduledExecutorService ttlExecutor;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HikariDataSource and ScheduledExecutorService are never shut down. Implement AutoCloseable with a close() that calls dataSource.close() and ttlExecutor.shutdownNow().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Implemented AutoCloseable with close() that shuts down both ttlExecutor and dataSource.

}

@Override
public void initializeStore() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initializeStore() is not guarded against concurrent calls. If called twice, it creates a second HikariDataSource without closing the first. Add an AtomicBoolean initialized guard or null check on dataSource.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Added AtomicBoolean guard in initializeStore() to prevent double initialization.

}

if (settings.getTtl() != null) {
ttlExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ScheduledExecutorService for TTL cleanup is never shut down. On pipeline stop this daemon thread keeps running. Store a reference and shut it down in the close() method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

…Store

Implement AutoCloseable to shut down HikariDataSource and TTL executor
on close. Add AtomicBoolean guard to prevent double initialization and
connection pool leak when initializeStore is called more than once.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
… awaitility

Align test style with project standards. Add awaitility test dependency
and docker-compose.yml with PostgreSQL and MySQL for integration tests.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…-compose

Extract createDataSource() as package-private method to allow test
subclass override, replacing reflection-based mock injection.
Add JdbcSourceCoordinationStoreTest covering error handling, version
mismatch, table creation, and TTL paths (34 test cases).

Add jacocoTestCoverageVerification with minimum 0.90 ratio.
Add docker-compose.yml with PostgreSQL 16 and MySQL 8.4 for
integration tests. Replace FQCN usages with imports.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Extract createDataSource() as package-private method to enable test
subclass override without reflection. Add test cases for table
creation, MySQL duplicate index, non-constraint SQL exceptions,
assigned-expired acquisition, null closedCount, and priority override
branches. Raise JaCoCo minimum from 0.90 to 0.99. Replace remaining
FQCN usages with imports in source and test files.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles
Copy link
Copy Markdown
Contributor Author

Thank you for the thorough review, @srikanthpadakanti. All comments have been addressed.

private void createTable() {
final String tableName = settings.getTableName();
final String createTableSql = "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "source_identifier VARCHAR(256) NOT NULL, "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source_identifier and source_partition_key are VARCHAR(256). S3 object keys can be up to 1024 characters and some source plugins may use them as partition keys. Consider increasing to VARCHAR(1024) or documenting the limitation.

@srikanthpadakanti
Copy link
Copy Markdown
Contributor

srikanthpadakanti commented Apr 16, 2026

Thank you for the thorough review, @srikanthpadakanti. All comments have been addressed.

Thanks for addressing them. Added one more comment. Everything else looks good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add JDBC-based source coordination store

2 participants