Conversation
Adds true async HTTP transport using CloseableHttpAsyncClient for non-blocking I/O. This eliminates thread-per-request blocking and provides better scalability under high concurrency. Key changes: - Add USE_ASYNC_HTTP config option (disabled by default) - Add CloseableHttpAsyncClient with NIO-based connection pooling - Add executeRequestAsync() returning CompletableFuture - Add non-blocking retry logic using AsyncRetryMarker pattern - Add QueryResponse constructor for buffered async responses - Add comprehensive test suite (AsyncHttpClientTests) Phase 1 limitations (documented with warnings): - Response bodies are fully buffered in memory (<10MB recommended) - Client request compression not supported in async mode - Insert and multipart operations use sync fallback
Implements streaming async responses using PipedInputStream to avoid buffering entire response in memory. This enables async queries for large result sets without OOM risk. Key changes: - Add StreamingAsyncResponseConsumer with PipedInputStream bridge - Complete future on headers arrival (not stream end) to prevent deadlock - Add executeRequestAsyncStreaming() method in HttpAPIClientHelper - Update QueryResponse with streaming InputStream constructor - Increase pipe buffer to 512KB for better throughput Fixes: - Critical deadlock when user delays reading from stream - Remove unnecessary flush() on every chunk (+10-20% throughput) - Use reasonable capacityIncrement (8KB vs Integer.MAX_VALUE) Tests: - testAsyncStreamingLargeResult: 100K row streaming - testAsyncStreamingIncrementalRead: incremental byte reading - testAsyncStreamingNoDeadlockOnDelayedRead: deadlock regression test
Implements request compression for async queries using both HTTP framed LZ4 and ClickHouse native LZ4 formats. Key changes: - Add compressLZ4() method supporting both compression formats - Apply compression in executeRequestAsyncStreaming() for queries - Move Content-Encoding header logic into addHeadersToRequest() - Remove startup warning about compression not being supported - Update documentation to list compression as supported feature Note: Compression is applied in-memory before sending, which is acceptable for queries (small payloads). For large insert operations with compression, sync client should be used (documented limitation). Tests: - testAsyncQueryWithCompression: HTTP framed LZ4 - testAsyncQueryWithNativeLZ4Compression: ClickHouse native LZ4
Implements Phase 4: True streaming async inserts with on-the-fly compression, achieving memory parity with sync client. Key Features: - StreamingAsyncEntityProducer for memory-efficient uploads - Background compression thread with pipe-based data flow - Constant ~520KB memory for any insert size - Support for HTTP and ClickHouse native LZ4 compression - 6 comprehensive integration tests Memory Usage: - Before: 500MB file → 500MB heap - After: 500MB file → 520KB heap (99.9% reduction) Feature Parity: 100% for queries and inserts
- Add Async HTTP (NIO) row to Client Features table - Add Async HTTP Support section with usage example - Add CHANGELOG entry for async HTTP feature
There was a problem hiding this comment.
Pull request overview
Adds opt-in async HTTP (Apache HttpClient 5 NIO) transport support to Client V2, including streaming query/insert handling and integration tests/docs to validate the new execution paths.
Changes:
- Introduces async HTTP execution paths (streaming query + streaming insert) and async client initialization/cleanup.
- Extends
QueryResponseto support async buffered responses and streaming responses. - Adds integration tests plus README/CHANGELOG updates describing the new async HTTP capability.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java | Adds async client creation and async request/streaming execution helpers. |
| client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java | New streaming response consumer piping async response bodies to an InputStream. |
| client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java | New streaming request producer supporting optional on-the-fly LZ4 compression. |
| client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java | Adds constructors/handling for async buffered and async streaming responses. |
| client-v2/src/main/java/com/clickhouse/client/api/Client.java | Adds builder opt-in (useAsyncHttp) and routes query/insert to async HTTP paths when enabled. |
| client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java | Adds USE_ASYNC_HTTP configuration property. |
| client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java | Adds integration tests for async HTTP queries/inserts, streaming, retries, cancellation. |
| README.md | Documents async HTTP support and basic usage. |
| CHANGELOG.md | Notes new async HTTP feature and known limitations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Outdated
Show resolved
Hide resolved
…pAPIClientHelper.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Fix 503 retry logic in async paths (executeRequestAsync, executeRequestAsyncStreaming, executeInsertAsyncStreaming) to let caller handle retry instead of immediately throwing exception - Fix NumberFormatException in getHeaderVal by wrapping converter in try-catch and returning default value on error - Add auth header conflict handling to addHeadersToRequest and addHeadersToSimpleRequest to match sync path behavior - Add compression support to createSimpleHttpRequest - Document async insert retry limitation in Client.java - Fix test port selection to use dynamicPort() instead of random - Fix metric assertion to use getResultRows() instead of getReadRows() - Fix resource leaks by using try-with-resources for BufferedReader
… GC pressure Instead of allocating a new byte[] for every ByteBuffer chunk received, use a fixed 8KB reusable buffer and write in chunks. This reduces allocation/GC pressure under high-throughput streaming scenarios.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
…vent deadlock The method reads from the streaming pipe to build an exception but never closed the stream. If the error body is larger than the pipe buffer, the NIO thread would block on write, reintroducing a deadlock. Also switched to using getHeaderVal for parsing the error code header to handle potential NumberFormatException safely.
- Close StreamingResponse in error paths to prevent resource leaks and deadlocks - Fix closeOutputStream() to complete future exceptionally when streamError is set - Fix unsafe Throwable cast to Exception in async retry path - Fix column list building to remove both comma and space (use setLength instead of deleteCharAt) - Close StreamingResponse before 503 retry to avoid resource leaks - Remove unused imports from HttpAPIClientHelper, Client, and AsyncHttpClientTests - Update CHANGELOG test count description - Update buildResult() comment to accurately describe behavior
Capture the Future returned by httpAsyncClient.execute() and cancel it when the user cancels the returned CompletableFuture. This ensures in-flight requests are aborted and connections released promptly. Applied to all 3 async methods: - executeRequestAsync - executeRequestAsyncStreaming - executeInsertAsyncStreaming
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java
Show resolved
Hide resolved
- Remove unused imports (Random, AtomicReference) from AsyncHttpClientTests - Add Content-Encoding header in addHeadersToSimpleRequest when client compression is enabled (matching sync path behavior) - Handle null response body in QueryResponse constructor for async path Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java
Outdated
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Outdated
Show resolved
Hide resolved
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…eamingAsyncEntityProducer.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…pAPIClientHelper.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java
Show resolved
Hide resolved
client-v2/src/test/java/com/clickhouse/client/AsyncHttpManualValidation.java
Outdated
Show resolved
Hide resolved
…anup
1. Document static TIMEOUT_SCHEDULER and COMPRESSION_EXECUTOR:
- Explain they use daemon threads that terminate with JVM
- Document intentional design avoiding complex lifecycle management
2. Fix test table cleanup in AsyncHttpManualValidation:
- Move DROP TABLE to finally blocks in testBasicInsert, testLargeInsert,
and testInsertWithCompression
- Ensures cleanup happens regardless of test success/failure
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Implement reference-counted shared executors for compression and timeout - StreamingAsyncEntityProducer: acquireExecutor/releaseExecutor for compression thread pool - Client: acquireTimeoutScheduler/releaseTimeoutScheduler for timeout scheduler - Graceful shutdown when last async client is closed (5s timeout then force) - Executors are recreated if a new async client is created after shutdown This ensures proper resource cleanup when clients are closed, while still sharing executors efficiently across multiple concurrent async clients. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…pAPIClientHelper.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- executeQueryAsync: throw ServerException when 503 received after all retries exhausted - executeInsertAsync: throw ServerException on 503 (async inserts don't support retry) - Both paths now close the response before throwing to prevent resource leaks Previously, final 503 responses were incorrectly treated as successful operations and converted to QueryResponse/InsertResponse objects. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public static void releaseExecutor() { | ||
| synchronized (EXECUTOR_LOCK) { | ||
| if (EXECUTOR_REF_COUNT.decrementAndGet() == 0 && compressionExecutor != null) { | ||
| LOG.debug("Shutting down compression executor pool"); | ||
| compressionExecutor.shutdown(); | ||
| try { | ||
| if (!compressionExecutor.awaitTermination(5, TimeUnit.SECONDS)) { | ||
| compressionExecutor.shutdownNow(); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| compressionExecutor.shutdownNow(); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| compressionExecutor = null; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The acquireExecutor and releaseExecutor methods have a race condition when the reference count becomes zero. Between checking if EXECUTOR_REF_COUNT is 0 (line 73) and shutting down the executor (line 75), another thread could call acquireExecutor (line 60), see the count is 0, and try to create a new executor while this thread is shutting it down. This could lead to shutting down an executor that just had a new reference acquired. Consider checking the ref count again after it becomes 0 to ensure no concurrent acquires happened.
There was a problem hiding this comment.
The synchronized block on EXECUTOR_LOCK actually prevents this race condition - a thread cannot enter releaseExecutor() while another is in acquireExecutor() (and vice versa) because both methods synchronize on the same lock. Added documentation in commit d02855c explaining the thread-safety guarantees.
| private static void releaseTimeoutScheduler() { | ||
| synchronized (TIMEOUT_SCHEDULER_LOCK) { | ||
| if (TIMEOUT_SCHEDULER_REF_COUNT.decrementAndGet() == 0 && timeoutScheduler != null) { | ||
| LOG.debug("Shutting down async timeout scheduler"); | ||
| timeoutScheduler.shutdown(); | ||
| try { | ||
| if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) { | ||
| timeoutScheduler.shutdownNow(); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| timeoutScheduler.shutdownNow(); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| timeoutScheduler = null; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The acquireTimeoutScheduler and releaseTimeoutScheduler methods have the same race condition as StreamingAsyncEntityProducer. Between checking if TIMEOUT_SCHEDULER_REF_COUNT is 0 (line 144) and shutting down the scheduler (line 146), another thread could call acquireTimeoutScheduler (line 131), see the count is 0, and try to create a new scheduler while this thread is shutting it down. This could lead to shutting down a scheduler that just had a new reference acquired.
There was a problem hiding this comment.
Same as StreamingAsyncEntityProducer - the synchronized block on TIMEOUT_SCHEDULER_LOCK provides mutual exclusion between acquire and release. No race condition exists because threads cannot concurrently execute both methods. Added clarifying documentation in commit d02855c.
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java
Show resolved
Hide resolved
The USE_ASYNC_HTTP config property was added for async HTTP support, increasing the total config count from 34 to 35. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1. Add thread-safety documentation to acquire/release methods explaining that the synchronized block prevents the race condition concern (mutual exclusion ensures no concurrent acquire/release) 2. Fix potential resource leak in Client constructor - if acquiring the second executor fails, release the first one 3. Fix streamCompleteFuture not completing if close() throws - moved future completion outside try block to ensure it always happens 4. Change unsubstantiated "96-99%" performance claim to "Substantial reduction" per Copilot suggestion Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| import org.apache.hc.core5.http.Header; | ||
|
|
||
| import java.io.ByteArrayInputStream; | ||
| import java.io.IOException; |
There was a problem hiding this comment.
java.io.IOException is imported but not used in this class, which will fail builds that enforce no-unused-imports (e.g., Checkstyle). Remove the unused import (or change the signature/implementation to use it explicitly).
| import java.io.IOException; |
| QueryResponse response = client.query("SELECT number FROM numbers(1000)", settings).get(30, TimeUnit.SECONDS); | ||
| BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream())); | ||
| int lineCount = 0; | ||
| while (reader.readLine() != null) lineCount++; | ||
| response.close(); |
There was a problem hiding this comment.
BufferedReader created from response.getInputStream() is never closed. Even though response.close() closes the underlying stream, this still leaks the reader wrapper and is easy to fix by using try-with-resources for both QueryResponse and BufferedReader.
| QueryResponse response = client.query("SELECT number, toString(number) FROM numbers(100000)", settings) | ||
| .get(60, TimeUnit.SECONDS); | ||
| BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream())); | ||
| int lineCount = 0; | ||
| while (reader.readLine() != null) lineCount++; | ||
| response.close(); |
There was a problem hiding this comment.
BufferedReader is not closed (only response.close() is called). Please wrap both the QueryResponse and the BufferedReader in try-with-resources so the reader is always closed, even on early exceptions while counting lines.
Summary
Exploring async client setup mentioned in ClickHouse#1831
Closes
Checklist
Delete items not relevant to your PR: