• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18783

pending completion
#18783

push

github-actions

web-flow
util: Outlier detection tracer delegation (#10459) (#10483)

OutlierDetectionLoadBalancer did not delegate calls to an existing
ClientStreamTracer from the tracer it installed. This change has the OD
tracer delegate all calls to the underlying one.

30639 of 34707 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.32
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import com.google.common.base.Preconditions;
20
import io.grpc.Attributes;
21
import io.grpc.Metadata;
22
import io.grpc.Status;
23
import io.grpc.internal.AbstractServerStream;
24
import io.grpc.internal.StatsTraceContext;
25
import io.grpc.internal.TransportTracer;
26
import io.grpc.internal.WritableBuffer;
27
import io.grpc.okhttp.internal.framed.ErrorCode;
28
import io.grpc.okhttp.internal.framed.Header;
29
import io.perfmark.PerfMark;
30
import io.perfmark.Tag;
31
import io.perfmark.TaskCloseable;
32
import java.util.List;
33
import javax.annotation.concurrent.GuardedBy;
34
import okio.Buffer;
35

36
/**
37
 * Server stream for the okhttp transport.
38
 */
39
class OkHttpServerStream extends AbstractServerStream {
40
  private final String authority;
41
  private final TransportState state;
42
  private final Sink sink = new Sink();
1✔
43
  private final TransportTracer transportTracer;
44
  private final Attributes attributes;
45

46
  /**
   * Creates a server stream backed by the given per-stream transport state.
   *
   * <p>{@code state}, {@code transportAttrs} and {@code transportTracer} are null-checked;
   * {@code authority} is stored as given, without a null check.
   *
   * @param state transport state later returned by {@link #transportState()}
   * @param transportAttrs attributes later returned by {@link #getAttributes()}
   * @param authority value later returned by {@link #getAuthority()}
   * @param statsTraceCtx forwarded to the superclass constructor
   * @param transportTracer tracer that {@code Sink.writeFrame} reports sent messages to
   */
  public OkHttpServerStream(
47
      TransportState state,
48
      Attributes transportAttrs,
49
      String authority,
50
      StatsTraceContext statsTraceCtx,
51
      TransportTracer transportTracer) {
52
    super(new OkHttpWritableBufferAllocator(), statsTraceCtx);
1✔
53
    this.state = Preconditions.checkNotNull(state, "state");
1✔
54
    this.attributes = Preconditions.checkNotNull(transportAttrs, "transportAttrs");
1✔
55
    // Deliberately (or at least visibly) not null-checked, unlike the other reference arguments.
    this.authority = authority;
1✔
56
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
57
  }
1✔
58

59
  /** Returns the {@code TransportState} that was supplied to the constructor. */
  @Override
60
  protected TransportState transportState() {
61
    return state;
1✔
62
  }
63

64
  /** Returns this stream's {@code Sink} instance, which is created once per stream. */
  @Override
65
  protected Sink abstractServerStreamSink() {
66
    return sink;
1✔
67
  }
68

69
  /** Returns the stream id held by the underlying {@code TransportState}. */
  @Override
70
  public int streamId() {
71
    return state.streamId;
1✔
72
  }
73

74
  /**
   * Returns the authority passed to the constructor. The constructor does not null-check it,
   * so this may return {@code null} if the caller supplied {@code null}.
   */
  @Override
75
  public String getAuthority() {
76
    return authority;
1✔
77
  }
78

79
  /** Returns the (non-null) transport attributes passed to the constructor. */
  @Override
80
  public Attributes getAttributes() {
81
    return attributes;
1✔
82
  }
83

84
  class Sink implements AbstractServerStream.Sink {
1✔
85
    /**
     * Converts {@code metadata} to response headers and sends them via the transport state.
     *
     * <p>The conversion happens before {@code state.lock} is acquired; only the send itself
     * runs under the lock. The whole call is wrapped in a PerfMark task.
     *
     * @param metadata response metadata to convert with {@code Headers.createResponseHeaders}
     */
    @Override
86
    public void writeHeaders(Metadata metadata) {
87
      try (TaskCloseable ignore =
1✔
88
               PerfMark.traceTask("OkHttpServerStream$Sink.writeHeaders")) {
1✔
89
        List<Header> responseHeaders = Headers.createResponseHeaders(metadata);
1✔
90
        synchronized (state.lock) {
1✔
91
          state.sendHeaders(responseHeaders);
1✔
92
        }
1✔
93
      }
94
    }
1✔
95

96
    /**
     * Hands the bytes of {@code frame} to the transport state and reports the sent messages.
     *
     * @param frame buffer to send; it is downcast to {@code OkHttpWritableBuffer} without a
     *     check (NOTE(review): presumably always produced by this stream's
     *     {@code OkHttpWritableBufferAllocator} - confirm)
     * @param flush forwarded to {@code state.sendBuffer}
     * @param numMessages count reported to the transport tracer
     */
    @Override
97
    public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) {
98
      try (TaskCloseable ignore =
1✔
99
               PerfMark.traceTask("OkHttpServerStream$Sink.writeFrame")) {
1✔
100
        Buffer buffer = ((OkHttpWritableBuffer) frame).buffer();
1✔
101
        // The buffer size is narrowed to int here.
        // NOTE(review): assumes a frame never exceeds Integer.MAX_VALUE bytes - confirm.
        int size = (int) buffer.size();
1✔
102
        // Byte accounting is done before taking the lock and only for non-empty buffers.
        if (size > 0) {
1✔
103
          onSendingBytes(size);
1✔
104
        }
105
        synchronized (state.lock) {
1✔
106
          state.sendBuffer(buffer, flush);
1✔
107
          // Reported unconditionally, even when sendBuffer returned early because a cancel
          // had already been sent.
          transportTracer.reportMessageSent(numMessages);
1✔
108
        }
1✔
109
      }
110
    }
1✔
111

112
    /**
     * Converts {@code trailers} to response trailers and asks the transport state to send them.
     *
     * <p>The conversion happens before {@code state.lock} is acquired. The transport state
     * defers the actual write until the outbound flow controller reports no pending data
     * (see {@code TransportState.sendTrailers}).
     *
     * @param trailers trailing metadata to convert
     * @param headersSent forwarded to {@code Headers.createResponseTrailers}
     * @param status not used by this implementation
     */
    @Override
113
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
114
      try (TaskCloseable ignore =
1✔
115
               PerfMark.traceTask("OkHttpServerStream$Sink.writeTrailers")) {
1✔
116
        List<Header> responseTrailers = Headers.createResponseTrailers(trailers, headersSent);
1✔
117
        synchronized (state.lock) {
1✔
118
          state.sendTrailers(responseTrailers);
1✔
119
        }
1✔
120
      }
121
    }
1✔
122

123
    /**
     * Cancels the stream with HTTP/2 error code {@code CANCEL}, under {@code state.lock}.
     *
     * @param reason status forwarded to {@code TransportState.cancel}
     */
    @Override
124
    public void cancel(Status reason) {
125
      try (TaskCloseable ignore =
1✔
126
               PerfMark.traceTask("OkHttpServerStream$Sink.cancel")) {
1✔
127
        synchronized (state.lock) {
1✔
128
          state.cancel(ErrorCode.CANCEL, reason);
1✔
129
        }
1✔
130
      }
131
    }
1✔
132
  }
