Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -63,8 +64,16 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.thirdparty.io.grpc.CallOptions;
import org.apache.ratis.thirdparty.io.grpc.Channel;
import org.apache.ratis.thirdparty.io.grpc.ClientCall;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptors;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
Expand All @@ -91,6 +100,83 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100;
private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;

/**
* Zero-copy marshaller for inbound {@link ContainerCommandResponseProto}.
* For {@code ReadChunk} responses, the parsed proto's bytes fields alias
* the inbound Netty buffer (avoiding a chunk-sized memcpy); the buffer is
* held until {@link ChunkInputStream} releases it via
* {@link #releaseReceivedResponse}. Other response types are deep-copied
* inside the wrapping marshaller and released immediately, so callers
* never observe a buffer-aliased non-{@code ReadChunk} response.
*/
private static final ZeroCopyMessageMarshaller<ContainerCommandResponseProto>
ZERO_COPY_RESPONSE_MARSHALLER = new ZeroCopyMessageMarshaller<>(
ContainerCommandResponseProto.getDefaultInstance());

/**
* Marshaller used as the response marshaller of the {@code send} method.
* Delegates outbound {@code stream()} to the default, and inbound
* {@code parse()} to the zero-copy marshaller. After a successful
* zero-copy parse, non-{@code ReadChunk} responses are deep-copied and
* the original buffer-aliased response is released, so only
* {@code ReadChunk} responses retain the Netty buffer until the caller
* is done with the chunk bytes.
*/
private static final MethodDescriptor.Marshaller<ContainerCommandResponseProto>
RESPONSE_MARSHALLER =
new MethodDescriptor.Marshaller<ContainerCommandResponseProto>() {
@Override
public InputStream stream(ContainerCommandResponseProto value) {
return ZERO_COPY_RESPONSE_MARSHALLER.stream(value);
}

@Override
public ContainerCommandResponseProto parse(InputStream stream) {
ContainerCommandResponseProto resp =
ZERO_COPY_RESPONSE_MARSHALLER.parse(stream);
if (resp == null
|| resp.getCmdType() == ContainerProtos.Type.ReadChunk) {
return resp;
}
// Materialize all bytes fields into byte[]-backed ByteStrings so
// the response is independent of the Netty inbound buffer, then
// release the underlying buffer back to the pool.
try {
return ContainerCommandResponseProto.parseFrom(resp.toByteArray());
} catch (InvalidProtocolBufferException e) {
throw Status.INTERNAL.withDescription("Failed to materialize response")
.withCause(e).asRuntimeException();
} finally {
ZERO_COPY_RESPONSE_MARSHALLER.release(resp);
}
}
};

/**
* gRPC interceptor that installs {@link #RESPONSE_MARSHALLER} as the
* response marshaller for the {@code send} bidi-streaming method of the
* dedicated zero-copy stub.
*/
private static final ClientInterceptor ZERO_COPY_INTERCEPTOR =
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
if (XceiverClientProtocolServiceGrpc.getSendMethod()
.getFullMethodName().equals(method.getFullMethodName())) {
@SuppressWarnings("unchecked")
final MethodDescriptor.Marshaller<RespT> respMarshaller =
(MethodDescriptor.Marshaller<RespT>) RESPONSE_MARSHALLER;
method = method.toBuilder()
.setResponseMarshaller(respMarshaller)
.build();
}
return next.newCall(method, callOptions);
}
};

private final Pipeline pipeline;
private final ConfigurationSource config;
private final XceiverClientMetrics metrics;
Expand Down Expand Up @@ -196,7 +282,10 @@ private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException {

ManagedChannel channel = createChannel(dn, port).build();
XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel);
return new ChannelInfo(channel, stub);
XceiverClientProtocolServiceStub zeroCopyStub =
XceiverClientProtocolServiceGrpc.newStub(
ClientInterceptors.intercept(channel, ZERO_COPY_INTERCEPTOR));
return new ChannelInfo(channel, stub, zeroCopyStub);
}

protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
Expand Down Expand Up @@ -305,18 +394,7 @@ public Pipeline getPipeline() {
@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
try {
return sendCommandWithTraceIDAndRetry(request, null).
getResponse().get();
} catch (ExecutionException e) {
throw getIOExceptionForSendCommand(request, e);
} catch (InterruptedException e) {
LOG.error("Command execution was interrupted.");
Thread.currentThread().interrupt();
throw (IOException) new InterruptedIOException(
"Command " + processForDebug(request) + " was interrupted.")
.initCause(e);
}
return sendCommandAndWait(request, null, false);
}

@Override
Expand Down Expand Up @@ -396,8 +474,22 @@ private ContainerCommandRequestProto reconstructRequestIfNeeded(
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
return sendCommandAndWait(request, validators, false);
}

@Override
public ContainerCommandResponseProto sendCommandWithZeroCopy(
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
return sendCommandAndWait(request, validators, true);
}

private ContainerCommandResponseProto sendCommandAndWait(
ContainerCommandRequestProto request, List<Validator> validators,
boolean zeroCopy) throws IOException {
try {
XceiverClientReply reply = sendCommandWithTraceIDAndRetry(request, validators);
XceiverClientReply reply =
sendCommandWithTraceIDAndRetry(request, validators, zeroCopy);
return reply.getResponse().get();
} catch (ExecutionException e) {
throw getIOExceptionForSendCommand(request, e);
Expand All @@ -411,7 +503,8 @@ public ContainerCommandResponseProto sendCommand(
}

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<Validator> validators)
ContainerCommandRequestProto request, List<Validator> validators,
boolean zeroCopy)
throws IOException {

String spanName = "XceiverClientGrpc." + request.getCmdType().name();
Expand All @@ -424,7 +517,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
if (!request.hasVersion()) {
builder.setVersion(ClientVersion.CURRENT.toProtoValue());
}
return sendCommandWithRetry(builder.build(), validators);
return sendCommandWithRetry(builder.build(), validators, zeroCopy);
});
}

