|
21 | 21 | import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.INSTANCE_ID_KEY; |
22 | 22 |
|
23 | 23 | import com.google.api.core.ApiFuture; |
24 | | -import com.google.api.core.ApiFutures; |
25 | 24 | import com.google.api.core.BetaApi; |
26 | 25 | import com.google.api.core.InternalApi; |
27 | 26 | import com.google.api.gax.batching.Batcher; |
|
112 | 111 | import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; |
113 | 112 | import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy; |
114 | 113 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; |
115 | | -import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable; |
116 | 114 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; |
117 | 115 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; |
118 | 116 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; |
|
131 | 129 | import com.google.common.base.Preconditions; |
132 | 130 | import com.google.common.collect.ImmutableList; |
133 | 131 | import com.google.common.collect.ImmutableMap; |
134 | | -import com.google.common.util.concurrent.MoreExecutors; |
135 | 132 | import com.google.protobuf.ByteString; |
136 | 133 | import io.grpc.MethodDescriptor; |
137 | 134 | import io.opencensus.stats.Stats; |
@@ -404,52 +401,31 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable( |
404 | 401 | */ |
405 | 402 | public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) { |
406 | 403 | ClientContext clientContext = bigtableClientContext.getClientContext(); |
407 | | - if (!settings.getEnableSkipTrailers()) { |
408 | | - ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable = |
409 | | - createReadRowsBaseCallable( |
410 | | - ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder() |
411 | | - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) |
412 | | - .setRetrySettings(settings.readRowSettings().getRetrySettings()) |
413 | | - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) |
414 | | - .build(), |
415 | | - rowAdapter); |
416 | | - |
417 | | - ReadRowsUserCallable<RowT> readRowCallable = |
418 | | - new ReadRowsUserCallable<>(readRowsCallable, requestContext); |
419 | | - ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable); |
420 | | - UnaryCallable<Query, RowT> traced = |
421 | | - new TracedUnaryCallable<>( |
422 | | - firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); |
423 | | - return traced.withDefaultCallContext( |
424 | | - clientContext |
425 | | - .getDefaultCallContext() |
426 | | - .withRetrySettings(settings.readRowSettings().getRetrySettings())); |
427 | | - } else { |
428 | | - ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable = |
429 | | - createReadRowsBaseCallable( |
430 | | - ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder() |
431 | | - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) |
432 | | - .setRetrySettings(settings.readRowSettings().getRetrySettings()) |
433 | | - .setIdleTimeoutDuration(Duration.ZERO) |
434 | | - .setWaitTimeoutDuration(Duration.ZERO) |
435 | | - .build(), |
436 | | - rowAdapter, |
437 | | - new SimpleStreamResumptionStrategy<>()); |
438 | | - ServerStreamingCallable<Query, RowT> readRowCallable = |
439 | | - new TransformingServerStreamingCallable<>( |
440 | | - readRowsCallable, |
441 | | - (query) -> query.limit(1).toProto(requestContext), |
442 | | - Functions.identity()); |
443 | | - |
444 | | - return new BigtableUnaryOperationCallable<>( |
445 | | - readRowCallable, |
446 | | - clientContext |
447 | | - .getDefaultCallContext() |
448 | | - .withRetrySettings(settings.readRowSettings().getRetrySettings()), |
449 | | - clientContext.getTracerFactory(), |
450 | | - getSpanName("ReadRow"), |
451 | | - /* allowNoResponses= */ true); |
452 | | - } |
| 404 | + |
| 405 | + ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable = |
| 406 | + createReadRowsBaseCallable( |
| 407 | + ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder() |
| 408 | + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) |
| 409 | + .setRetrySettings(settings.readRowSettings().getRetrySettings()) |
| 410 | + .setIdleTimeoutDuration(Duration.ZERO) |
| 411 | + .setWaitTimeoutDuration(Duration.ZERO) |
| 412 | + .build(), |
| 413 | + rowAdapter, |
| 414 | + new SimpleStreamResumptionStrategy<>()); |
| 415 | + ServerStreamingCallable<Query, RowT> readRowCallable = |
| 416 | + new TransformingServerStreamingCallable<>( |
| 417 | + readRowsCallable, |
| 418 | + (query) -> query.limit(1).toProto(requestContext), |
| 419 | + Functions.identity()); |
| 420 | + |
| 421 | + return new BigtableUnaryOperationCallable<>( |
| 422 | + readRowCallable, |
| 423 | + clientContext |
| 424 | + .getDefaultCallContext() |
| 425 | + .withRetrySettings(settings.readRowSettings().getRetrySettings()), |
| 426 | + clientContext.getTracerFactory(), |
| 427 | + getSpanName("ReadRow"), |
| 428 | + /* allowNoResponses= */ true); |
453 | 429 | } |
454 | 430 |
|
455 | 431 | private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable( |
@@ -1292,67 +1268,6 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar |
1292 | 1268 | UnaryCallSettings<ReqT, RespT> callSettings, |
1293 | 1269 | Function<ReqT, BaseReqT> requestTransformer, |
1294 | 1270 | Function<BaseRespT, RespT> responseTranformer) { |
1295 | | - if (settings.getEnableSkipTrailers()) { |
1296 | | - return createUnaryCallableNew( |
1297 | | - methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); |
1298 | | - } else { |
1299 | | - return createUnaryCallableOld( |
1300 | | - methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); |
1301 | | - } |
1302 | | - } |
1303 | | - |
1304 | | - private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableOld( |
1305 | | - MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor, |
1306 | | - RequestParamsExtractor<BaseReqT> headerParamsFn, |
1307 | | - UnaryCallSettings<ReqT, RespT> callSettings, |
1308 | | - Function<ReqT, BaseReqT> requestTransformer, |
1309 | | - Function<BaseRespT, RespT> responseTranformer) { |
1310 | | - |
1311 | | - UnaryCallable<BaseReqT, BaseRespT> base = |
1312 | | - GrpcRawCallableFactory.createUnaryCallable( |
1313 | | - GrpcCallSettings.<BaseReqT, BaseRespT>newBuilder() |
1314 | | - .setMethodDescriptor(methodDescriptor) |
1315 | | - .setParamsExtractor(headerParamsFn) |
1316 | | - .build(), |
1317 | | - callSettings.getRetryableCodes()); |
1318 | | - |
1319 | | - UnaryCallable<BaseReqT, BaseRespT> withStatsHeaders = new StatsHeadersUnaryCallable<>(base); |
1320 | | - |
1321 | | - UnaryCallable<BaseReqT, BaseRespT> withBigtableTracer = |
1322 | | - new BigtableTracerUnaryCallable<>(withStatsHeaders); |
1323 | | - |
1324 | | - UnaryCallable<BaseReqT, BaseRespT> retrying = withRetries(withBigtableTracer, callSettings); |
1325 | | - |
1326 | | - UnaryCallable<ReqT, RespT> transformed = |
1327 | | - new UnaryCallable<ReqT, RespT>() { |
1328 | | - @Override |
1329 | | - public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) { |
1330 | | - ApiFuture<BaseRespT> f = |
1331 | | - retrying.futureCall(requestTransformer.apply(reqT), apiCallContext); |
1332 | | - return ApiFutures.transform( |
1333 | | - f, responseTranformer::apply, MoreExecutors.directExecutor()); |
1334 | | - } |
1335 | | - }; |
1336 | | - |
1337 | | - UnaryCallable<ReqT, RespT> traced = |
1338 | | - new TracedUnaryCallable<>( |
1339 | | - transformed, |
1340 | | - bigtableClientContext.getClientContext().getTracerFactory(), |
1341 | | - getSpanName(methodDescriptor.getBareMethodName())); |
1342 | | - |
1343 | | - return traced.withDefaultCallContext( |
1344 | | - bigtableClientContext |
1345 | | - .getClientContext() |
1346 | | - .getDefaultCallContext() |
1347 | | - .withRetrySettings(callSettings.getRetrySettings())); |
1348 | | - } |
1349 | | - |
1350 | | - private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew( |
1351 | | - MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor, |
1352 | | - RequestParamsExtractor<BaseReqT> headerParamsFn, |
1353 | | - UnaryCallSettings<ReqT, RespT> callSettings, |
1354 | | - Function<ReqT, BaseReqT> requestTransformer, |
1355 | | - Function<BaseRespT, RespT> responseTranformer) { |
1356 | 1271 |
|
1357 | 1272 | ServerStreamingCallable<BaseReqT, BaseRespT> base = |
1358 | 1273 | GrpcRawCallableFactory.createServerStreamingCallable( |
|
0 commit comments