133

134
  static class TransportState extends AbstractServerStream.TransportState
135
      implements OutboundFlowController.Stream, OkHttpServerTransport.StreamState {
136
    @GuardedBy("lock")
137
    private final OkHttpServerTransport transport;
138
    private final int streamId;
139
    private final int initialWindowSize;
140
    private final Object lock;
141
    @GuardedBy("lock")
1✔
142
    private boolean cancelSent = false;
143
    @GuardedBy("lock")
144
    private int window;
145
    @GuardedBy("lock")
146
    private int processedWindow;
147
    @GuardedBy("lock")
148
    private final ExceptionHandlingFrameWriter frameWriter;
149
    @GuardedBy("lock")
150
    private final OutboundFlowController outboundFlow;
151
    @GuardedBy("lock")
152
    private boolean receivedEndOfStream;
153
    private final Tag tag;
154
    private final OutboundFlowController.StreamState outboundFlowState;
155

156
    /**
     * Creates the per-stream transport state.
     *
     * <p>Only {@code transport} and {@code lock} are null-checked here. {@code frameWriter} is
     * stored as given; {@code outboundFlow} is dereferenced at the end of this constructor.
     *
     * @param transport owning transport, notified when the stream closes
     * @param streamId id used for all frames written for this stream
     * @param maxMessageSize forwarded to the superclass constructor
     * @param statsTraceCtx forwarded to the superclass constructor
     * @param lock monitor guarding the {@code @GuardedBy("lock")} fields and methods
     * @param frameWriter writer used for headers, window updates and RST_STREAM
     * @param outboundFlow flow controller used for outbound data and trailer scheduling
     * @param initialWindowSize initial value of both {@code window} and {@code processedWindow}
     * @param transportTracer forwarded to the superclass constructor
     * @param methodName used to create the PerfMark tag for this stream
     */
    public TransportState(
157
        OkHttpServerTransport transport,
158
        int streamId,
159
        int maxMessageSize,
160
        StatsTraceContext statsTraceCtx,
161
        Object lock,
162
        ExceptionHandlingFrameWriter frameWriter,
163
        OutboundFlowController outboundFlow,
164
        int initialWindowSize,
165
        TransportTracer transportTracer,
166
        String methodName) {
167
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
168
      this.transport = Preconditions.checkNotNull(transport, "transport");
1✔
169
      this.streamId = streamId;
1✔
170
      this.lock = Preconditions.checkNotNull(lock, "lock");
1✔
171
      this.frameWriter = frameWriter;
1✔
172
      this.outboundFlow = outboundFlow;
1✔
173
      // Both window counters start at the initial window size.
      this.window = initialWindowSize;
1✔
174
      this.processedWindow = initialWindowSize;
1✔
175
      this.initialWindowSize = initialWindowSize;
1✔
176
      tag = PerfMark.createTag(methodName);
1✔
177
      // Passes `this` to the flow controller before the constructor has returned.
      outboundFlowState = outboundFlow.createState(this, streamId);
1✔
178
    }
1✔
179

180
    /**
     * Cancels the stream with HTTP/2 error code {@code INTERNAL_ERROR}, using a status derived
     * from {@code cause} via {@code Status.fromThrowable}.
     *
     * @param cause the failure to convert into the reported status
     */
    @Override
181
    @GuardedBy("lock")
182
    public void deframeFailed(Throwable cause) {
183
      cancel(ErrorCode.INTERNAL_ERROR, Status.fromThrowable(cause));
×
184
    }
×
185

186
    /**
     * Accounts for {@code processedBytes} and, once the processed window has dropped to the
     * update threshold, sends a window update that restores it to the initial size.
     *
     * @param processedBytes number of bytes to subtract from {@code processedWindow}
     */
    @Override
187
    @GuardedBy("lock")
188
    public void bytesRead(int processedBytes) {
189
      processedWindow -= processedBytes;
1✔
190
      // Threshold is a fraction of the initial window, scaled by Utils.DEFAULT_WINDOW_UPDATE_RATIO.
      if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
191
        // delta is exactly what is needed to bring processedWindow back to initialWindowSize.
        int delta = initialWindowSize - processedWindow;
1✔
192
        window += delta;
1✔
193
        processedWindow += delta;
1✔
194
        frameWriter.windowUpdate(streamId, delta);
1✔
195
        frameWriter.flush();
1✔
196
      }
197
    }