Expand Down Expand Up @@ -489,7 +582,8 @@ private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto re
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<Validator> validators)
ContainerCommandRequestProto request, List<Validator> validators,
boolean zeroCopy)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;
Expand All @@ -509,7 +603,9 @@ private XceiverClientReply sendCommandWithRetry(
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
responseProto = (zeroCopy
? sendCommandAsync(request, dn, true)
: sendCommandAsync(request, dn)).getResponse().get();
if (validators != null && !validators.isEmpty()) {
for (Validator validator : validators) {
validator.accept(request, responseProto);
Expand All @@ -521,8 +617,11 @@ private XceiverClientReply sendCommandWithRetry(
}
break;
} catch (IOException e) {
if (responseProto != null) {
releaseReceivedResponse(responseProto);
responseProto = null;
}
ioException = e;
responseProto = null;
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to execute command {} on datanode {}",
processForDebug(request), dn, e);
Expand Down Expand Up @@ -686,11 +785,19 @@ protected boolean shouldBlockAndWaitAsyncReply(
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, InterruptedException {
return sendCommandAsync(request, dn, false);
}

@VisibleForTesting
protected XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request, DatanodeDetails dn,
boolean zeroCopy)
throws IOException, InterruptedException {
checkOpen(dn);
DatanodeID dnId = dn.getID();
if (LOG.isDebugEnabled()) {
LOG.debug("Send command {} to datanode {}",
request.getCmdType(), dn);
LOG.debug("Send command {} to datanode {}{}",
request.getCmdType(), dn, zeroCopy ? " with zero-copy response" : "");
}
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
new CompletableFuture<>();
Expand All @@ -700,7 +807,9 @@ public XceiverClientReply sendCommandAsync(

// create a new grpc message stream pair for each call.
final StreamObserver<ContainerCommandRequestProto> requestObserver =
dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS)
(zeroCopy ? dnChannelInfoMap.get(dnId).getZeroCopyStub()
: dnChannelInfoMap.get(dnId).getStub())
.withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto value) {
Expand Down Expand Up @@ -766,16 +875,28 @@ public void setTimeout(long timeout) {
this.timeout = timeout;
}

@Override
public void releaseReceivedResponse(ContainerCommandResponseProto response) {
if (response != null) {
// Idempotent: the marshaller no-ops if `response` was never tracked
// (i.e., parsed via the standard fallback path).
ZERO_COPY_RESPONSE_MARSHALLER.release(response);
}
}

/**
* Group the channel and stub so that they are published together.
*/
private static class ChannelInfo {
private final ManagedChannel channel;
private final XceiverClientProtocolServiceStub stub;
private final XceiverClientProtocolServiceStub zeroCopyStub;

ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) {
ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub,
XceiverClientProtocolServiceStub zeroCopyStub) {
this.channel = channel;
this.stub = stub;
this.zeroCopyStub = zeroCopyStub;
}

public ManagedChannel getChannel() {
Expand All @@ -786,6 +907,10 @@ public XceiverClientProtocolServiceStub getStub() {
return stub;
}

public XceiverClientProtocolServiceStub getZeroCopyStub() {
return zeroCopyStub;
}

public boolean isChannelInactive() {
return channel == null
|| channel.isTerminated()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public class ChunkInputStream extends InputStream
// retry. Once the chunk is read, this variable is reset.
private long chunkPosition = -1;

// Outer ContainerCommandResponseProto whose chunk data the current `buffers`
// view. Held until releaseBuffers() so the underlying zero-copy-tracked
// Netty buffer is not freed while the buffers are still in use.
// Null on the standard (non-zero-copy) parse path - the proto's bytes
// are heap-backed and need no explicit release.
private ContainerCommandResponseProto lastResponseToRelease;

private final Supplier<Token<?>> tokenSupplier;

private static final int EOF = -1;
Expand Down Expand Up @@ -431,14 +438,25 @@ private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo)

/**
* Send RPC call to get the chunk from the container.
* <p>
* On the zero-copy path (HDDS-10283), the returned {@link ByteBuffer}s
* view the underlying inbound Netty buffer of the response. Those bytes
* stay reachable until {@link #releaseBuffers()} releases them via
* {@link XceiverClientSpi#releaseReceivedResponse}, which this method
* registers as a side effect through {@link #lastResponseToRelease}.
*/
@VisibleForTesting
protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {

ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators,
tokenSupplier.get());
ContainerCommandResponseProto reply =
ContainerProtocolCalls.readChunkForZeroCopy(xceiverClient, readChunkInfo,
datanodeBlockID, validators, tokenSupplier.get());
// The zero-copy-tracked buffer for the chunk data lives inside `reply`.
// Stash the outer reference so releaseBuffers() can return it once the
// chunk bytes have been fully consumed.
setLastResponseToRelease(reply);
final ReadChunkResponseProto readChunkResponse = reply.getReadChunk();

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
Expand All @@ -453,6 +471,16 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
}
}

private void setLastResponseToRelease(ContainerCommandResponseProto reply) {
// If a previous response is still being held (e.g., readChunk called
// twice without an intervening releaseBuffers), release the prior one
// first so its Netty buffer goes back to the pool.
if (lastResponseToRelease != null) {
xceiverClient.releaseReceivedResponse(lastResponseToRelease);
}
lastResponseToRelease = reply;
}

private void validateChunk(
ContainerCommandRequestProto request,
ContainerCommandResponseProto response
Expand Down Expand Up @@ -702,6 +730,13 @@ private void releaseBuffers() {
buffers = null;
bufferIndex = 0;
firstUnreleasedBufferIndex = 0;
// Release the zero-copy-tracked Netty buffer of the response that backed
// these ByteBuffers. Idempotent and a no-op if the response was parsed
// via the standard heap-backed path.
if (lastResponseToRelease != null) {
xceiverClient.releaseReceivedResponse(lastResponseToRelease);
lastResponseToRelease = null;
}
// We should not reset bufferOffsetWrtChunkData and buffersSize here
// because when getPos() is called in chunkStreamEOF() we use these
// values and determine whether chunk is read completely or not.
Expand Down
Loading