Skip to content

Feature/async http support#1

Open
devdavidkarlsson wants to merge 32 commits intomainfrom
feature/async-http-support
Open

Feature/async http support#1
devdavidkarlsson wants to merge 32 commits intomainfrom
feature/async-http-support

Conversation

@devdavidkarlsson
Copy link
Owner

@devdavidkarlsson devdavidkarlsson commented Feb 11, 2026

Summary

Exploring async client setup mentioned in ClickHouse#1831

Closes

Checklist

Delete items not relevant to your PR:

  • Closes #
  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

Adds true async HTTP transport using CloseableHttpAsyncClient for
non-blocking I/O. This eliminates thread-per-request blocking and
provides better scalability under high concurrency.

Key changes:
- Add USE_ASYNC_HTTP config option (disabled by default)
- Add CloseableHttpAsyncClient with NIO-based connection pooling
- Add executeRequestAsync() returning CompletableFuture
- Add non-blocking retry logic using AsyncRetryMarker pattern
- Add QueryResponse constructor for buffered async responses
- Add comprehensive test suite (AsyncHttpClientTests)

Phase 1 limitations (documented with warnings):
- Response bodies are fully buffered in memory (<10MB recommended)
- Client request compression not supported in async mode
- Insert and multipart operations use sync fallback
Implements streaming async responses using PipedInputStream to avoid
buffering entire response in memory. This enables async queries for
large result sets without OOM risk.

Key changes:
- Add StreamingAsyncResponseConsumer with PipedInputStream bridge
- Complete future on headers arrival (not stream end) to prevent deadlock
- Add executeRequestAsyncStreaming() method in HttpAPIClientHelper
- Update QueryResponse with streaming InputStream constructor
- Increase pipe buffer to 512KB for better throughput

Fixes:
- Critical deadlock when user delays reading from stream
- Remove unnecessary flush() on every chunk (+10-20% throughput)
- Use reasonable capacityIncrement (8KB vs Integer.MAX_VALUE)

Tests:
- testAsyncStreamingLargeResult: 100K row streaming
- testAsyncStreamingIncrementalRead: incremental byte reading
- testAsyncStreamingNoDeadlockOnDelayedRead: deadlock regression test
Implements request compression for async queries using both HTTP framed
LZ4 and ClickHouse native LZ4 formats.

Key changes:
- Add compressLZ4() method supporting both compression formats
- Apply compression in executeRequestAsyncStreaming() for queries
- Move Content-Encoding header logic into addHeadersToRequest()
- Remove startup warning about compression not being supported
- Update documentation to list compression as supported feature

Note: Compression is applied in-memory before sending, which is acceptable
for queries (small payloads). For large insert operations with compression,
sync client should be used (documented limitation).

Tests:
- testAsyncQueryWithCompression: HTTP framed LZ4
- testAsyncQueryWithNativeLZ4Compression: ClickHouse native LZ4
Implements Phase 4: True streaming async inserts with on-the-fly
compression, achieving memory parity with sync client.

Key Features:
- StreamingAsyncEntityProducer for memory-efficient uploads
- Background compression thread with pipe-based data flow
- Constant ~520KB memory for any insert size
- Support for HTTP and ClickHouse native LZ4 compression
- 6 comprehensive integration tests

Memory Usage:
- Before: 500MB file → 500MB heap
- After:  500MB file → 520KB heap (99.9% reduction)

Feature Parity: 100% for queries and inserts
- Add Async HTTP (NIO) row to Client Features table
- Add Async HTTP Support section with usage example
- Add CHANGELOG entry for async HTTP feature
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds opt-in async HTTP (Apache HttpClient 5 NIO) transport support to Client V2, including streaming query/insert handling and integration tests/docs to validate the new execution paths.

Changes:

  • Introduces async HTTP execution paths (streaming query + streaming insert) and async client initialization/cleanup.
  • Extends QueryResponse to support async buffered responses and streaming responses.
  • Adds integration tests plus README/CHANGELOG updates describing the new async HTTP capability.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java Adds async client creation and async request/streaming execution helpers.
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncResponseConsumer.java New streaming response consumer piping async response bodies to an InputStream.
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java New streaming request producer supporting optional on-the-fly LZ4 compression.
client-v2/src/main/java/com/clickhouse/client/api/query/QueryResponse.java Adds constructors/handling for async buffered and async streaming responses.
client-v2/src/main/java/com/clickhouse/client/api/Client.java Adds builder opt-in (useAsyncHttp) and routes query/insert to async HTTP paths when enabled.
client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java Adds USE_ASYNC_HTTP configuration property.
client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientTests.java Adds integration tests for async HTTP queries/inserts, streaming, retries, cancellation.
README.md Documents async HTTP support and basic usage.
CHANGELOG.md Notes new async HTTP feature and known limitations.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