1✔
198

199
    /**
     * Runs {@code r} synchronously on the calling thread while holding {@code lock}; no
     * hand-off to another thread takes place here.
     *
     * @param r the task to run under the lock
     */
    @Override
200
    @GuardedBy("lock")
201
    public void runOnTransportThread(final Runnable r) {
202
      synchronized (lock) {
1✔
203
        r.run();
1✔
204
      }
1✔
205
    }
1✔
206

207
    /**
208
     * Must be called while holding the transport lock.
     *
     * <p>Records end-of-stream if signalled, subtracts {@code windowConsumed} from the inbound
     * window, and passes the frame (wrapped in an {@code OkHttpReadableBuffer}) to the
     * superclass. The body also synchronizes on {@code lock} itself.
209
     */
210
    @Override
211
    public void inboundDataReceived(okio.Buffer frame, int windowConsumed, boolean endOfStream) {
212
      synchronized (lock) {
1✔
213
        PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
1✔
214
        // The flag is only ever set here, never cleared.
        if (endOfStream) {
1✔
215
          this.receivedEndOfStream = true;
1✔
216
        }
217
        window -= windowConsumed;
1✔
218
        super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
1✔
219
      }
1✔
220
    }
1✔
221

222
    /**
     * Must be called while holding the transport lock.
     *
     * <p>Emits a PerfMark event and reports {@code status} via {@code transportReportStatus}.
     * Unlike the neighbouring methods, this one does not synchronize on {@code lock} itself.
     */
223
    @Override
224
    public void inboundRstReceived(Status status) {
225
      PerfMark.event("OkHttpServerTransport$FrameHandler.rstStream", tag);
1✔
226
      transportReportStatus(status);
1✔
227
    }
1✔
228

