Skip to content

Commit 3801961

Browse files
authored
fix: fix client blocking latency (#2346)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) - [ ] Rollback plan is reviewed and LGTMed - [ ] All new data plane features have a completed end to end testing plan Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent be62968 commit 3801961

File tree

6 files changed

+49
-44
lines changed

6 files changed

+49
-44
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub.metrics;
1717

18-
import com.google.common.base.Stopwatch;
19-
import io.grpc.Attributes;
2018
import io.grpc.ClientStreamTracer;
2119
import io.grpc.Metadata;
22-
import java.util.concurrent.TimeUnit;
2320

2421
/**
2522
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
@@ -28,21 +25,15 @@
2825
*/
2926
class BigtableGrpcStreamTracer extends ClientStreamTracer {
3027

31-
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
3228
private final BigtableTracer tracer;
3329

3430
public BigtableGrpcStreamTracer(BigtableTracer tracer) {
3531
this.tracer = tracer;
3632
}
3733

38-
@Override
39-
public void streamCreated(Attributes transportAttrs, Metadata headers) {
40-
stopwatch.start();
41-
}
42-
4334
@Override
4435
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
45-
tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS));
36+
tracer.grpcMessageSent();
4637
}
4738

4839
static class Factory extends ClientStreamTracer.Factory {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,14 @@ public void setLocations(String zone, String cluster) {
8383
// noop
8484
}
8585

86+
@Deprecated
87+
/** @deprecated {@link #grpcMessageSent()} is called instead. */
8688
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
8789
// noop
8890
}
91+
92+
/** Called when the message is sent on a grpc channel. */
93+
public void grpcMessageSent() {
94+
// noop
95+
}
8996
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
8484
private final Attributes baseAttributes;
8585

8686
private Long serverLatencies = null;
87+
private final AtomicLong grpcMessageSentDelay = new AtomicLong(0);
8788

8889
// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
8990
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
@@ -263,8 +264,8 @@ public void batchRequestThrottled(long throttledTimeNanos) {
263264
}
264265

265266
@Override
266-
public void grpcChannelQueuedLatencies(long queuedTimeNanos) {
267-
totalClientBlockingTime.addAndGet(queuedTimeNanos);
267+
public void grpcMessageSent() {
268+
grpcMessageSentDelay.set(attemptTimer.elapsed(TimeUnit.NANOSECONDS));
268269
}
269270

270271
@Override
@@ -351,6 +352,7 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
351352
.put(STATUS_KEY, statusStr)
352353
.build();
353354

355+
totalClientBlockingTime.addAndGet(grpcMessageSentDelay.get());
354356
clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes);
355357

356358
attemptLatenciesHistogram.record(

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,4 +225,11 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
225225
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
226226
}
227227
}
228+
229+
@Override
230+
public void grpcMessageSent() {
231+
for (BigtableTracer tracer : bigtableTracers) {
232+
tracer.grpcMessageSent();
233+
}
234+
}
228235
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,11 @@
7070
import com.google.protobuf.ByteString;
7171
import com.google.protobuf.BytesValue;
7272
import com.google.protobuf.StringValue;
73-
import io.grpc.CallOptions;
74-
import io.grpc.Channel;
75-
import io.grpc.ClientCall;
76-
import io.grpc.ClientInterceptor;
77-
import io.grpc.ForwardingClientCall;
7873
import io.grpc.ForwardingServerCall;
7974
import io.grpc.ManagedChannelBuilder;
8075
import io.grpc.Metadata;
81-
import io.grpc.MethodDescriptor;
76+
import io.grpc.ProxiedSocketAddress;
77+
import io.grpc.ProxyDetector;
8278
import io.grpc.Server;
8379
import io.grpc.ServerCall;
8480
import io.grpc.ServerCallHandler;
@@ -95,6 +91,8 @@
9591
import io.opentelemetry.sdk.metrics.View;
9692
import io.opentelemetry.sdk.metrics.data.MetricData;
9793
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
94+
import java.io.IOException;
95+
import java.net.SocketAddress;
9896
import java.nio.charset.Charset;
9997
import java.util.ArrayList;
10098
import java.util.Collections;
@@ -104,6 +102,7 @@
104102
import java.util.concurrent.TimeUnit;
105103
import java.util.concurrent.atomic.AtomicBoolean;
106104
import java.util.concurrent.atomic.AtomicInteger;
105+
import javax.annotation.Nullable;
107106
import org.junit.After;
108107
import org.junit.Assert;
109108
import org.junit.Before;
@@ -130,7 +129,7 @@ public class BuiltinMetricsTracerTest {
130129
private static final long SLEEP_VARIABILITY = 15;
131130
private static final String CLIENT_NAME = "java-bigtable/" + Version.VERSION;
132131

133-
private static final long CHANNEL_BLOCKING_LATENCY = 75;
132+
private static final long CHANNEL_BLOCKING_LATENCY = 200;
134133

135134
@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
136135

@@ -196,35 +195,14 @@ public void sendHeaders(Metadata headers) {
196195
}
197196
};
198197