devdavidkarlsson and others added 5 commits February 11, 2026 18:37
…pAPIClientHelper.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Fix 503 retry logic in async paths (executeRequestAsync,
  executeRequestAsyncStreaming, executeInsertAsyncStreaming) to let
  caller handle retry instead of immediately throwing exception
- Fix NumberFormatException in getHeaderVal by wrapping converter
  in try-catch and returning default value on error
- Add auth header conflict handling to addHeadersToRequest and
  addHeadersToSimpleRequest to match sync path behavior
- Add compression support to createSimpleHttpRequest
- Document async insert retry limitation in Client.java
- Fix test port selection to use dynamicPort() instead of random
- Fix metric assertion to use getResultRows() instead of getReadRows()
- Fix resource leaks by using try-with-resources for BufferedReader
… GC pressure

Instead of allocating a new byte[] for every ByteBuffer chunk received,
use a fixed 8KB reusable buffer and write in chunks. This reduces
allocation/GC pressure under high-throughput streaming scenarios.
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

…vent deadlock

The method reads from the streaming pipe to build an exception but never
closed the stream. If the error body is larger than the pipe buffer, the
NIO thread would block on write, reintroducing a deadlock.

Also switched to using getHeaderVal for parsing the error code header
to handle potential NumberFormatException safely.
- Close StreamingResponse in error paths to prevent resource leaks and deadlocks
- Fix closeOutputStream() to complete future exceptionally when streamError is set
- Fix unsafe Throwable cast to Exception in async retry path
- Fix column list building to remove both comma and space (use setLength instead of deleteCharAt)
- Close StreamingResponse before 503 retry to avoid resource leaks
- Remove unused imports from HttpAPIClientHelper, Client, and AsyncHttpClientTests
- Update CHANGELOG test count description
- Update buildResult() comment to accurately describe behavior
Capture the Future returned by httpAsyncClient.execute() and cancel it
when the user cancels the returned CompletableFuture. This ensures
in-flight requests are aborted and connections released promptly.

Applied to all 3 async methods:
- executeRequestAsync
- executeRequestAsyncStreaming
- executeInsertAsyncStreaming
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

- Remove unused imports (Random, AtomicReference) from AsyncHttpClientTests
- Add Content-Encoding header in addHeadersToSimpleRequest when client
  compression is enabled (matching sync path behavior)
- Handle null response body in QueryResponse constructor for async path

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

devdavidkarlsson and others added 4 commits February 16, 2026 14:34
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…eamingAsyncEntityProducer.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…pAPIClientHelper.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

devdavidkarlsson and others added 2 commits February 17, 2026 11:01
…anup

1. Document static TIMEOUT_SCHEDULER and COMPRESSION_EXECUTOR:
   - Explain they use daemon threads that terminate with JVM
   - Document intentional design avoiding complex lifecycle management

2. Fix test table cleanup in AsyncHttpManualValidation:
   - Move DROP TABLE to finally blocks in testBasicInsert, testLargeInsert,
     and testInsertWithCompression
   - Ensures cleanup happens regardless of test success/failure

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Implement reference-counted shared executors for compression and timeout
- StreamingAsyncEntityProducer: acquireExecutor/releaseExecutor for compression thread pool
- Client: acquireTimeoutScheduler/releaseTimeoutScheduler for timeout scheduler
- Graceful shutdown when last async client is closed (5s timeout then force)
- Executors are recreated if a new async client is created after shutdown

This ensures proper resource cleanup when clients are closed, while still
sharing executors efficiently across multiple concurrent async clients.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

