diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 8b1a99a6b668..9e2f91e6fd11 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -63,8 +64,16 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.util.Time; +import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; +import org.apache.ratis.thirdparty.io.grpc.CallOptions; +import org.apache.ratis.thirdparty.io.grpc.Channel; +import org.apache.ratis.thirdparty.io.grpc.ClientCall; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptors; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; +import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; @@ -91,6 +100,83 @@ public class XceiverClientGrpc extends XceiverClientSpi { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100; private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5; + + /** + * Zero-copy marshaller for inbound {@link ContainerCommandResponseProto}. + * For {@code ReadChunk} responses, the parsed proto's bytes fields alias + * the inbound Netty buffer (avoiding a chunk-sized memcpy); the buffer is + * held until {@link ChunkInputStream} releases it via + * {@link #releaseReceivedResponse}. Other response types are deep-copied + * inside the wrapping marshaller and released immediately, so callers + * never observe a buffer-aliased non-{@code ReadChunk} response. + */ + private static final ZeroCopyMessageMarshaller + ZERO_COPY_RESPONSE_MARSHALLER = new ZeroCopyMessageMarshaller<>( + ContainerCommandResponseProto.getDefaultInstance()); + + /** + * Marshaller used as the response marshaller of the {@code send} method. + * Delegates outbound {@code stream()} to the default, and inbound + * {@code parse()} to the zero-copy marshaller. After a successful + * zero-copy parse, non-{@code ReadChunk} responses are deep-copied and + * the original buffer-aliased response is released, so only + * {@code ReadChunk} responses retain the Netty buffer until the caller + * is done with the chunk bytes. + */ + private static final MethodDescriptor.Marshaller + RESPONSE_MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public InputStream stream(ContainerCommandResponseProto value) { + return ZERO_COPY_RESPONSE_MARSHALLER.stream(value); + } + + @Override + public ContainerCommandResponseProto parse(InputStream stream) { + ContainerCommandResponseProto resp = + ZERO_COPY_RESPONSE_MARSHALLER.parse(stream); + if (resp == null + || resp.getCmdType() == ContainerProtos.Type.ReadChunk) { + return resp; + } + // Materialize all bytes fields into byte[]-backed ByteStrings so + // the response is independent of the Netty inbound buffer, then + // release the underlying buffer back to the pool. + try { + return ContainerCommandResponseProto.parseFrom(resp.toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw Status.INTERNAL.withDescription("Failed to materialize response") + .withCause(e).asRuntimeException(); + } finally { + ZERO_COPY_RESPONSE_MARSHALLER.release(resp); + } + } + }; + + /** + * gRPC interceptor that installs {@link #RESPONSE_MARSHALLER} as the + * response marshaller for the {@code send} bidi-streaming method of the + * dedicated zero-copy stub. + */ + private static final ClientInterceptor ZERO_COPY_INTERCEPTOR = + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, Channel next) { + if (XceiverClientProtocolServiceGrpc.getSendMethod() + .getFullMethodName().equals(method.getFullMethodName())) { + @SuppressWarnings("unchecked") + final MethodDescriptor.Marshaller respMarshaller = + (MethodDescriptor.Marshaller) RESPONSE_MARSHALLER; + method = method.toBuilder() + .setResponseMarshaller(respMarshaller) + .build(); + } + return next.newCall(method, callOptions); + } + }; + private final Pipeline pipeline; private final ConfigurationSource config; private final XceiverClientMetrics metrics; @@ -196,7 +282,10 @@ private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { ManagedChannel channel = createChannel(dn, port).build(); XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel); - return new ChannelInfo(channel, stub); + XceiverClientProtocolServiceStub zeroCopyStub = + XceiverClientProtocolServiceGrpc.newStub( + ClientInterceptors.intercept(channel, ZERO_COPY_INTERCEPTOR)); + return new ChannelInfo(channel, stub, zeroCopyStub); } protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) @@ -305,18 +394,7 @@ public Pipeline getPipeline() { @Override public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { - try { - return sendCommandWithTraceIDAndRetry(request, null). - getResponse().get(); - } catch (ExecutionException e) { - throw getIOExceptionForSendCommand(request, e); - } catch (InterruptedException e) { - LOG.error("Command execution was interrupted."); - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException( - "Command " + processForDebug(request) + " was interrupted.") - .initCause(e); - } + return sendCommandAndWait(request, null, false); } @Override @@ -396,8 +474,22 @@ private ContainerCommandRequestProto reconstructRequestIfNeeded( public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { + return sendCommandAndWait(request, validators, false); + } + + @Override + public ContainerCommandResponseProto sendCommandWithZeroCopy( + ContainerCommandRequestProto request, List validators) + throws IOException { + return sendCommandAndWait(request, validators, true); + } + + private ContainerCommandResponseProto sendCommandAndWait( + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) throws IOException { try { - XceiverClientReply reply = sendCommandWithTraceIDAndRetry(request, validators); + XceiverClientReply reply = + sendCommandWithTraceIDAndRetry(request, validators, zeroCopy); return reply.getResponse().get(); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); @@ -411,7 +503,8 @@ public ContainerCommandResponseProto sendCommand( } private XceiverClientReply sendCommandWithTraceIDAndRetry( - ContainerCommandRequestProto request, List validators) + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) throws IOException { String spanName = "XceiverClientGrpc." + request.getCmdType().name(); @@ -424,7 +517,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry( if (!request.hasVersion()) { builder.setVersion(ClientVersion.CURRENT.toProtoValue()); } - return sendCommandWithRetry(builder.build(), validators); + return sendCommandWithRetry(builder.build(), validators, zeroCopy); }); } @@ -489,7 +582,8 @@ private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto re } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List validators) + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) throws IOException { ContainerCommandResponseProto responseProto = null; IOException ioException = null; @@ -509,7 +603,9 @@ private XceiverClientReply sendCommandWithRetry( // sendCommandAsyncCall will create a new channel and async stub // in case these don't exist for the specific datanode. reply.addDatanode(dn); - responseProto = sendCommandAsync(request, dn).getResponse().get(); + responseProto = (zeroCopy + ? sendCommandAsync(request, dn, true) + : sendCommandAsync(request, dn)).getResponse().get(); if (validators != null && !validators.isEmpty()) { for (Validator validator : validators) { validator.accept(request, responseProto); @@ -521,8 +617,11 @@ private XceiverClientReply sendCommandWithRetry( } break; } catch (IOException e) { + if (responseProto != null) { + releaseReceivedResponse(responseProto); + responseProto = null; + } ioException = e; - responseProto = null; if (LOG.isDebugEnabled()) { LOG.debug("Failed to execute command {} on datanode {}", processForDebug(request), dn, e); @@ -686,11 +785,19 @@ protected boolean shouldBlockAndWaitAsyncReply( public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) throws IOException, InterruptedException { + return sendCommandAsync(request, dn, false); + } + + @VisibleForTesting + protected XceiverClientReply sendCommandAsync( + ContainerCommandRequestProto request, DatanodeDetails dn, + boolean zeroCopy) + throws IOException, InterruptedException { checkOpen(dn); DatanodeID dnId = dn.getID(); if (LOG.isDebugEnabled()) { - LOG.debug("Send command {} to datanode {}", - request.getCmdType(), dn); + LOG.debug("Send command {} to datanode {}{}", + request.getCmdType(), dn, zeroCopy ? " with zero-copy response" : ""); } final CompletableFuture replyFuture = new CompletableFuture<>(); @@ -700,7 +807,9 @@ public XceiverClientReply sendCommandAsync( // create a new grpc message stream pair for each call. final StreamObserver requestObserver = - dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS) + (zeroCopy ? dnChannelInfoMap.get(dnId).getZeroCopyStub() + : dnChannelInfoMap.get(dnId).getStub()) + .withDeadlineAfter(timeout, TimeUnit.SECONDS) .send(new StreamObserver() { @Override public void onNext(ContainerCommandResponseProto value) { @@ -766,16 +875,28 @@ public void setTimeout(long timeout) { this.timeout = timeout; } + @Override + public void releaseReceivedResponse(ContainerCommandResponseProto response) { + if (response != null) { + // Idempotent: the marshaller no-ops if `response` was never tracked + // (i.e., parsed via the standard fallback path). + ZERO_COPY_RESPONSE_MARSHALLER.release(response); + } + } + /** * Group the channel and stub so that they are published together. */ private static class ChannelInfo { private final ManagedChannel channel; private final XceiverClientProtocolServiceStub stub; + private final XceiverClientProtocolServiceStub zeroCopyStub; - ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { + ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub, + XceiverClientProtocolServiceStub zeroCopyStub) { this.channel = channel; this.stub = stub; + this.zeroCopyStub = zeroCopyStub; } public ManagedChannel getChannel() { @@ -786,6 +907,10 @@ public XceiverClientProtocolServiceStub getStub() { return stub; } + public XceiverClientProtocolServiceStub getZeroCopyStub() { + return zeroCopyStub; + } + public boolean isChannelInactive() { return channel == null || channel.isTerminated() diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 22917ce4b6c7..faa8f2a9066f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -96,6 +96,13 @@ public class ChunkInputStream extends InputStream // retry. Once the chunk is read, this variable is reset. private long chunkPosition = -1; + // Outer ContainerCommandResponseProto whose chunk data the current `buffers` + // view. Held until releaseBuffers() so the underlying zero-copy-tracked + // Netty buffer is not freed while the buffers are still in use. + // Null on the standard (non-zero-copy) parse path - the proto's bytes + // are heap-backed and need no explicit release. + private ContainerCommandResponseProto lastResponseToRelease; + private final Supplier> tokenSupplier; private static final int EOF = -1; @@ -431,14 +438,25 @@ private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo) /** * Send RPC call to get the chunk from the container. + *

+ * On the zero-copy path (HDDS-10283), the returned {@link ByteBuffer}s + * view the underlying inbound Netty buffer of the response. Those bytes + * stay reachable until {@link #releaseBuffers()} releases them via + * {@link XceiverClientSpi#releaseReceivedResponse}, which this method + * registers as a side effect through {@link #lastResponseToRelease}. */ @VisibleForTesting protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { - ReadChunkResponseProto readChunkResponse = - ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators, - tokenSupplier.get()); + ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(xceiverClient, readChunkInfo, + datanodeBlockID, validators, tokenSupplier.get()); + // The zero-copy-tracked buffer for the chunk data lives inside `reply`. + // Stash the outer reference so releaseBuffers() can return it once the + // chunk bytes have been fully consumed. + setLastResponseToRelease(reply); + final ReadChunkResponseProto readChunkResponse = reply.getReadChunk(); if (readChunkResponse.hasData()) { return readChunkResponse.getData().asReadOnlyByteBufferList() @@ -453,6 +471,16 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) } } + private void setLastResponseToRelease(ContainerCommandResponseProto reply) { + // If a previous response is still being held (e.g., readChunk called + // twice without an intervening releaseBuffers), release the prior one + // first so its Netty buffer goes back to the pool. + if (lastResponseToRelease != null) { + xceiverClient.releaseReceivedResponse(lastResponseToRelease); + } + lastResponseToRelease = reply; + } + private void validateChunk( ContainerCommandRequestProto request, ContainerCommandResponseProto response @@ -702,6 +730,13 @@ private void releaseBuffers() { buffers = null; bufferIndex = 0; firstUnreleasedBufferIndex = 0; + // Release the zero-copy-tracked Netty buffer of the response that backed + // these ByteBuffers. Idempotent and a no-op if the response was parsed + // via the standard heap-backed path. + if (lastResponseToRelease != null) { + xceiverClient.releaseReceivedResponse(lastResponseToRelease); + lastResponseToRelease = null; + } // We should not reset bufferOffsetWrtChunkData and buffersSize here // because when getPos() is called in chunkStreamEOF() we use these // values and determine whether chunk is read completely or not. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 54be3c5686a0..9e26916c3203 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -129,12 +129,15 @@ public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { + ContainerCommandResponseProto responseProto = null; + boolean validatorsPassed = false; try { XceiverClientReply reply = sendCommandAsync(request); - ContainerCommandResponseProto responseProto = reply.getResponse().get(); + responseProto = reply.getResponse().get(); for (Validator function : validators) { function.accept(request, responseProto); } + validatorsPassed = true; return responseProto; } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException @@ -142,9 +145,34 @@ public ContainerCommandResponseProto sendCommand( throw getIOExceptionForSendCommand(request, e); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); + } finally { + // If a validator threw, the caller never receives the response, so + // release any zero-copy-tracked buffer here. Successful responses are + // returned to the caller, who is responsible for releasing them. + if (responseProto != null && !validatorsPassed) { + releaseReceivedResponse(responseProto); + } } } + /** + * Sends a given command using an explicit zero-copy-capable response path + * when supported by the transport. + *

+ * The default implementation delegates to {@link #sendCommand( + * ContainerCommandRequestProto, List)}, so transports without zero-copy + * support continue to return normal heap-backed responses. + * + * @param request Request + * @param validators functions to validate the response + * @return Response to the command + */ + public ContainerCommandResponseProto sendCommandWithZeroCopy( + ContainerCommandRequestProto request, + List validators) throws IOException { + return sendCommand(request, validators); + } + public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException { throw new UnsupportedOperationException("Stream read is not supported"); } @@ -204,4 +232,24 @@ public CompletableFuture watchForCommit(long index) { public abstract Map sendCommandOnAllNodes(ContainerCommandRequestProto request) throws IOException, InterruptedException; + + /** + * Release the resources held on behalf of a previously-received response. + *

+ * When the underlying transport parses a response with a zero-copy + * marshaller, the parsed proto's {@code bytes} fields reference the + * Netty-managed pooled buffer of the inbound message. Those buffers must + * be released back to Netty when the caller is done with the proto; + * otherwise direct memory accumulates. Callers of + * {@code sendCommandWithZeroCopy} that retain the response past the call + * (e.g. {@code ReadChunk} via + * {@code ContainerProtocolCalls.readChunkForZeroCopy(...)} in + * {@code ChunkInputStream}) must invoke this method once they are done. + *

+ * This method is idempotent and safe to call on responses that were + * never tracked by a zero-copy marshaller. + */ + public void releaseReceivedResponse(ContainerCommandResponseProto response) { + // Default: transport without zero-copy support -> no-op. + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 15879fb47649..a6100c460ac9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -344,17 +344,70 @@ public static ContainerCommandRequestProto getPutBlockRequest( } /** - * Calls the container protocol to read a chunk. + * Calls the container protocol to read a chunk and returns a detached + * {@link ReadChunkResponseProto}. + *

+ * Callers that need access to the zero-copy-backed outer response should use + * {@link #readChunkForZeroCopy(XceiverClientSpi, ChunkInfo, DatanodeBlockID, + * List, Token)}. * * @param xceiverClient client to perform call * @param chunk information about chunk to read * @param blockID ID of the block * @param validators functions to validate the response * @param token a token for this block (may be null) - * @return container protocol read chunk response + * @return detached read chunk response * @throws IOException if there is an I/O error while performing the call */ - public static ContainerProtos.ReadChunkResponseProto readChunk( + public static ReadChunkResponseProto readChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, + List validators, + Token token) throws IOException { + ReadChunkRequestProto.Builder readChunkRequest = + ReadChunkRequestProto.newBuilder() + .setBlockID(blockID) + .setChunkData(chunk) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setReadChunk(readChunkRequest); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); + } + + try (TracingUtil.TraceCloseable ignored = + TracingUtil.createActivatedSpan("readChunk")) { + Span span = TracingUtil.getActiveSpan(); + span.setAttribute("offset", chunk.getOffset()) + .setAttribute("length", chunk.getLen()) + .setAttribute("block", blockID.toString()); + return tryEachDatanode(xceiverClient.getPipeline(), + d -> readChunk(xceiverClient, chunk, blockID, + validators, builder, d, false).getReadChunk(), + d -> toErrorMessage(chunk, blockID, d)); + } + } + + /** + * Calls the container protocol to read a chunk and returns the outer + * {@link ContainerCommandResponseProto}. The caller is responsible for + * invoking + * {@link XceiverClientSpi#releaseReceivedResponse(ContainerCommandResponseProto)} + * on the returned response once the chunk bytes are no longer needed. + * The outer wrapper is returned (rather than just the inner + * {@link ReadChunkResponseProto}) so that the caller has a stable reference + * for the zero-copy release. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to read + * @param blockID ID of the block + * @param validators functions to validate the response + * @param token a token for this block (may be null) + * @return outer container command response containing the read chunk reply + * @throws IOException if there is an I/O error while performing the call + */ + public static ContainerCommandResponseProto readChunkForZeroCopy( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, Token token) throws IOException { @@ -378,16 +431,16 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( .setAttribute("block", blockID.toString()); return tryEachDatanode(xceiverClient.getPipeline(), d -> readChunk(xceiverClient, chunk, blockID, - validators, builder, d), + validators, builder, d, true), d -> toErrorMessage(chunk, blockID, d)); } } - private static ContainerProtos.ReadChunkResponseProto readChunk( + private static ContainerCommandResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, ContainerCommandRequestProto.Builder builder, - DatanodeDetails d) throws IOException { + DatanodeDetails d, boolean zeroCopy) throws IOException { ContainerCommandRequestProto.Builder requestBuilder = builder .setDatanodeUuid(d.getUuidString()); String traceId = TracingUtil.exportCurrentSpan(); @@ -395,14 +448,21 @@ private static ContainerProtos.ReadChunkResponseProto readChunk( requestBuilder = requestBuilder.setTraceID(traceId); } ContainerCommandResponseProto reply = - xceiverClient.sendCommand(requestBuilder.build(), validators); + zeroCopy + ? xceiverClient.sendCommandWithZeroCopy( + requestBuilder.build(), validators) + : xceiverClient.sendCommand(requestBuilder.build(), validators); final ReadChunkResponseProto response = reply.getReadChunk(); final long readLen = getLen(response); if (readLen != chunk.getLen()) { + if (zeroCopy) { + // Release the zero-copy-tracked buffer before propagating the error. + xceiverClient.releaseReceivedResponse(reply); + } throw new IOException(toErrorMessage(chunk, blockID, d) + ": readLen=" + readLen); } - return response; + return reply; } static String toErrorMessage(ChunkInfo chunk, DatanodeBlockID blockId, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index c051b3478b4c..cf38520293be 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -510,7 +510,7 @@ public ContainerProtos.GetBlockResponseProto getBlock(BlockID blockID) throws IO .build(); } - public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId, + public ContainerProtos.ContainerCommandResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo, List validators) throws IOException { KeyValueContainer container = getContainer(blockId.getContainerID()); ContainerProtos.ReadChunkResponseProto readChunkResponseProto = @@ -520,11 +520,11 @@ public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.Datanode .setData(handler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId), ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString()) .build(); - verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators); - return readChunkResponseProto; + return verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators); } - public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResponseProto, + public ContainerProtos.ContainerCommandResponseProto verifyChecksums( + ContainerProtos.ReadChunkResponseProto readChunkResponseProto, ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo, List validators) throws IOException { assertFalse(validators.isEmpty()); @@ -547,6 +547,7 @@ public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResp for (XceiverClientSpi.Validator function : validators) { function.accept(requestProto, responseProto); } + return responseProto; } public KeyValueContainer getContainer(long containerID) { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java index bc894a58f9cc..8c72410b0313 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java @@ -217,9 +217,9 @@ public void testOneBlock(ReplicationType helperType) throws IOException { XceiverClientGrpc xceiverClientGrpc = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { return buildValidResponse(helperType); } }; diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java index b17a6b54fbfd..f4b7c25af197 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java @@ -126,9 +126,12 @@ private void readReference() throws IOException { ContainerCommandRequestProto request = createReadChunkRequest(0); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - - checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize); - checksumReference = computeChecksum(response); + try { + checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize); + checksumReference = computeChecksum(response); + } finally { + xceiverClient.releaseReceivedResponse(response); + } } private void validateChunk(long stepNo) throws Exception { @@ -138,15 +141,18 @@ private void validateChunk(long stepNo) throws Exception { try { ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - - ChecksumData checksumOfChunk = computeChecksum(response); - - if (!checksumReference.equals(checksumOfChunk)) { - throw new IllegalStateException( - "Reference (=first) message checksum doesn't match " + - "with checksum of chunk " - + response.getReadChunk() - .getChunkData().getChunkName()); + try { + ChecksumData checksumOfChunk = computeChecksum(response); + + if (!checksumReference.equals(checksumOfChunk)) { + throw new IllegalStateException( + "Reference (=first) message checksum doesn't match " + + "with checksum of chunk " + + response.getReadChunk() + .getChunkData().getChunkName()); + } + } finally { + xceiverClient.releaseReceivedResponse(response); } } catch (IOException e) { LOG.warn("Could not read chunk due to IOException: ", e); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 0f3af071fc54..d92ed48b838c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -25,10 +26,12 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -43,20 +46,27 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; /** * Tests for TestXceiverClientGrpc, to ensure topology aware reads work * select the closest node, and connections are re-used after a getBlock call. */ public class TestXceiverClientGrpc { - private Pipeline pipeline; private List dns; private List dnsInOrder; private OzoneConfiguration conf = new OzoneConfiguration(); + private enum ReadChunkApi { + LEGACY, + ZERO_COPY + } + @BeforeEach public void setup() { dns = new ArrayList<>(); @@ -97,9 +107,9 @@ public void testLeaderNodeIsCommandTarget() throws IOException { for (int i = 0; i < 100; i++) { try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDN.add(dn); return buildValidResponse(); } @@ -116,9 +126,9 @@ public void testGetBlockRetryAlNodes() { assertThat(allDNs.size()).isGreaterThan(1); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) throws IOException { + DatanodeDetails dn, boolean zeroCopy) throws IOException { allDNs.remove(dn); throw new IOException("Failed " + dn); } @@ -130,26 +140,192 @@ public XceiverClientReply sendCommandAsync( assertEquals(0, allDNs.size()); } - @Test - public void testReadChunkRetryAllNodes() { + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testReadChunkRetryAllNodes(ReadChunkApi readChunkApi) { final ArrayList allDNs = new ArrayList<>(dns); assertThat(allDNs.size()).isGreaterThan(1); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) throws IOException { + DatanodeDetails dn, boolean zeroCopy) throws IOException { allDNs.remove(dn); throw new IOException("Failed " + dn); } }) { - invokeXceiverClientReadChunk(client); + invokeXceiverClientReadChunk(client, readChunkApi); } catch (IOException e) { e.printStackTrace(); } assertEquals(0, allDNs.size()); } + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testReadChunkApisReturnExpectedDataAndReleaseOwnership( + ReadChunkApi readChunkApi) throws IOException { + final AtomicInteger releaseCount = new AtomicInteger(); + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + protected XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn, boolean zeroCopy) { + return buildValidReadChunkResponse(); + } + + @Override + public void releaseReceivedResponse( + ContainerProtos.ContainerCommandResponseProto response) { + releaseCount.incrementAndGet(); + } + }) { + BlockID bid = new BlockID(1, 1); + bid.setBlockCommitSequenceId(1); + ContainerProtos.ChunkInfo chunkInfo = + ContainerProtos.ChunkInfo.newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + if (readChunkApi == ReadChunkApi.ZERO_COPY) { + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); + assertEquals(0, releaseCount.get()); + try { + assertArrayEquals(new byte[] {1}, + reply.getReadChunk().getData().toByteArray()); + } finally { + client.releaseReceivedResponse(reply); + } + } else { + assertArrayEquals(new byte[] {1}, + ContainerProtocolCalls.readChunk(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null) + .getData().toByteArray()); + assertEquals(0, releaseCount.get()); + } + } + + assertEquals(readChunkApi == ReadChunkApi.ZERO_COPY ? 1 : 0, + releaseCount.get()); + } + + @Test + public void testReadChunkZeroCopyIsOptInOnly() throws IOException { + final AtomicInteger regularCallCount = new AtomicInteger(); + final AtomicInteger zeroCopyCallCount = new AtomicInteger(); + final BlockID blockID = new BlockID(1, 1); + final ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + final ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(pipeline.getFirstNode().getUuidString()) + .setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunkInfo)) + .build(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + protected XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn, boolean zeroCopy) { + if (zeroCopy) { + zeroCopyCallCount.incrementAndGet(); + } else { + regularCallCount.incrementAndGet(); + } + return buildValidReadChunkResponse(); + } + }) { + assertArrayEquals(new byte[] {1}, + client.sendCommand(request).getReadChunk().getData().toByteArray()); + assertEquals(1, regularCallCount.get()); + assertEquals(0, zeroCopyCallCount.get()); + + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + blockID.getDatanodeBlockIDProtobuf(), null, null); + try { + assertArrayEquals(new byte[] {1}, + reply.getReadChunk().getData().toByteArray()); + } finally { + client.releaseReceivedResponse(reply); + } + } + + assertEquals(1, regularCallCount.get()); + assertEquals(1, zeroCopyCallCount.get()); + } + + @Test + public void testReadChunkValidatorFailureReleasesResponseBeforeRetry() + throws IOException { + final ArrayList allDNs = new ArrayList<>(dns); + final AtomicInteger releaseCount = new AtomicInteger(); + final BlockID blockID = new BlockID(1, 1); + final ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + final ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(pipeline.getFirstNode().getUuidString()) + .setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunkInfo)) + .build(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + protected XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn, boolean zeroCopy) { + allDNs.remove(dn); + return buildValidReadChunkResponse(); + } + + @Override + public void releaseReceivedResponse( + ContainerProtos.ContainerCommandResponseProto response) { + releaseCount.incrementAndGet(); + } + }) { + IOException ex = assertThrows(IOException.class, + () -> client.sendCommand(request, Collections.singletonList((req, resp) -> { + throw new IOException("validator failed"); + }))); + assertThat(ex).hasMessageContaining("validator failed"); + } + + assertEquals(0, allDNs.size()); + assertEquals(dns.size(), releaseCount.get()); + } + @Test public void testInterruptedCommandThrowsInterruptedIOException() throws IOException { @@ -157,9 +333,9 @@ public void testInterruptedCommandThrowsInterruptedIOException() new CompletableFuture<>(); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { return new XceiverClientReply(response); } }) { @@ -187,10 +363,13 @@ public void testFirstNodeIsCorrectWithTopologyForCommandTarget() for (int i = 0; i < 100; i++) { try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); + if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { + return buildValidReadChunkResponse(); + } return buildValidResponse(); } }) { @@ -219,9 +398,9 @@ public void testPrimaryReadFromNormalDatanode() setPersistedOpState(NodeOperationalState.IN_MAINTENANCE); try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); return buildValidResponse(); } @@ -235,24 +414,29 @@ public XceiverClientReply sendCommandAsync( } } - @Test - public void testConnectionReusedAfterGetBlock() throws IOException { + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testConnectionReusedAfterGetBlock(ReadChunkApi readChunkApi) + throws IOException { // With a new Client, make 100 calls. On each call, ensure that only one // DN is seen, indicating the same DN connection is reused. for (int i = 0; i < 100; i++) { final Set seenDNs = new HashSet<>(); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); + if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { + return buildValidReadChunkResponse(); + } return buildValidResponse(); } }) { invokeXceiverClientGetBlock(client); invokeXceiverClientGetBlock(client); - invokeXceiverClientReadChunk(client); + invokeXceiverClientReadChunk(client, readChunkApi); invokeXceiverClientReadSmallFile(client); } assertEquals(1, seenDNs.size()); @@ -269,22 +453,30 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client) .build()), null, client.getPipeline().getReplicaIndexes()); } - private void invokeXceiverClientReadChunk(XceiverClientSpi client) + private void invokeXceiverClientReadChunk(XceiverClientSpi client, + ReadChunkApi readChunkApi) throws IOException { BlockID bid = new BlockID(1, 1); bid.setBlockCommitSequenceId(1); - ContainerProtocolCalls.readChunk(client, + ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo.newBuilder() .setChunkName("Anything") .setChecksumData(ContainerProtos.ChecksumData.newBuilder() - .setBytesPerChecksum(512) + .setBytesPerChecksum(1) .setType(ContainerProtos.ChecksumType.CRC32) .build()) - .setLen(-1) + .setLen(1) .setOffset(0) - .build(), - bid.getDatanodeBlockIDProtobuf(), - null, null); + .build(); + if (readChunkApi == ReadChunkApi.ZERO_COPY) { + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); + client.releaseReceivedResponse(reply); + return; + } + ContainerProtocolCalls.readChunk(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); } private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) @@ -305,4 +497,32 @@ private XceiverClientReply buildValidResponse() { return new XceiverClientReply(replyFuture); } + private XceiverClientReply buildValidReadChunkResponse() { + ContainerProtos.ContainerCommandResponseProto resp = + ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setResult(ContainerProtos.Result.SUCCESS) + .setReadChunk(ContainerProtos.ReadChunkResponseProto.newBuilder() + .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(1) + .setLocalID(1) + .build()) + .setChunkData(ContainerProtos.ChunkInfo.newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build()) + .setData(ByteString.copyFrom(new byte[] {1})) + .build()) + .build(); + final CompletableFuture + replyFuture = new CompletableFuture<>(); + replyFuture.complete(resp); + return new XceiverClientReply(replyFuture); + } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 7ce4f9319db2..f23e6300cc8d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -165,6 +165,11 @@ public class TestContainerCommandsEC { private static OzoneBucket classBucket; private static ReplicationConfig repConfig; + private enum ReadChunkPath { + LEGACY, + ZERO_COPY + } + @BeforeAll public static void init() throws Exception { config = new OzoneConfiguration(); @@ -245,6 +250,10 @@ private void closeAllPipelines(ReplicationConfig replicationConfig) { }); } + private static Stream readChunkPaths() { + return Stream.of(ReadChunkPath.values()); + } + @Test public void testOrphanBlock() throws Exception { // Close all pipelines so we are guaranteed to get a new one @@ -443,8 +452,10 @@ public void testListBlock() throws Exception { } } - @Test - public void testCreateRecoveryContainer() throws Exception { + @ParameterizedTest(name = "readChunkPath={0}") + @MethodSource("readChunkPaths") + public void testCreateRecoveryContainer(ReadChunkPath readChunkPath) + throws Exception { try (XceiverClientManager xceiverClientManager = new XceiverClientManager(config)) { ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2); @@ -514,24 +525,50 @@ public void testCreateRecoveryContainer() throws Exception { container.containerID().getProtobuf().getId(), encodedToken); assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, readContainerResponseProto.getContainerData().getState()); - ContainerProtos.ReadChunkResponseProto readChunkResponseProto = - ContainerProtocolCalls.readChunk(dnClient, - writeChunkRequest.getWriteChunk().getChunkData(), - blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, - blockToken); - ByteBuffer[] readOnlyByteBuffersArray = BufferUtils - .getReadOnlyByteBuffersArray( - readChunkResponseProto.getDataBuffers().getBuffersList()); - assertEquals(readOnlyByteBuffersArray[0].limit(), data.length); - byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; - readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); - assertArrayEquals(data, readBuff); + ContainerProtos.DatanodeBlockID readBlockID = + blockID.getDatanodeBlockIDProtobufBuilder() + .setReplicaIndex(replicaIndex) + .build(); + assertReadChunkData(readChunkPath, dnClient, + writeChunkRequest.getWriteChunk().getChunkData(), + readBlockID, blockToken, data); } finally { xceiverClientManager.releaseClient(dnClient, false); } } } + private void assertReadChunkData(ReadChunkPath readChunkPath, + XceiverClientSpi dnClient, ContainerProtos.ChunkInfo chunkInfo, + ContainerProtos.DatanodeBlockID blockID, + Token blockToken, byte[] expectedData) + throws IOException { + ContainerProtos.ContainerCommandResponseProto readChunkReply = null; + ContainerProtos.ReadChunkResponseProto readChunkResponseProto; + if (readChunkPath == ReadChunkPath.ZERO_COPY) { + readChunkReply = ContainerProtocolCalls.readChunkForZeroCopy(dnClient, + chunkInfo, blockID, null, blockToken); + readChunkResponseProto = readChunkReply.getReadChunk(); + } else { + readChunkResponseProto = ContainerProtocolCalls.readChunk(dnClient, + chunkInfo, blockID, null, blockToken); + } + + try { + ByteBuffer[] readOnlyByteBuffersArray = BufferUtils + .getReadOnlyByteBuffersArray( + readChunkResponseProto.getDataBuffers().getBuffersList()); + assertEquals(readOnlyByteBuffersArray[0].limit(), expectedData.length); + byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; + readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); + assertArrayEquals(expectedData, readBuff); + } finally { + if (readChunkReply != null) { + dnClient.releaseReceivedResponse(readChunkReply); + } + } + } + @Test public void testCreateRecoveryContainerAfterDNRestart() throws Exception { try (XceiverClientManager xceiverClientManager = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java index 4e69848b307d..399eb6f10bf3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -835,35 +835,49 @@ private void mockReadChunk(XceiverClientGrpc mockDnProtocol, byte[] data, Exception exception, Result errorCode) throws Exception { - final CompletableFuture response; - if (exception != null) { - response = new CompletableFuture<>(); - response.completeExceptionally(exception); - } else if (errorCode != null) { - ContainerCommandResponseProto readChunkResp = - ContainerCommandResponseProto.newBuilder() - .setResult(errorCode) - .setCmdType(Type.ReadChunk) - .build(); - response = completedFuture(readChunkResp); + final ContainerCommandResponseProto response; + if (errorCode != null) { + response = ContainerCommandResponseProto.newBuilder() + .setResult(errorCode) + .setCmdType(Type.ReadChunk) + .build(); + } else if (data != null) { + response = ContainerCommandResponseProto.newBuilder() + .setReadChunk(ReadChunkResponseProto.newBuilder() + .setBlockID(createBlockId(containerId, localId)) + .setChunkData(createChunkInfo(data)) + .setData(ByteString.copyFrom(data)) + .build() + ) + .setResult(Result.SUCCESS) + .setCmdType(Type.ReadChunk) + .build(); } else { - ContainerCommandResponseProto readChunkResp = - ContainerCommandResponseProto.newBuilder() - .setReadChunk(ReadChunkResponseProto.newBuilder() - .setBlockID(createBlockId(containerId, localId)) - .setChunkData(createChunkInfo(data)) - .setData(ByteString.copyFrom(data)) - .build() - ) - .setResult(Result.SUCCESS) - .setCmdType(Type.ReadChunk) - .build(); - response = completedFuture(readChunkResp); + response = null; } - doAnswer(invocation -> new XceiverClientReply(response)) - .when(mockDnProtocol) - .sendCommandAsync(argThat(matchCmd(Type.ReadChunk)), any()); + doAnswer(invocation -> { + if (exception != null) { + ExecutionException executionException = new ExecutionException( + exception); + if (Status.fromThrowable(exception).getCode() + == Status.UNAUTHENTICATED.getCode()) { + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); + } + throw new IOException(executionException); + } + + ContainerCommandRequestProto request = invocation.getArgument(0); + List validators = invocation.getArgument(1); + if (validators != null) { + for (XceiverClientSpi.Validator validator : validators) { + validator.accept(request, response); + } + } + return response; + }).when(mockDnProtocol) + .sendCommandWithZeroCopy(argThat(matchCmd(Type.ReadChunk)), any()); }