199-
ClientInterceptor clientInterceptor =
200-
new ClientInterceptor() {
201-
@Override
202-
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
203-
MethodDescriptor<ReqT, RespT> methodDescriptor,
204-
CallOptions callOptions,
205-
Channel channel) {
206-
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
207-
channel.newCall(methodDescriptor, callOptions)) {
208-
@Override
209-
public void sendMessage(ReqT message) {
210-
try {
211-
Thread.sleep(CHANNEL_BLOCKING_LATENCY);
212-
} catch (InterruptedException e) {
213-
throw new RuntimeException(e);
214-
}
215-
super.sendMessage(message);
216-
}
217-
};
218-
}
219-
};
220-
221198
server = FakeServiceBuilder.create(fakeService).intercept(trailersInterceptor).start();
222199

223200
BigtableDataSettings settings =
224201
BigtableDataSettings.newBuilderForEmulator(server.getPort())
225202
.setProjectId(PROJECT_ID)
226203
.setInstanceId(INSTANCE_ID)
227204
.setAppProfileId(APP_PROFILE_ID)
205+
.setRefreshingChannel(false)
228206
.build();
229207
EnhancedBigtableStubSettings.Builder stubSettingsBuilder =
230208
settings.getStubSettings().toBuilder();
@@ -264,7 +242,7 @@ public void sendMessage(ReqT message) {
264242
if (oldConfigurator != null) {
265243
builder = oldConfigurator.apply(builder);
266244
}
267-
return builder.intercept(clientInterceptor);
245+
return builder.proxyDetector(new DelayProxyDetector());
268246
});
269247
stubSettingsBuilder.setTransportChannelProvider(channelProvider.build());
270248

@@ -692,9 +670,8 @@ public void testQueuedOnChannelUnaryLatencies() {
692670
.put(CLIENT_NAME_KEY, CLIENT_NAME)
693671
.build();
694672

695-
long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3;
696673
long actual = getAggregatedValue(clientLatency, attributes);
697-
assertThat(actual).isAtLeast(expected);
674+
assertThat(actual).isAtLeast(CHANNEL_BLOCKING_LATENCY);
698675
}
699676

700677
@Test
@@ -838,4 +815,18 @@ public AtomicInteger getResponseCounter() {
838815
return responseCounter;
839816
}
840817
}
818+
819+
class DelayProxyDetector implements ProxyDetector {
820+
821+
@Nullable
822+
@Override
823+
public ProxiedSocketAddress proxyFor(SocketAddress socketAddress) throws IOException {
824+
try {
825+
Thread.sleep(CHANNEL_BLOCKING_LATENCY);
826+
} catch (InterruptedException e) {
827+
828+
}
829+
return null;
830+
}
831+
}
841832
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,4 +258,11 @@ public void testRequestBlockedOnChannel() {
258258
verify(child3, times(1)).grpcChannelQueuedLatencies(5L);
259259
verify(child4, times(1)).grpcChannelQueuedLatencies(5L);
260260
}
261+
262+
@Test
263+
public void testGrpcMessageSent() {
264+
compositeTracer.grpcMessageSent();
265+
verify(child3, times(1)).grpcMessageSent();
266+
verify(child4, times(1)).grpcMessageSent();
267+
}
261268
}

0 commit comments

Comments
 (0)