From 41b7753bb2e10bb35667b1b61fbbc30741d3ba11 Mon Sep 17 00:00:00 2001
From: Ravi <13908473+rkdfx@users.noreply.github.com>
Date: Fri, 29 May 2026 22:02:12 +0530
Subject: [PATCH] CAMEL-23450: Remove unmaintained junit-toolbox from
camel-grpc and camel-thrift tests
Signed-off-by: Ravi <13908473+rkdfx@users.noreply.github.com>
---
components/camel-grpc/pom.xml | 12 --
.../grpc/GrpcConsumerConcurrentTest.java | 159 ++++++++++--------
components/camel-thrift/pom.xml | 12 --
.../thrift/ThriftConsumerConcurrentTest.java | 126 +++++++-------
parent/pom.xml | 1 -
5 files changed, 153 insertions(+), 157 deletions(-)
diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml
index 9d5677a065f08..9d0834ba099bb 100644
--- a/components/camel-grpc/pom.xml
+++ b/components/camel-grpc/pom.xml
@@ -115,18 +115,6 @@
${mockito-version}
test
-
- com.googlecode.junit-toolbox
- junit-toolbox
- ${junit-toolbox-version}
- test
-
-
- junit
- junit
-
-
-
javax.annotation
diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
index dbac2d3a16c71..e4b7b8b7df29e 100644
--- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
+++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.component.grpc;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.googlecode.junittoolbox.MultithreadingTester;
-import com.googlecode.junittoolbox.RunnableAssert;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
@@ -56,87 +59,95 @@ public static Integer getId() {
}
@Test
- public void testAsyncWithConcurrentThreads() {
+ public void testAsyncWithConcurrentThreads() throws Exception {
int asyncPort = getRoutePort("grpc-async");
- RunnableAssert ra = new RunnableAssert("foo") {
-
- @Override
- public void run() {
- final CountDownLatch latch = new CountDownLatch(1);
- ManagedChannel asyncRequestChannel
- = NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext()
- .build();
- PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
-
- PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
- int instanceId = createId();
-
- final PingRequest pingRequest
- = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
- StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
- requestObserver.onNext(pingRequest);
- requestObserver.onNext(pingRequest);
- requestObserver.onCompleted();
- try {
- assertTrue(latch.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e);
- }
-
- PongResponse pongResponse = responseObserver.getPongResponse();
-
- assertNotNull(pongResponse, "instanceId = " + instanceId);
- assertEquals(instanceId, pongResponse.getPongId());
- assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
-
- asyncRequestChannel.shutdown().shutdownNow();
+ runConcurrent(() -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ManagedChannel asyncRequestChannel
+ = NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext()
+ .build();
+ PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+ int instanceId = createId();
+
+ final PingRequest pingRequest
+ = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
+ StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onCompleted();
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e);
}
- };
- new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ PongResponse pongResponse = responseObserver.getPongResponse();
+
+ assertNotNull(pongResponse, "instanceId = " + instanceId);
+ assertEquals(instanceId, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+
+ asyncRequestChannel.shutdown().shutdownNow();
+ return null;
+ });
}
@Test
- public void testHeadersWithConcurrentThreads() {
+ public void testHeadersWithConcurrentThreads() throws Exception {
int headersPort = getRoutePort("grpc-headers");
- RunnableAssert ra = new RunnableAssert("foo") {
-
- @Override
- public void run() {
- int instanceId = createId();
- final CountDownLatch latch = new CountDownLatch(1);
- ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort)
- .userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
- .usePlaintext().build();
- PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
-
- PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
-
- final PingRequest pingRequest
- = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
- StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
- requestObserver.onNext(pingRequest);
- requestObserver.onNext(pingRequest);
- requestObserver.onCompleted();
- try {
- assertTrue(latch.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while waiting for the response", e);
- }
-
- PongResponse pongResponse = responseObserver.getPongResponse();
-
- assertNotNull(pongResponse, "instanceId = " + instanceId);
- assertEquals(instanceId, pongResponse.getPongId());
- assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName());
-
- asyncRequestChannel.shutdown().shutdownNow();
+ runConcurrent(() -> {
+ int instanceId = createId();
+ final CountDownLatch latch = new CountDownLatch(1);
+ ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort)
+ .userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
+ .usePlaintext().build();
+ PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+
+ final PingRequest pingRequest
+ = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
+ StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onCompleted();
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while waiting for the response", e);
}
- };
- new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ PongResponse pongResponse = responseObserver.getPongResponse();
+
+ assertNotNull(pongResponse, "instanceId = " + instanceId);
+ assertEquals(instanceId, pongResponse.getPongId());
+ assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName());
+
+ asyncRequestChannel.shutdown().shutdownNow();
+ return null;
+ });
+ }
+
+ private void runConcurrent(Callable> task) throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
+ try {
+ List> futures = new ArrayList<>();
+ for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
+ futures.add(executor.submit(() -> {
+ for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) {
+ task.call();
+ }
+ return null;
+ }));
+ }
+ for (Future> future : futures) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ } finally {
+ executor.shutdownNow();
+ }
}
@Override
diff --git a/components/camel-thrift/pom.xml b/components/camel-thrift/pom.xml
index c9d3c38eddea6..5aa3b78168d38 100644
--- a/components/camel-thrift/pom.xml
+++ b/components/camel-thrift/pom.xml
@@ -75,18 +75,6 @@
gson
test
-
- com.googlecode.junit-toolbox
- junit-toolbox
- ${junit-toolbox-version}
- test
-
-
- junit
- junit
-
-
-
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
index b743ded59022a..63f071c893fe6 100644
--- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.component.thrift;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.googlecode.junittoolbox.MultithreadingTester;
-import com.googlecode.junittoolbox.RunnableAssert;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
@@ -37,7 +40,6 @@
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -68,68 +70,76 @@ public static Integer getId() {
}
@Test
- public void testSyncWithConcurrentThreads() {
- RunnableAssert ra = new RunnableAssert("testSyncWithConcurrentThreads") {
-
- @Override
- public void run() throws TTransportException {
- TTransport transport = new TSocket("localhost", getPortForRoute(0));
- transport.open();
- TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
- Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);
-
- int instanceId = createId();
+ public void testSyncWithConcurrentThreads() throws Exception {
+ runConcurrent(() -> {
+ TTransport transport = new TSocket("localhost", getPortForRoute(0));
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
+ Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);
+
+ int instanceId = createId();
+
+ int calculateResponse = 0;
+ try {
+ calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
+ } catch (TException e) {
+ LOG.info("Exception", e);
+ }
- int calculateResponse = 0;
- try {
- calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
- } catch (TException e) {
- LOG.info("Exception", e);
- }
+ assertNotEquals(0, calculateResponse, "instanceId = " + instanceId);
+ assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
- assertNotEquals(0, calculateResponse, "instanceId = " + instanceId);
- assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
+ transport.close();
+ return null;
+ });
+ }
- transport.close();
+ @Test
+ public void testAsyncWithConcurrentThreads() throws Exception {
+ runConcurrent(() -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1));
+ Calculator.AsyncClient client
+ = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
+ .getAsyncClient(transport);
+
+ int instanceId = createId();
+ CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch);
+ try {
+ client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback);
+ } catch (TException e) {
+ LOG.info("Exception", e);
}
- };
+ latch.await(5, TimeUnit.SECONDS);
- new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
- }
+ int calculateResponse = calculateCallback.getCalculateResponse();
+ LOG.debug("instanceId = {}", instanceId);
+ assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
- @Test
- public void testAsyncWithConcurrentThreads() {
- RunnableAssert ra = new RunnableAssert("testAsyncWithConcurrentThreads") {
+ transport.close();
+ return null;
+ });
+ }
- @Override
- public void run() throws TTransportException, IOException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
-
- TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1));
- Calculator.AsyncClient client
- = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory()))
- .getAsyncClient(transport);
-
- int instanceId = createId();
- CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch);
- try {
- client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback);
- } catch (TException e) {
- LOG.info("Exception", e);
- }
- latch.await(5, TimeUnit.SECONDS);
-
- int calculateResponse = calculateCallback.getCalculateResponse();
- LOG.debug("instanceId = {}", instanceId);
- assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
-
- transport.close();
+ private void runConcurrent(Callable> task) throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
+ try {
+ List> futures = new ArrayList<>();
+ for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
+ futures.add(executor.submit(() -> {
+ for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) {
+ task.call();
+ }
+ return null;
+ }));
}
- };
-
- new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ for (Future> future : futures) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ } finally {
+ executor.shutdownNow();
+ }
}
public class CalculateAsyncMethodCallback implements AsyncMethodCallback {
diff --git a/parent/pom.xml b/parent/pom.xml
index f271454e3e6b5..a7559b181cae9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -325,7 +325,6 @@
0.13.4
21.0.6
3.2.4
- 2.4
5.13.4
6.1.0
2.3.0