From 41b7753bb2e10bb35667b1b61fbbc30741d3ba11 Mon Sep 17 00:00:00 2001 From: Ravi <13908473+rkdfx@users.noreply.github.com> Date: Fri, 29 May 2026 22:02:12 +0530 Subject: [PATCH] CAMEL-23450: Remove unmaintained junit-toolbox from camel-grpc and camel-thrift tests Signed-off-by: Ravi <13908473+rkdfx@users.noreply.github.com> --- components/camel-grpc/pom.xml | 12 -- .../grpc/GrpcConsumerConcurrentTest.java | 159 ++++++++++-------- components/camel-thrift/pom.xml | 12 -- .../thrift/ThriftConsumerConcurrentTest.java | 126 +++++++------- parent/pom.xml | 1 - 5 files changed, 153 insertions(+), 157 deletions(-) diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml index 9d5677a065f08..9d0834ba099bb 100644 --- a/components/camel-grpc/pom.xml +++ b/components/camel-grpc/pom.xml @@ -115,18 +115,6 @@ ${mockito-version} test - - com.googlecode.junit-toolbox - junit-toolbox - ${junit-toolbox-version} - test - - - junit - junit - - - javax.annotation diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java index dbac2d3a16c71..e4b7b8b7df29e 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java @@ -16,13 +16,16 @@ */ package org.apache.camel.component.grpc; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.googlecode.junittoolbox.MultithreadingTester; -import com.googlecode.junittoolbox.RunnableAssert; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.StreamObserver; @@ -56,87 +59,95 @@ public static Integer getId() { } @Test - public void testAsyncWithConcurrentThreads() { + public void testAsyncWithConcurrentThreads() throws Exception { int asyncPort = getRoutePort("grpc-async"); - RunnableAssert ra = new RunnableAssert("foo") { - - @Override - public void run() { - final CountDownLatch latch = new CountDownLatch(1); - ManagedChannel asyncRequestChannel - = NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext() - .build(); - PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); - - PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - int instanceId = createId(); - - final PingRequest pingRequest - = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); - StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); - requestObserver.onNext(pingRequest); - requestObserver.onNext(pingRequest); - requestObserver.onCompleted(); - try { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e); - } - - PongResponse pongResponse = responseObserver.getPongResponse(); - - assertNotNull(pongResponse, "instanceId = " + instanceId); - assertEquals(instanceId, pongResponse.getPongId()); - assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); - - asyncRequestChannel.shutdown().shutdownNow(); + runConcurrent(() -> { + final CountDownLatch latch = new CountDownLatch(1); + ManagedChannel asyncRequestChannel + = NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext() + .build(); + PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + int instanceId = createId(); + + final PingRequest pingRequest + = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); + StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + try { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + LOG.debug("Unhandled exception (probably safe to ignore): {}", e.getMessage(), e); } - }; - new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT) - .run(); + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse, "instanceId = " + instanceId); + assertEquals(instanceId, pongResponse.getPongId()); + assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName()); + + asyncRequestChannel.shutdown().shutdownNow(); + return null; + }); } @Test - public void testHeadersWithConcurrentThreads() { + public void testHeadersWithConcurrentThreads() throws Exception { int headersPort = getRoutePort("grpc-headers"); - RunnableAssert ra = new RunnableAssert("foo") { - - @Override - public void run() { - int instanceId = createId(); - final CountDownLatch latch = new CountDownLatch(1); - ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort) - .userAgent(GRPC_USER_AGENT_PREFIX + instanceId) - .usePlaintext().build(); - PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); - - PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); - - final PingRequest pingRequest - = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); - StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); - requestObserver.onNext(pingRequest); - requestObserver.onNext(pingRequest); - requestObserver.onCompleted(); - try { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - LOG.debug("Interrupted while waiting for the response", e); - } - - PongResponse pongResponse = responseObserver.getPongResponse(); - - assertNotNull(pongResponse, "instanceId = " + instanceId); - assertEquals(instanceId, pongResponse.getPongId()); - assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName()); - - asyncRequestChannel.shutdown().shutdownNow(); + runConcurrent(() -> { + int instanceId = createId(); + final CountDownLatch latch = new CountDownLatch(1); + ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort) + .userAgent(GRPC_USER_AGENT_PREFIX + instanceId) + .usePlaintext().build(); + PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); + + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); + + final PingRequest pingRequest + = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build(); + StreamObserver requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver); + requestObserver.onNext(pingRequest); + requestObserver.onNext(pingRequest); + requestObserver.onCompleted(); + try { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting for the response", e); } - }; - new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT) - .run(); + PongResponse pongResponse = responseObserver.getPongResponse(); + + assertNotNull(pongResponse, "instanceId = " + instanceId); + assertEquals(instanceId, pongResponse.getPongId()); + assertEquals(GRPC_USER_AGENT_PREFIX + instanceId, pongResponse.getPongName()); + + asyncRequestChannel.shutdown().shutdownNow(); + return null; + }); + } + + private void runConcurrent(Callable task) throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT); + try { + List> futures = new ArrayList<>(); + for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) { + futures.add(executor.submit(() -> { + for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) { + task.call(); + } + return null; + })); + } + for (Future future : futures) { + future.get(1, TimeUnit.MINUTES); + } + } finally { + executor.shutdownNow(); + } } @Override diff --git a/components/camel-thrift/pom.xml b/components/camel-thrift/pom.xml index c9d3c38eddea6..5aa3b78168d38 100644 --- a/components/camel-thrift/pom.xml +++ b/components/camel-thrift/pom.xml @@ -75,18 +75,6 @@ gson test - - com.googlecode.junit-toolbox - junit-toolbox - ${junit-toolbox-version} - test - - - junit - junit - - - diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java index b743ded59022a..63f071c893fe6 100644 --- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java +++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java @@ -16,13 +16,16 @@ */ package org.apache.camel.component.thrift; -import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.googlecode.junittoolbox.MultithreadingTester; -import com.googlecode.junittoolbox.RunnableAssert; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.thrift.generated.Calculator; import org.apache.camel.component.thrift.generated.Operation; @@ -37,7 +40,6 @@ import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.layered.TFramedTransport; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -68,68 +70,76 @@ public static Integer getId() { } @Test - public void testSyncWithConcurrentThreads() { - RunnableAssert ra = new RunnableAssert("testSyncWithConcurrentThreads") { - - @Override - public void run() throws TTransportException { - TTransport transport = new TSocket("localhost", getPortForRoute(0)); - transport.open(); - TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport)); - Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol); - - int instanceId = createId(); + public void testSyncWithConcurrentThreads() throws Exception { + runConcurrent(() -> { + TTransport transport = new TSocket("localhost", getPortForRoute(0)); + transport.open(); + TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport)); + Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol); + + int instanceId = createId(); + + int calculateResponse = 0; + try { + calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY)); + } catch (TException e) { + LOG.info("Exception", e); + } - int calculateResponse = 0; - try { - calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY)); - } catch (TException e) { - LOG.info("Exception", e); - } + assertNotEquals(0, calculateResponse, "instanceId = " + instanceId); + assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse); - assertNotEquals(0, calculateResponse, "instanceId = " + instanceId); - assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse); + transport.close(); + return null; + }); + } - transport.close(); + @Test + public void testAsyncWithConcurrentThreads() throws Exception { + runConcurrent(() -> { + final CountDownLatch latch = new CountDownLatch(1); + + TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1)); + Calculator.AsyncClient client + = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory())) + .getAsyncClient(transport); + + int instanceId = createId(); + CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch); + try { + client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback); + } catch (TException e) { + LOG.info("Exception", e); } - }; + latch.await(5, TimeUnit.SECONDS); - new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT) - .run(); - } + int calculateResponse = calculateCallback.getCalculateResponse(); + LOG.debug("instanceId = {}", instanceId); + assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse); - @Test - public void testAsyncWithConcurrentThreads() { - RunnableAssert ra = new RunnableAssert("testAsyncWithConcurrentThreads") { + transport.close(); + return null; + }); + } - @Override - public void run() throws TTransportException, IOException, InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - - TNonblockingTransport transport = new TNonblockingSocket("localhost", getPortForRoute(1)); - Calculator.AsyncClient client - = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory())) - .getAsyncClient(transport); - - int instanceId = createId(); - CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch); - try { - client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback); - } catch (TException e) { - LOG.info("Exception", e); - } - latch.await(5, TimeUnit.SECONDS); - - int calculateResponse = calculateCallback.getCalculateResponse(); - LOG.debug("instanceId = {}", instanceId); - assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse); - - transport.close(); + private void runConcurrent(Callable task) throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT); + try { + List> futures = new ArrayList<>(); + for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) { + futures.add(executor.submit(() -> { + for (int round = 0; round < ROUNDS_PER_THREAD_COUNT; round++) { + task.call(); + } + return null; + })); } - }; - - new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT) - .run(); + for (Future future : futures) { + future.get(1, TimeUnit.MINUTES); + } + } finally { + executor.shutdownNow(); + } } public class CalculateAsyncMethodCallback implements AsyncMethodCallback { diff --git a/parent/pom.xml b/parent/pom.xml index f271454e3e6b5..a7559b181cae9 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -325,7 +325,6 @@ 0.13.4 21.0.6 3.2.4 - 2.4 5.13.4 6.1.0 2.3.0