229
    /**
     * Must be called while holding the transport lock.
     *
     * <p>Returns whether {@code inboundDataReceived} has been called with
     * {@code endOfStream == true}. The body also synchronizes on {@code lock} itself.
     */
230
    @Override
231
    public boolean hasReceivedEndOfStream() {
232
      synchronized (lock) {
1✔
233
        return receivedEndOfStream;
1✔
234
      }
235
    }
236

237
    /**
     * Must be called while holding the transport lock.
     *
     * <p>Returns the current value of the inbound {@code window} counter. The body also
     * synchronizes on {@code lock} itself.
     */
238
    @Override
239
    public int inboundWindowAvailable() {
240
      synchronized (lock) {
1✔
241
        return window;
1✔
242
      }
243
    }
244

245
    /**
     * Queues {@code buffer} with the outbound flow controller, unless a cancel has already
     * been sent for this stream, in which case the buffer is silently dropped.
     *
     * @param buffer data to send
     * @param flush forwarded to {@code outboundFlow.data}
     */
    @GuardedBy("lock")
246
    private void sendBuffer(Buffer buffer, boolean flush) {
247
      if (cancelSent) {
1✔
248
        return;
×
249
      }
250
      // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
251
      // properly chunked.
252
      outboundFlow.data(false, outboundFlowState, buffer, flush);
1✔
253
    }
1✔
254

255
    /**
     * Writes {@code responseHeaders} for this stream with {@code synReply} and flushes the
     * frame writer immediately.
     *
     * @param responseHeaders headers to write
     */
    @GuardedBy("lock")
256
    private void sendHeaders(List<Header> responseHeaders) {
257
      // First argument is false here and true when writing trailers.
      // NOTE(review): presumably the "out finished" / end-of-stream flag - confirm.
      frameWriter.synReply(false, streamId, responseHeaders);
1✔
258
      frameWriter.flush();
1✔
259
    }
1✔
260

261
    /**
     * Registers a callback with the outbound flow controller that writes
     * {@code responseTrailers} via {@code sendTrailersAfterFlowControlled}; the trailers are
     * not written directly by this method.
     *
     * @param responseTrailers trailers to write once the callback fires
     */
    @GuardedBy("lock")
262
    private void sendTrailers(List<Header> responseTrailers) {
263
      outboundFlow.notifyWhenNoPendingData(
1✔
264
          outboundFlowState, () -> sendTrailersAfterFlowControlled(responseTrailers));
1✔
265
    }
1✔
266

267
    /**
     * Writes the trailers and closes the stream, all under {@code lock}.
     *
     * <p>In order: writes {@code responseTrailers} with {@code synReply}, sends
     * {@code RST_STREAM(NO_ERROR)} if end-of-stream has not been received, tells the transport
     * the stream is closed (with flush), and finally calls {@code complete()}.
     *
     * @param responseTrailers trailers to write
     */
    private void sendTrailersAfterFlowControlled(List<Header> responseTrailers) {
268
      synchronized (lock) {
1✔
269
        frameWriter.synReply(true, streamId, responseTrailers);
1✔
270
        // Only reset the stream when the inbound side has not already ended.
        if (!receivedEndOfStream) {
1✔
271
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
272
        }
273
        transport.streamClosed(streamId, /*flush=*/ true);
1✔
274
        complete();
1✔
275
      }
1✔
276
    }
1✔
277

278
    /**
     * Cancels the stream at most once: later calls return immediately because
     * {@code cancelSent} is already set.
     *
     * <p>On the first call: sends {@code RST_STREAM} with {@code http2Error}, reports
     * {@code reason} via {@code transportReportStatus}, and tells the transport the stream is
     * closed (with flush).
     *
     * @param http2Error error code placed in the RST_STREAM frame
     * @param reason status reported for the stream
     */
    @GuardedBy("lock")
279
    private void cancel(ErrorCode http2Error, Status reason) {
280
      if (cancelSent) {
1✔
281
        return;
1✔
282
      }
283
      // Set before any further work so later calls hit the early return above.
      cancelSent = true;
1✔
284
      frameWriter.rstStream(streamId, http2Error);
1✔
285
      transportReportStatus(reason);
1✔
286
      transport.streamClosed(streamId, /*flush=*/ true);
1✔
287
    }
1✔
288

289
    /**
     * Returns the flow-control state object created for this stream in the constructor via
     * {@code outboundFlow.createState}.
     */
    @Override
290
    public OutboundFlowController.StreamState getOutboundFlowState() {
291
      return outboundFlowState;
1✔
292
    }
293
  }
294
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc