Add JDBC-based source coordination store plugin#6757
Add JDBC-based source coordination store plugin#6757lawofcycles wants to merge 6 commits intoopensearch-project:mainfrom
Conversation
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
There was a problem hiding this comment.
No unit tests for JdbcSourceCoordinationStore itself (only integration tests). The DynamoDB store has unit tests with mocked clients. Add unit tests that mock DataSource/Connection to test error handling paths, version mismatch behavior, and SQL exception translation without requiring a running database.
There was a problem hiding this comment.
Added JdbcSourceCoordinationStoreTest.
| import java.util.Optional; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertFalse; |
There was a problem hiding this comment.
Uses JUnit assertions (assertEquals, assertFalse, assertTrue). Project standard requires Hamcrest matchers (assertThat with equalTo, is(true), is(false)).
|
|
||
| final Optional<SourcePartitionStoreItem> first = store.tryAcquireAvailablePartition( | ||
| SOURCE_ID, "node-1", Duration.ofSeconds(2)); | ||
| assertTrue(first.isPresent()); |
There was a problem hiding this comment.
Thread.sleep(3000) in test. Use org.awaitility:awaitility for polling-based assertions instead of fixed sleeps. The DynamoDB store tests already use awaitility as a test dependency.
|
|
||
| import java.time.Instant; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; |
There was a problem hiding this comment.
uses JUnit assertions instead of Hamcrest matchers
| import java.util.Map; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertFalse; |
There was a problem hiding this comment.
uses JUnit assertions instead of Hamcrest matchers
| implementation 'com.zaxxer:HikariCP:5.1.0' | ||
| implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
| implementation 'javax.inject:javax.inject:1' | ||
| implementation 'org.postgresql:postgresql:42.7.7' |
There was a problem hiding this comment.
Any reason why both postgresql and mysql included as implementation dependencies?
As they should be runtimeOnly. No compile-time references to driver-specific classes exist in the source as they're loaded via DriverManager at runtime.
| implementation 'org.postgresql:postgresql:42.7.7' | ||
| implementation 'com.mysql:mysql-connector-j:8.4.0' | ||
| } | ||
|
|
There was a problem hiding this comment.
No JaCoCo configuration. New plugins should have jacocoTestCoverageVerification with minimum 1.0 ratio per project standards.
@dlvenable correct me if I am wrong.
There was a problem hiding this comment.
Added jacocoTestCoverageVerification with minimum 0.99. The threshold varies across plugins (from 0.84 to 1.0), and the current instruction coverage is 99%. The only uncovered line is the HikariDataSource constructor in createDataSource(), which is overridden in tests to inject a mock.
| @JsonCreator | ||
| public JdbcStoreSettings( | ||
| @JsonProperty("url") final String url, | ||
| @JsonProperty("username") final String username, |
There was a problem hiding this comment.
password is stored as plain String. Consider documenting that this field supports Data-Prepper's secret management integration, or note it as a known limitation for the initial version
There was a problem hiding this comment.
The password field already supports Data Prepper's secret management through VariableExpander. Users can use ${{aws_secrets:secretId:secretKey}} syntax, which is automatically resolved during deserialization via @JsonProperty.
However, there is currently only AwsSecretsPluginConfigValueTranslator, so it would be worth adding another implementation such as HashiCorp Vault for cloud-agnostic use cases in the future.
|
|
||
| private final JdbcStoreSettings settings; | ||
| private HikariDataSource dataSource; | ||
| private ScheduledExecutorService ttlExecutor; |
There was a problem hiding this comment.
HikariDataSource and ScheduledExecutorService are never shut down. Implement AutoCloseable with a close() that calls dataSource.close() and ttlExecutor.shutdownNow().
There was a problem hiding this comment.
Addressed. Implemented AutoCloseable with close() that shuts down both ttlExecutor and dataSource.
| } | ||
|
|
||
| @Override | ||
| public void initializeStore() { |
There was a problem hiding this comment.
initializeStore() is not guarded against concurrent calls. If called twice, it creates a second HikariDataSource without closing the first. Add an AtomicBoolean initialized guard or null check on dataSource.
There was a problem hiding this comment.
Addressed. Added AtomicBoolean guard in initializeStore() to prevent double initialization.
| } | ||
|
|
||
| if (settings.getTtl() != null) { | ||
| ttlExecutor = Executors.newSingleThreadScheduledExecutor(r -> { |
There was a problem hiding this comment.
The ScheduledExecutorService for TTL cleanup is never shut down. On pipeline stop this daemon thread keeps running. Store a reference and shut it down in the close() method.
…Store Implement AutoCloseable to shut down HikariDataSource and TTL executor on close. Add AtomicBoolean guard to prevent double initialization and connection pool leak when initializeStore is called more than once. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
… awaitility Align test style with project standards. Add awaitility test dependency and docker-compose.yml with PostgreSQL and MySQL for integration tests. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…-compose Extract createDataSource() as package-private method to allow test subclass override, replacing reflection-based mock injection. Add JdbcSourceCoordinationStoreTest covering error handling, version mismatch, table creation, and TTL paths (34 test cases). Add jacocoTestCoverageVerification with minimum 0.90 ratio. Add docker-compose.yml with PostgreSQL 16 and MySQL 8.4 for integration tests. Replace FQCN usages with imports. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Extract createDataSource() as package-private method to enable test subclass override without reflection. Add test cases for table creation, MySQL duplicate index, non-constraint SQL exceptions, assigned-expired acquisition, null closedCount, and priority override branches. Raise JaCoCo minimum from 0.90 to 0.99. Replace remaining FQCN usages with imports in source and test files. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
Thank you for the thorough review, @srikanthpadakanti. All comments have been addressed. |
| private void createTable() { | ||
| final String tableName = settings.getTableName(); | ||
| final String createTableSql = "CREATE TABLE IF NOT EXISTS " + tableName + " (" | ||
| + "source_identifier VARCHAR(256) NOT NULL, " |
There was a problem hiding this comment.
source_identifier and source_partition_key are VARCHAR(256). S3 object keys can be up to 1024 characters and some source plugins may use them as partition keys. Consider increasing to VARCHAR(1024) or documenting the limitation.
Thanks for addressing them. Added one more comment. Everything else looks good to me. |
Description
Adds a new
jdbcsource coordination store plugin backed by a relational database.The plugin uses standard SQL operations to implement the
SourceCoordinationStoreinterface.INSERTwith duplicate key checkUPDATE ... WHERE version = ?COMPLETEDpartitionsConfiguration example
Tested with PostgreSQL and MySQL. Integration tests run against PostgreSQL by default and can be configured for MySQL via system properties. Also verified end-to-end with OpenSearch Source and Iceberg Source pipelines on both databases, including TTL-based cleanup of completed partitions.
Issues Resolved
Resolves #6740
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.