32 commits
787d858
Add async HTTP support using Apache HttpClient 5 async API (Phase 1)
devdavidkarlsson Feb 11, 2026
860fe1b
Add streaming async response support (Phase 2)
devdavidkarlsson Feb 11, 2026
efef002
Add LZ4 request compression for async HTTP (Phase 3)
devdavidkarlsson Feb 11, 2026
56d55cd
feat(async): Add streaming async inserts with compression support
devdavidkarlsson Feb 11, 2026
efecaef
docs: Add async HTTP documentation
devdavidkarlsson Feb 11, 2026
768e857
Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
devdavidkarlsson Feb 11, 2026
d834acb
Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
devdavidkarlsson Feb 11, 2026
78fe78b
Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
devdavidkarlsson Feb 11, 2026
ac12829
fix: address Copilot review comments for async HTTP support
devdavidkarlsson Feb 13, 2026
9dcfd16
perf: use reusable buffer in StreamingAsyncResponseConsumer to reduce…
devdavidkarlsson Feb 13, 2026
9a45872
fix: close StreamingResponse in readErrorFromStreamingResponse to pre…
devdavidkarlsson Feb 16, 2026
2ff342c
fix: address additional Copilot review comments
devdavidkarlsson Feb 16, 2026
234e347
fix: propagate cancellation to underlying async HTTP requests
devdavidkarlsson Feb 16, 2026
5d5c6c8
fix: address Copilot review round 2
devdavidkarlsson Feb 16, 2026
c638218
Update CHANGELOG.md
devdavidkarlsson Feb 16, 2026
2de0df1
Update client-v2/src/main/java/com/clickhouse/client/api/internal/Str…
devdavidkarlsson Feb 16, 2026
0627b9e
Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
devdavidkarlsson Feb 16, 2026
9dae2eb
Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
devdavidkarlsson Feb 16, 2026
ceddd45
fix: address Copilot review round 3
devdavidkarlsson Feb 16, 2026
060ef45
feat: improve async HTTP client production-readiness
devdavidkarlsson Feb 17, 2026
c4662af
fix: address Copilot review comments on static executors and test cle…
devdavidkarlsson Feb 17, 2026
f7a1633
feat: add proper lifecycle management for async executors
devdavidkarlsson Feb 17, 2026
852cb9d
Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
devdavidkarlsson Feb 17, 2026
ee1aeb1
Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
devdavidkarlsson Feb 17, 2026
ba7b17d
Update client-v2/src/test/resources/logback-test.xml
devdavidkarlsson Feb 17, 2026
a87204e
Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
devdavidkarlsson Feb 17, 2026
aa30e36
Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
devdavidkarlsson Feb 17, 2026
3cfc083
fix: properly handle 503 responses in async query/insert paths
devdavidkarlsson Feb 17, 2026
e57e89c
fix: update expected config count for new USE_ASYNC_HTTP property
devdavidkarlsson Feb 17, 2026
d02855c
fix: address Copilot review on executor lifecycle and error handling
devdavidkarlsson Feb 17, 2026
f89db10
Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
devdavidkarlsson Feb 18, 2026
942e8b7
Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
devdavidkarlsson Feb 18, 2026
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,19 @@
## [Unreleased]

### New Features
- [client-v2] Async HTTP support using Apache HttpClient 5 NIO API (https://github.com/ClickHouse/clickhouse-java/issues/1831)
- True non-blocking I/O for queries and inserts
- Streaming responses with constant memory usage (~512KB regardless of result size)
- Streaming request compression (HTTP framed LZ4 and ClickHouse native LZ4)
- Eliminates blocking threads waiting for I/O under high concurrency, significantly reducing thread usage compared to the synchronous client
- Opt-in via `useAsyncHttp(true)` builder option
- Full backward compatibility (async disabled by default)
- Added test coverage (integration tests)

### Known Limitations
- [client-v2] Async HTTP: SOCKS proxy not supported (Apache HttpClient async limitation)
- [client-v2] Async HTTP: Multipart requests use sync fallback

## 0.9.6
This release addresses a potential security risk in one of the dependencies (see below). We strongly recommend upgrading.

31 changes: 30 additions & 1 deletion README.md
@@ -32,6 +32,7 @@ Historically, there are two versions of both components. The previous version of
| Name | Client V2 | Client V1 | Comments
|----------------------------------------------|:---------:|:---------:|:---------:|
| Http Connection |✔ |✔ | |
| Async HTTP (NIO) |✔ |✗ | Non-blocking I/O for high concurrency |
| Http Compression (LZ4) |✔ |✔ | |
| Server Response Compression - LZ4 |✔ |✔ | |
| Client Request Compression - LZ4 |✔ |✔ | |
@@ -89,8 +90,36 @@ Nightly Builds: https://s01.oss.sonatype.org/content/repositories/snapshots/com/

[Begin-with Usage Examples](../../tree/main/examples/client-v2)

[Spring Demo Service](https://github.com/ClickHouse/clickhouse-java/tree/main/examples/demo-service)

### Async HTTP Support

Client V2 supports true async HTTP using the Apache HttpClient 5 NIO API for high-concurrency workloads.

**Features:**
- Non-blocking I/O - no thread-per-request blocking
- Streaming responses with constant memory usage
- Streaming request compression (HTTP and native LZ4)
- Substantial reduction in thread usage under high concurrency

**Usage:**
```java
Client client = new Client.Builder()
.addEndpoint("http://localhost:8123")
.setUsername("default")
.useAsyncHttp(true) // Enable async HTTP
.build();

// Queries and inserts work the same way
CompletableFuture<QueryResponse> future = client.query("SELECT * FROM table");
```

**When to use:**
- High concurrency (100+ concurrent requests)
- Large result sets or inserts (GB+ data)
- Memory-constrained environments

Async HTTP is opt-in and disabled by default. Existing code works without changes.
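Because `query(...)` returns a `CompletableFuture`, callers can compose transformations and error handling without holding a thread per request. A minimal pure-JDK sketch of that pattern (no ClickHouse dependency; `fetchRowCount` is a hypothetical stand-in for a query call):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncComposition {
    // Hypothetical stand-in for client.query(...): completes on a pool thread,
    // not the caller's thread, just like the async client's futures.
    static CompletableFuture<Long> fetchRowCount() {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    public static void main(String[] args) {
        long doubled = fetchRowCount()
                .thenApply(n -> n * 2)   // transform without blocking the caller
                .exceptionally(t -> -1L) // map failures to a sentinel value
                .join();                 // block only at the program's edge
        System.out.println(doubled);     // prints 84
    }
}
```

The same composition style applies to the real client's futures; only the terminal `join()` (or an equivalent callback) should ever block.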

## JDBC Driver

353 changes: 351 additions & 2 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java

Large diffs are not rendered by default.

Expand Up @@ -57,6 +57,14 @@ public enum ClientConfigProperties {

ASYNC_OPERATIONS("async", Boolean.class, "false"),

/**
* Enables true async HTTP transport using Apache HttpClient 5 async API.
* When enabled, HTTP requests use NIO-based non-blocking I/O instead of
* blocking thread-per-request model. This provides better scalability
* under high concurrency.
*/
USE_ASYNC_HTTP("use_async_http", Boolean.class, "false"),

CONNECTION_TTL("connection_ttl", Long.class, "-1"),

CONNECTION_TIMEOUT("connection_timeout", Long.class),

Large diffs are not rendered by default.

@@ -0,0 +1,315 @@
package com.clickhouse.client.api.internal;

import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Async entity producer that streams data from an InputStream with optional LZ4 compression.
* Supports on-the-fly compression without buffering the entire payload in memory.
*
* <p>For compression, data flows: User InputStream → Compression → PipedStream → NIO output</p>
*/
public class StreamingAsyncEntityProducer implements AsyncEntityProducer {

private static final Logger LOG = LoggerFactory.getLogger(StreamingAsyncEntityProducer.class);
private static final int DEFAULT_BUFFER_SIZE = 8 * 1024; // 8KB read buffer
private static final int PIPE_BUFFER_SIZE = 512 * 1024; // 512KB pipe buffer
private static final AtomicLong THREAD_COUNTER = new AtomicLong(0); // For unique thread names

/**
* Shared thread pool for compression tasks - bounded to prevent thread explosion under high concurrency.
* Uses daemon threads so it won't prevent JVM shutdown if shutdown is not called.
*
* <p>Lifecycle management: Call {@link #acquireExecutor()} when creating an async client and
* {@link #releaseExecutor()} when closing it. The pool is lazily created on first acquire and
* shut down when the last client releases it.</p>
*/
private static final int COMPRESSION_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors());
private static final Object EXECUTOR_LOCK = new Object();
private static final AtomicInteger EXECUTOR_REF_COUNT = new AtomicInteger(0);
private static volatile ExecutorService compressionExecutor = null;

/**
* Acquires a reference to the shared compression executor.
* Call this when creating an async HTTP client that may use compression.
* Must be paired with a call to {@link #releaseExecutor()} when the client is closed.
*
* <p>Thread-safety: This method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion
* with releaseExecutor(). No race condition exists because a thread cannot enter
* releaseExecutor() while another is in acquireExecutor() (and vice versa).</p>
*/
public static void acquireExecutor() {
synchronized (EXECUTOR_LOCK) {
if (EXECUTOR_REF_COUNT.getAndIncrement() == 0) {
compressionExecutor = createCompressionExecutor();
LOG.debug("Created compression executor pool");
}
}
}

/**
* Releases a reference to the shared compression executor.
* When the last reference is released, the executor is gracefully shut down.
*
* <p>Thread-safety: This method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion
* with acquireExecutor(). The synchronized block guarantees that between checking the
* ref count and shutting down, no other thread can acquire a new reference.</p>
*/
public static void releaseExecutor() {
synchronized (EXECUTOR_LOCK) {
if (EXECUTOR_REF_COUNT.decrementAndGet() == 0 && compressionExecutor != null) {
LOG.debug("Shutting down compression executor pool");
compressionExecutor.shutdown();
try {
if (!compressionExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
compressionExecutor.shutdownNow();
}
} catch (InterruptedException e) {
compressionExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
compressionExecutor = null;
}
}
}
Comment on lines +79 to +95

Copilot AI, Feb 17, 2026:
The acquireExecutor and releaseExecutor methods have a race condition when the reference count becomes zero. Between checking if EXECUTOR_REF_COUNT is 0 (line 73) and shutting down the executor (line 75), another thread could call acquireExecutor (line 60), see the count is 0, and try to create a new executor while this thread is shutting it down. This could lead to shutting down an executor that just had a new reference acquired. Consider checking the ref count again after it becomes 0 to ensure no concurrent acquires happened.

devdavidkarlsson (Owner, Author):
The synchronized block on EXECUTOR_LOCK actually prevents this race condition - a thread cannot enter releaseExecutor() while another is in acquireExecutor() (and vice versa) because both methods synchronize on the same lock. Added documentation in commit d02855c explaining the thread-safety guarantees.
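The author's thread-safety argument can be illustrated with a stripped-down, self-contained sketch of the same ref-counting pattern (names here are illustrative, not the PR's actual code): because acquire and release synchronize on one lock, the "check count, then shut down" sequence can never interleave with "check count, then create".

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RefCountedExecutor {
    private static final Object LOCK = new Object();
    private static int refCount = 0;
    private static ExecutorService pool = null;

    // First acquirer creates the shared pool; both methods hold LOCK,
    // so create and shutdown can never race with each other.
    public static void acquire() {
        synchronized (LOCK) {
            if (refCount++ == 0) {
                pool = Executors.newFixedThreadPool(2);
            }
        }
    }

    // Last releaser shuts the pool down; a concurrent acquire() must wait
    // for LOCK and will then see refCount == 0 and create a fresh pool.
    public static void release() {
        synchronized (LOCK) {
            if (--refCount == 0 && pool != null) {
                pool.shutdown();
                pool = null;
            }
        }
    }

    public static boolean isRunning() {
        synchronized (LOCK) {
            return pool != null;
        }
    }

    public static void main(String[] args) {
        acquire();
        acquire();                       // a second client shares the pool
        release();
        System.out.println(isRunning()); // prints true: one reference remains
        release();
        System.out.println(isRunning()); // prints false: last release shut it down
    }
}
```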


private static ExecutorService createCompressionExecutor() {
return new ThreadPoolExecutor(
COMPRESSION_POOL_SIZE,
COMPRESSION_POOL_SIZE,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // Bounded queue to provide backpressure
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "ch-async-compress-" + THREAD_COUNTER.incrementAndGet());
t.setDaemon(true);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // If queue full, run in caller thread (backpressure)
);
}

private static ExecutorService getExecutor() {
ExecutorService exec = compressionExecutor;
if (exec == null || exec.isShutdown()) {
// Fallback: executor was never acquired (or already released).
// Returning null makes the caller run compression on a dedicated daemon thread.
LOG.warn("Compression executor not acquired - compression will run in a fallback thread");
return null;
}
return exec;
}

private final ContentType contentType;
private final InputStream sourceStream;
private final boolean compressData;
private final boolean useHttpCompression;
private final int compressionBufferSize;
private final LZ4Factory lz4Factory;

private final ByteBuffer readBuffer;
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<Exception> error = new AtomicReference<>();

// For compression: compress in thread pool, read compressed data here
private PipedInputStream compressedInputStream;
private InputStream activeInputStream;

public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType) {
this(sourceStream, contentType, false, false, 0, null);
}

public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType,
boolean compressData, boolean useHttpCompression,
int compressionBufferSize, LZ4Factory lz4Factory) {
this.sourceStream = sourceStream;
this.contentType = contentType;
this.compressData = compressData;
this.useHttpCompression = useHttpCompression;
this.compressionBufferSize = compressionBufferSize > 0 ? compressionBufferSize : ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE;
this.lz4Factory = lz4Factory;
this.readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
this.readBuffer.flip(); // Start empty
}

private void initializeStreams() throws IOException {
if (activeInputStream != null) {
return; // Already initialized
}

if (compressData && lz4Factory != null) {
// Setup compression pipeline: sourceStream → compress → pipedStream → NIO
PipedOutputStream compressedOutputStream = new PipedOutputStream();
compressedInputStream = new PipedInputStream(compressedOutputStream, PIPE_BUFFER_SIZE);
activeInputStream = compressedInputStream;

// Submit compression task to shared thread pool (or run inline if not available)
ExecutorService executor = getExecutor();
Runnable compressionTask = () -> {
try {
OutputStream compressingStream;
if (useHttpCompression) {
compressingStream = new org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream(compressedOutputStream);
} else {
compressingStream = new ClickHouseLZ4OutputStream(compressedOutputStream, lz4Factory.fastCompressor(), compressionBufferSize);
}

try {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int bytesRead;
while ((bytesRead = sourceStream.read(buffer)) != -1) {
compressingStream.write(buffer, 0, bytesRead);
}
} finally {
compressingStream.close();
compressedOutputStream.close();
}
} catch (IOException e) {
error.set(e);
try {
compressedOutputStream.close();
} catch (IOException ignored) {
}
try {
sourceStream.close();
} catch (IOException ignored) {
}
}
};

if (executor != null) {
executor.submit(compressionTask);
} else {
// Fallback: run compression in a new thread (less efficient but functional)
Thread t = new Thread(compressionTask, "ch-compress-fallback-" + THREAD_COUNTER.incrementAndGet());
t.setDaemon(true);
t.start();
}
} else {
// No compression - read directly from source
activeInputStream = sourceStream;
}
}

@Override
public boolean isRepeatable() {
return false; // Streaming is not repeatable
}

@Override
public String getContentType() {
return contentType != null ? contentType.toString() : null;
}

@Override
public long getContentLength() {
return -1; // Unknown length for streaming
}

@Override
public int available() {
try {
initializeStreams();
if (readBuffer.hasRemaining()) {
return readBuffer.remaining();
}
return activeInputStream.available();
} catch (IOException e) {
return 0;
}
}

@Override
public String getContentEncoding() {
return null; // Content-Encoding header is set separately
}

@Override
public boolean isChunked() {
return true; // Always chunked for streaming
}

@Override
public Set<String> getTrailerNames() {
return null;
}

@Override
public void produce(DataStreamChannel channel) throws IOException {
initializeStreams();

// Check for compression errors
Exception compressionError = error.get();
if (compressionError != null) {
throw new IOException("Compression failed", compressionError);
}

// If buffer has data, write it first
if (readBuffer.hasRemaining()) {
channel.write(readBuffer);
if (readBuffer.hasRemaining()) {
return; // Channel couldn't accept all data, will be called again
}
}

// Read more data from stream
readBuffer.clear();
byte[] array = readBuffer.array();
int bytesRead = activeInputStream.read(array, 0, array.length);

if (bytesRead == -1) {
// End of stream
completed.set(true);
channel.endStream();
} else if (bytesRead > 0) {
readBuffer.limit(bytesRead);
channel.write(readBuffer);
}
}

@Override
public void failed(Exception cause) {
LOG.debug("Streaming entity producer failed", cause);
error.set(cause);
releaseResources();
}

@Override
public void releaseResources() {
completed.set(true);
// Closing streams will cause any running compression task to fail and exit
try {
if (activeInputStream != null) {
activeInputStream.close();
}
if (sourceStream != activeInputStream) {
sourceStream.close();
}
} catch (IOException e) {
LOG.debug("Error closing streams", e);
}
}
}
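The producer's compression pipeline (source stream → compressor on a pool thread → `PipedOutputStream` → `PipedInputStream` → consumer) can be exercised standalone. A minimal sketch of the same pipe pattern, using the JDK's GZIP in place of the LZ4 streams (which need external dependencies); class and method names are illustrative:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PipedCompressionDemo {
    // Compresses `source` on a background thread and returns a stream of
    // compressed bytes, mirroring sourceStream -> compress -> pipe -> NIO.
    static InputStream compressAsync(InputStream source) throws IOException {
        PipedOutputStream pipeOut = new PipedOutputStream();
        PipedInputStream pipeIn = new PipedInputStream(pipeOut, 64 * 1024);
        Thread worker = new Thread(() -> {
            try (OutputStream gz = new GZIPOutputStream(pipeOut)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = source.read(buf)) != -1) {
                    gz.write(buf, 0, n); // blocks when the pipe is full -> backpressure
                }
            } catch (IOException ignored) {
                // Real code records the error (as the producer does via `error`).
            }
        }, "demo-compress");
        worker.setDaemon(true);
        worker.start();
        return pipeIn;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello clickhouse".getBytes(StandardCharsets.UTF_8);
        InputStream compressed = compressAsync(new ByteArrayInputStream(payload));
        // Round-trip: decompress what came out of the pipe.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream plain = new GZIPInputStream(compressed)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = plain.read(buf)) != -1) out.write(buf, 0, n);
        }
        System.out.println(out.toString("UTF-8")); // prints "hello clickhouse"
    }
}
```

The bounded pipe buffer is what gives the producer its constant-memory property: the compressor thread simply blocks on `write` until the NIO side drains the pipe.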