devdavidkarlsson and others added 6 commits February 17, 2026 15:44
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ests.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…pAPIClientHelper.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- executeQueryAsync: throw ServerException when 503 received after all retries exhausted
- executeInsertAsync: throw ServerException on 503 (async inserts don't support retry)
- Both paths now close the response before throwing to prevent resource leaks

Previously, final 503 responses were incorrectly treated as successful operations
and converted to QueryResponse/InsertResponse objects.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +71 to +87
public static void releaseExecutor() {
synchronized (EXECUTOR_LOCK) {
if (EXECUTOR_REF_COUNT.decrementAndGet() == 0 && compressionExecutor != null) {
LOG.debug("Shutting down compression executor pool");
compressionExecutor.shutdown();
try {
if (!compressionExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
compressionExecutor.shutdownNow();
}
} catch (InterruptedException e) {
compressionExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
compressionExecutor = null;
}
}
}
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acquireExecutor and releaseExecutor methods have a race condition when the reference count becomes zero. Between checking if EXECUTOR_REF_COUNT is 0 (line 73) and shutting down the executor (line 75), another thread could call acquireExecutor (line 60), see the count is 0, and try to create a new executor while this thread is shutting it down. This could lead to shutting down an executor that just had a new reference acquired. Consider checking the ref count again after it becomes 0 to ensure no concurrent acquires happened.

Copilot uses AI. Check for mistakes.
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronized block on EXECUTOR_LOCK actually prevents this race condition - a thread cannot enter releaseExecutor() while another is in acquireExecutor() (and vice versa) because both methods synchronize on the same lock. Added documentation in commit d02855c explaining the thread-safety guarantees.

Comment on lines +142 to +158
private static void releaseTimeoutScheduler() {
synchronized (TIMEOUT_SCHEDULER_LOCK) {
if (TIMEOUT_SCHEDULER_REF_COUNT.decrementAndGet() == 0 && timeoutScheduler != null) {
LOG.debug("Shutting down async timeout scheduler");
timeoutScheduler.shutdown();
try {
if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutScheduler.shutdownNow();
}
} catch (InterruptedException e) {
timeoutScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
timeoutScheduler = null;
}
}
}
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acquireTimeoutScheduler and releaseTimeoutScheduler methods have the same race condition as StreamingAsyncEntityProducer. Between checking if TIMEOUT_SCHEDULER_REF_COUNT is 0 (line 144) and shutting down the scheduler (line 146), another thread could call acquireTimeoutScheduler (line 131), see the count is 0, and try to create a new scheduler while this thread is shutting it down. This could lead to shutting down a scheduler that just had a new reference acquired.

Copilot uses AI. Check for mistakes.
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as StreamingAsyncEntityProducer - the synchronized block on TIMEOUT_SCHEDULER_LOCK provides mutual exclusion between acquire and release. No race condition exists because threads cannot concurrently execute both methods. Added clarifying documentation in commit d02855c.

devdavidkarlsson and others added 2 commits February 17, 2026 16:37
The USE_ASYNC_HTTP config property was added for async HTTP support,
increasing the total config count from 34 to 35.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1. Add thread-safety documentation to acquire/release methods explaining
   that the synchronized block prevents the race condition concern
   (mutual exclusion ensures no concurrent acquire/release)

2. Fix potential resource leak in Client constructor - if acquiring the
   second executor fails, release the first one

3. Fix streamCompleteFuture not completing if close() throws - moved
   future completion outside try block to ensure it always happens

4. Change unsubstantiated "96-99%" performance claim to "Substantial
   reduction" per Copilot suggestion

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

devdavidkarlsson and others added 2 commits February 18, 2026 14:31
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

import org.apache.hc.core5.http.Header;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.io.IOException is imported but not used in this class, which will fail builds that enforce no-unused-imports (e.g., Checkstyle). Remove the unused import (or change the signature/implementation to use it explicitly).

Suggested change
import java.io.IOException;

Copilot uses AI. Check for mistakes.
Comment on lines +166 to +170
QueryResponse response = client.query("SELECT number FROM numbers(1000)", settings).get(30, TimeUnit.SECONDS);
BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream()));
int lineCount = 0;
while (reader.readLine() != null) lineCount++;
response.close();
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedReader created from response.getInputStream() is never closed. Even though response.close() closes the underlying stream, this still leaks the reader wrapper and is easy to fix by using try-with-resources for both QueryResponse and BufferedReader.

Copilot uses AI. Check for mistakes.
Comment on lines +189 to +194
QueryResponse response = client.query("SELECT number, toString(number) FROM numbers(100000)", settings)
.get(60, TimeUnit.SECONDS);
BufferedReader reader = new BufferedReader(new InputStreamReader(response.getInputStream()));
int lineCount = 0;
while (reader.readLine() != null) lineCount++;
response.close();
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedReader is not closed (only response.close() is called). Please wrap both the QueryResponse and the BufferedReader in try-with-resources so the reader is always closed, even on early exceptions while counting lines.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Comments