forked from ClickHouse/clickhouse-java
Feature/async http support #1
Status: Open. devdavidkarlsson wants to merge 32 commits into main from feature/async-http-support.
Changes from all commits (32):
787d858 Add async HTTP support using Apache HttpClient 5 async API (Phase 1)
860fe1b Add streaming async response support (Phase 2)
efef002 Add LZ4 request compression for async HTTP (Phase 3)
56d55cd feat(async): Add streaming async inserts with compression support
efecaef docs: Add async HTTP documentation
768e857 Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
d834acb Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
78fe78b Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
ac12829 fix: address Copilot review comments for async HTTP support
9dcfd16 perf: use reusable buffer in StreamingAsyncResponseConsumer to reduce…
9a45872 fix: close StreamingResponse in readErrorFromStreamingResponse to pre…
2ff342c fix: address additional Copilot review comments
234e347 fix: propagate cancellation to underlying async HTTP requests
5d5c6c8 fix: address Copilot review round 2
c638218 Update CHANGELOG.md
2de0df1 Update client-v2/src/main/java/com/clickhouse/client/api/internal/Str…
0627b9e Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
9dae2eb Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
ceddd45 fix: address Copilot review round 3
060ef45 feat: improve async HTTP client production-readiness
c4662af fix: address Copilot review comments on static executors and test cle…
f7a1633 feat: add proper lifecycle management for async executors
852cb9d Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
ee1aeb1 Update client-v2/src/test/java/com/clickhouse/client/AsyncHttpClientT…
ba7b17d Update client-v2/src/test/resources/logback-test.xml
a87204e Update client-v2/src/main/java/com/clickhouse/client/api/internal/Htt…
aa30e36 Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
3cfc083 fix: properly handle 503 responses in async query/insert paths
e57e89c fix: update expected config count for new USE_ASYNC_HTTP property
d02855c fix: address Copilot review on executor lifecycle and error handling
f89db10 Update client-v2/src/main/java/com/clickhouse/client/api/Client.java
942e8b7 Update client-v2/src/main/java/com/clickhouse/client/api/Client.java

All 32 commits authored by devdavidkarlsson.
client-v2/src/main/java/com/clickhouse/client/api/Client.java: 351 additions & 2 deletions (large diff not rendered by default)
client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java: 680 additions & 3 deletions (large diff not rendered by default)
client-v2/src/main/java/com/clickhouse/client/api/internal/StreamingAsyncEntityProducer.java: 315 additions & 0 deletions (new file)
package com.clickhouse.client.api.internal;

import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Async entity producer that streams data from an InputStream with optional LZ4 compression.
 * Supports on-the-fly compression without buffering the entire payload in memory.
 *
 * <p>For compression, data flows: User InputStream → Compression → PipedStream → NIO output</p>
 */
public class StreamingAsyncEntityProducer implements AsyncEntityProducer {

    private static final Logger LOG = LoggerFactory.getLogger(StreamingAsyncEntityProducer.class);
    private static final int DEFAULT_BUFFER_SIZE = 8 * 1024;  // 8 KB read buffer
    private static final int PIPE_BUFFER_SIZE = 512 * 1024;   // 512 KB pipe buffer
    private static final AtomicLong THREAD_COUNTER = new AtomicLong(0); // for unique thread names

    /**
     * Shared thread pool for compression tasks, bounded to prevent thread explosion under high concurrency.
     * Uses daemon threads so it will not prevent JVM shutdown if shutdown is never called.
     *
     * <p>Lifecycle management: call {@link #acquireExecutor()} when creating an async client and
     * {@link #releaseExecutor()} when closing it. The pool is lazily created on first acquire and
     * shut down when the last client releases it.</p>
     */
    private static final int COMPRESSION_POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors());
    private static final Object EXECUTOR_LOCK = new Object();
    private static final AtomicInteger EXECUTOR_REF_COUNT = new AtomicInteger(0);
    private static volatile ExecutorService compressionExecutor = null;
    /**
     * Acquires a reference to the shared compression executor.
     * Call this when creating an async HTTP client that may use compression.
     * Must be paired with a call to {@link #releaseExecutor()} when the client is closed.
     *
     * <p>Thread-safety: this method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion
     * with releaseExecutor(). No race condition exists because a thread cannot enter
     * releaseExecutor() while another is in acquireExecutor() (and vice versa).</p>
     */
    public static void acquireExecutor() {
        synchronized (EXECUTOR_LOCK) {
            if (EXECUTOR_REF_COUNT.getAndIncrement() == 0) {
                compressionExecutor = createCompressionExecutor();
                LOG.debug("Created compression executor pool");
            }
        }
    }

    /**
     * Releases a reference to the shared compression executor.
     * When the last reference is released, the executor is gracefully shut down.
     *
     * <p>Thread-safety: this method synchronizes on EXECUTOR_LOCK, ensuring mutual exclusion
     * with acquireExecutor(). The synchronized block guarantees that between checking the
     * ref count and shutting down, no other thread can acquire a new reference.</p>
     */
    public static void releaseExecutor() {
        synchronized (EXECUTOR_LOCK) {
            if (EXECUTOR_REF_COUNT.decrementAndGet() == 0 && compressionExecutor != null) {
                LOG.debug("Shutting down compression executor pool");
                compressionExecutor.shutdown();
                try {
                    if (!compressionExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                        compressionExecutor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    compressionExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
                compressionExecutor = null;
            }
        }
    }

    private static ExecutorService createCompressionExecutor() {
        return new ThreadPoolExecutor(
                COMPRESSION_POOL_SIZE,
                COMPRESSION_POOL_SIZE,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000), // bounded queue to provide backpressure
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "ch-async-compress-" + THREAD_COUNTER.incrementAndGet());
                        t.setDaemon(true);
                        return t;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // if the queue is full, run in the caller thread (backpressure)
        );
    }

    private static ExecutorService getExecutor() {
        ExecutorService exec = compressionExecutor;
        if (exec == null || exec.isShutdown()) {
            // Fallback: if the executor was never acquired, the caller will spawn a
            // one-off daemon thread instead. This handles edge cases but logs a warning.
            LOG.warn("Compression executor not acquired - compression will run in a fallback thread");
            return null;
        }
        return exec;
    }
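The acquire/release pairing above is easiest to reason about in isolation. The following is a minimal standalone sketch of the same refcounting pattern; the class and method names here are illustrative, not part of the client API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal refcounted shared executor, mirroring acquireExecutor()/releaseExecutor().
final class SharedExecutor {
    private static final Object LOCK = new Object();
    private static int refCount = 0;
    private static ExecutorService pool = null;

    static void acquire() {
        synchronized (LOCK) {
            if (refCount++ == 0) {
                pool = Executors.newFixedThreadPool(2); // lazily created on first acquire
            }
        }
    }

    static void release() {
        synchronized (LOCK) {
            if (--refCount == 0 && pool != null) {
                pool.shutdown(); // last reference gone: graceful shutdown
                try {
                    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                        pool.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    pool.shutdownNow();
                    Thread.currentThread().interrupt();
                }
                pool = null;
            }
        }
    }

    static boolean isActive() {
        synchronized (LOCK) {
            return pool != null;
        }
    }
}
```

In the real client, acquire() would belong in the client's constructor and release() in its close(), ideally guarded by try/finally so a failed construction cannot leak a reference.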
    private final ContentType contentType;
    private final InputStream sourceStream;
    private final boolean compressData;
    private final boolean useHttpCompression;
    private final int compressionBufferSize;
    private final LZ4Factory lz4Factory;

    private final ByteBuffer readBuffer;
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<Exception> error = new AtomicReference<>();

    // For compression: compress in the thread pool, read compressed data here
    private PipedInputStream compressedInputStream;
    private InputStream activeInputStream;

    public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType) {
        this(sourceStream, contentType, false, false, 0, null);
    }

    public StreamingAsyncEntityProducer(InputStream sourceStream, ContentType contentType,
                                        boolean compressData, boolean useHttpCompression,
                                        int compressionBufferSize, LZ4Factory lz4Factory) {
        this.sourceStream = sourceStream;
        this.contentType = contentType;
        this.compressData = compressData;
        this.useHttpCompression = useHttpCompression;
        this.compressionBufferSize = compressionBufferSize > 0 ? compressionBufferSize : ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE;
        this.lz4Factory = lz4Factory;
        this.readBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
        this.readBuffer.flip(); // start empty
    }
    private void initializeStreams() throws IOException {
        if (activeInputStream != null) {
            return; // already initialized
        }

        if (compressData && lz4Factory != null) {
            // Set up the compression pipeline: sourceStream → compress → pipedStream → NIO
            PipedOutputStream compressedOutputStream = new PipedOutputStream();
            compressedInputStream = new PipedInputStream(compressedOutputStream, PIPE_BUFFER_SIZE);
            activeInputStream = compressedInputStream;

            // Submit the compression task to the shared thread pool (or run in a fallback thread if unavailable)
            ExecutorService executor = getExecutor();
            Runnable compressionTask = () -> {
                try {
                    OutputStream compressingStream;
                    if (useHttpCompression) {
                        compressingStream = new org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream(compressedOutputStream);
                    } else {
                        compressingStream = new ClickHouseLZ4OutputStream(compressedOutputStream, lz4Factory.fastCompressor(), compressionBufferSize);
                    }

                    try {
                        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
                        int bytesRead;
                        while ((bytesRead = sourceStream.read(buffer)) != -1) {
                            compressingStream.write(buffer, 0, bytesRead);
                        }
                    } finally {
                        compressingStream.close();
                        compressedOutputStream.close();
                    }
                } catch (IOException e) {
                    error.set(e);
                    try {
                        compressedOutputStream.close();
                    } catch (IOException ignored) {
                    }
                    try {
                        sourceStream.close();
                    } catch (IOException ignored) {
                    }
                }
            };

            if (executor != null) {
                executor.submit(compressionTask);
            } else {
                // Fallback: run compression in a new daemon thread (less efficient but functional)
                Thread t = new Thread(compressionTask, "ch-compress-fallback-" + THREAD_COUNTER.incrementAndGet());
                t.setDaemon(true);
                t.start();
            }
        } else {
            // No compression: read directly from the source
            activeInputStream = sourceStream;
        }
    }
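The pipeline in initializeStreams() hands bytes from a writer thread to the NIO reader through a PipedOutputStream/PipedInputStream pair. Below is a self-contained sketch of the same shape, substituting the JDK's GZIP streams for LZ4 so it runs without external dependencies; all names here are illustrative, not from the PR:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPOutputStream;

final class PipedCompressionDemo {
    // Compresses sourceStream on a background thread; the returned stream
    // yields the compressed bytes as they are produced, without buffering
    // the whole payload in memory.
    static InputStream compressInBackground(InputStream source) throws IOException {
        PipedOutputStream sink = new PipedOutputStream();
        PipedInputStream compressed = new PipedInputStream(sink, 64 * 1024);
        Thread writer = new Thread(() -> {
            // Closing the GZIP stream also closes the pipe, signalling EOF to the reader.
            try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = source.read(buf)) != -1) {
                    gz.write(buf, 0, n);
                }
            } catch (IOException e) {
                // The real producer records this in an AtomicReference and
                // rethrows it from produce().
            }
        }, "demo-compress");
        writer.setDaemon(true);
        writer.start();
        return compressed;
    }
}
```

Note that piped streams require the reader and writer to be on different threads, which is exactly the split the producer relies on: the compression task writes, and produce() reads on the I/O reactor's schedule.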
    @Override
    public boolean isRepeatable() {
        return false; // streaming is not repeatable
    }

    @Override
    public String getContentType() {
        return contentType != null ? contentType.toString() : null;
    }

    @Override
    public long getContentLength() {
        return -1; // unknown length for streaming
    }

    @Override
    public int available() {
        try {
            initializeStreams();
            if (readBuffer.hasRemaining()) {
                return readBuffer.remaining();
            }
            return activeInputStream.available();
        } catch (IOException e) {
            return 0;
        }
    }

    @Override
    public String getContentEncoding() {
        return null; // the Content-Encoding header is set separately
    }

    @Override
    public boolean isChunked() {
        return true; // always chunked for streaming
    }

    @Override
    public Set<String> getTrailerNames() {
        return null;
    }
    @Override
    public void produce(DataStreamChannel channel) throws IOException {
        initializeStreams();

        // Check for compression errors
        Exception compressionError = error.get();
        if (compressionError != null) {
            throw new IOException("Compression failed", compressionError);
        }

        // If the buffer has data, write it first
        if (readBuffer.hasRemaining()) {
            channel.write(readBuffer);
            if (readBuffer.hasRemaining()) {
                return; // the channel could not accept all data; produce() will be called again
            }
        }

        // Read more data from the stream
        readBuffer.clear();
        byte[] array = readBuffer.array();
        int bytesRead = activeInputStream.read(array, 0, array.length);

        if (bytesRead == -1) {
            // End of stream
            completed.set(true);
            channel.endStream();
        } else if (bytesRead > 0) {
            readBuffer.limit(bytesRead);
            channel.write(readBuffer);
        }
    }

    @Override
    public void failed(Exception cause) {
        LOG.debug("Streaming entity producer failed", cause);
        error.set(cause);
        releaseResources();
    }

    @Override
    public void releaseResources() {
        completed.set(true);
        // Closing the streams will cause any running compression task to fail and exit
        try {
            if (activeInputStream != null) {
                activeInputStream.close();
            }
            if (sourceStream != activeInputStream) {
                sourceStream.close();
            }
        } catch (IOException e) {
            LOG.debug("Error closing streams", e);
        }
    }
}
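produce() relies on the standard NIO buffer discipline: the buffer starts empty (allocate() followed by flip() in the constructor), and each refill cycle is clear(), read into the backing array, then limit(bytesRead). The standalone sketch below illustrates that cycle with a byte array standing in for the DataStreamChannel; the class and method names are illustrative only:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

final class ProduceLoopDemo {
    // Drains an InputStream through a reusable ByteBuffer, mirroring the
    // clear()/limit()/write cycle used by produce().
    static byte[] drain(InputStream in, int bufSize) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(bufSize);
        buffer.flip(); // start empty: position == limit == 0
        while (true) {
            if (!buffer.hasRemaining()) {
                buffer.clear(); // position = 0, limit = capacity
                int n = in.read(buffer.array(), 0, buffer.capacity());
                if (n == -1) {
                    break; // end of stream; the real producer calls channel.endStream()
                }
                buffer.limit(n); // expose only the bytes just read
            }
            // Stand-in for channel.write(buffer): consume whatever is readable.
            out.write(buffer.array(), buffer.position(), buffer.remaining());
            buffer.position(buffer.limit()); // mark those bytes as written
        }
        return out.toByteArray();
    }
}
```

The real produce() differs in one important way: channel.write() may accept only part of the buffer, in which case the method returns and waits to be called again, which is why the leftover-data check comes before the refill.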
Review comment: The acquireExecutor and releaseExecutor methods have a race condition when the reference count becomes zero. Between checking if EXECUTOR_REF_COUNT is 0 (line 73) and shutting down the executor (line 75), another thread could call acquireExecutor (line 60), see the count is 0, and try to create a new executor while this thread is shutting it down. This could lead to shutting down an executor that just had a new reference acquired. Consider checking the ref count again after it becomes 0 to ensure no concurrent acquires happened.
Author reply: The synchronized block on EXECUTOR_LOCK actually prevents this race condition. A thread cannot enter releaseExecutor() while another is in acquireExecutor() (and vice versa) because both methods synchronize on the same lock. Added documentation in commit d02855c explaining the thread-safety guarantees.
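The resolution above can also be checked empirically. The sketch below stress-tests a lock-guarded refcount of the same shape from many threads and verifies that teardown never interleaves with a new acquire; it is illustrative code written for this discussion, not code from the PR:

```java
final class RefCountDemo {
    private final Object lock = new Object();
    private int refCount = 0;
    private boolean poolAlive = false;

    void acquire() {
        synchronized (lock) {
            if (refCount++ == 0) {
                poolAlive = true; // stand-in for "create executor"
            }
        }
    }

    void release() {
        synchronized (lock) {
            if (--refCount == 0) {
                // Safe: no thread can run acquire() between the decrement and
                // this teardown, because both methods hold the same lock.
                poolAlive = false;
            }
        }
    }

    // Hammers acquire/release from many threads; returns true if the final
    // state is consistent (count back to zero, "executor" torn down).
    static boolean stress(int threads, int iterations) {
        RefCountDemo d = new RefCountDemo();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    d.acquire();
                    d.release();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        synchronized (d.lock) {
            return d.refCount == 0 && !d.poolAlive;
        }
    }
}
```

A test like this cannot prove the absence of races, but if the lock were removed (or acquire and release synchronized on different monitors), runs of this stress would quickly leave refCount nonzero or poolAlive set, which is the failure mode the review comment described.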