• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19368

22 Jul 2024 11:35PM CUT coverage: 84.455% (+0.01%) from 84.443%
#19368

push

github

ejona86
examples: For Bazel, remove compat repo for maven_install

It hasn't been needed since 0064991. In that commit the main WORKSPACE
was cleaned up, but not the examples.

33234 of 39351 relevant lines covered (84.46%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.35
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import com.google.common.base.Preconditions;
20
import io.grpc.Attributes;
21
import io.grpc.Metadata;
22
import io.grpc.Status;
23
import io.grpc.internal.AbstractServerStream;
24
import io.grpc.internal.StatsTraceContext;
25
import io.grpc.internal.TransportTracer;
26
import io.grpc.internal.WritableBuffer;
27
import io.grpc.okhttp.internal.framed.ErrorCode;
28
import io.grpc.okhttp.internal.framed.Header;
29
import io.perfmark.PerfMark;
30
import io.perfmark.Tag;
31
import io.perfmark.TaskCloseable;
32
import java.util.List;
33
import javax.annotation.concurrent.GuardedBy;
34
import okio.Buffer;
35

36
/**
37
 * Server stream for the okhttp transport.
38
 */
39
class OkHttpServerStream extends AbstractServerStream {
40
  private final String authority;
41
  private final TransportState state;
42
  private final Sink sink = new Sink();
1✔
43
  private final TransportTracer transportTracer;
44
  private final Attributes attributes;
45

46
  public OkHttpServerStream(
47
      TransportState state,
48
      Attributes transportAttrs,
49
      String authority,
50
      StatsTraceContext statsTraceCtx,
51
      TransportTracer transportTracer) {
52
    super(new OkHttpWritableBufferAllocator(), statsTraceCtx);
1✔
53
    this.state = Preconditions.checkNotNull(state, "state");
1✔
54
    this.attributes = Preconditions.checkNotNull(transportAttrs, "transportAttrs");
1✔
55
    this.authority = authority;
1✔
56
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
57
  }
1✔
58

59
  @Override
60
  protected TransportState transportState() {
61
    return state;
1✔
62
  }
63

64
  @Override
65
  protected Sink abstractServerStreamSink() {
66
    return sink;
1✔
67
  }
68

69
  @Override
70
  public int streamId() {
71
    return state.streamId;
1✔
72
  }
73

74
  @Override
75
  public String getAuthority() {
76
    return authority;
1✔
77
  }
78

79
  @Override
80
  public Attributes getAttributes() {
81
    return attributes;
1✔
82
  }
83

84
  class Sink implements AbstractServerStream.Sink {
1✔
85
    @Override
86
    public void writeHeaders(Metadata metadata, boolean flush) {
87
      try (TaskCloseable ignore =
1✔
88
               PerfMark.traceTask("OkHttpServerStream$Sink.writeHeaders")) {
1✔
89
        List<Header> responseHeaders = Headers.createResponseHeaders(metadata);
1✔
90
        synchronized (state.lock) {
1✔
91
          state.sendHeaders(responseHeaders);
1✔
92
        }
1✔
93
      }
94
    }
1✔
95

96
    @Override
97
    public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) {
98
      try (TaskCloseable ignore =
1✔
99
               PerfMark.traceTask("OkHttpServerStream$Sink.writeFrame")) {
1✔
100
        Buffer buffer = ((OkHttpWritableBuffer) frame).buffer();
1✔
101
        int size = (int) buffer.size();
1✔
102
        if (size > 0) {
1✔
103
          onSendingBytes(size);
1✔
104
        }
105
        synchronized (state.lock) {
1✔
106
          state.sendBuffer(buffer, flush);
1✔
107
          transportTracer.reportMessageSent(numMessages);
1✔
108
        }
1✔
109
      }
110
    }
1✔
111

112
    @Override
113
    public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
114
      try (TaskCloseable ignore =
1✔
115
               PerfMark.traceTask("OkHttpServerStream$Sink.writeTrailers")) {
1✔
116
        List<Header> responseTrailers = Headers.createResponseTrailers(trailers, headersSent);
1✔
117
        synchronized (state.lock) {
1✔
118
          state.sendTrailers(responseTrailers);
1✔
119
        }
1✔
120
      }
121
    }
1✔
122

123
    @Override
124
    public void cancel(Status reason) {
125
      try (TaskCloseable ignore =
1✔
126
               PerfMark.traceTask("OkHttpServerStream$Sink.cancel")) {
1✔
127
        synchronized (state.lock) {
1✔
128
          state.cancel(ErrorCode.CANCEL, reason);
1✔
129
        }
1✔
130
      }
131
    }
1✔
132
  }
133

134
  static class TransportState extends AbstractServerStream.TransportState
135
      implements OutboundFlowController.Stream, OkHttpServerTransport.StreamState {
136
    @GuardedBy("lock")
137
    private final OkHttpServerTransport transport;
138
    private final int streamId;
139
    private final int initialWindowSize;
140
    private final Object lock;
141
    @GuardedBy("lock")
1✔
142
    private boolean cancelSent = false;
143
    @GuardedBy("lock")
144
    private int window;
145
    @GuardedBy("lock")
146
    private int processedWindow;
147
    @GuardedBy("lock")
148
    private final ExceptionHandlingFrameWriter frameWriter;
149
    @GuardedBy("lock")
150
    private final OutboundFlowController outboundFlow;
151
    @GuardedBy("lock")
152
    private boolean receivedEndOfStream;
153
    private final Tag tag;
154
    private final OutboundFlowController.StreamState outboundFlowState;
155

156
    public TransportState(
157
        OkHttpServerTransport transport,
158
        int streamId,
159
        int maxMessageSize,
160
        StatsTraceContext statsTraceCtx,
161
        Object lock,
162
        ExceptionHandlingFrameWriter frameWriter,
163
        OutboundFlowController outboundFlow,
164
        int initialWindowSize,
165
        TransportTracer transportTracer,
166
        String methodName) {
167
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
168
      this.transport = Preconditions.checkNotNull(transport, "transport");
1✔
169
      this.streamId = streamId;
1✔
170
      this.lock = Preconditions.checkNotNull(lock, "lock");
1✔
171
      this.frameWriter = frameWriter;
1✔
172
      this.outboundFlow = outboundFlow;
1✔
173
      this.window = initialWindowSize;
1✔
174
      this.processedWindow = initialWindowSize;
1✔
175
      this.initialWindowSize = initialWindowSize;
1✔
176
      tag = PerfMark.createTag(methodName);
1✔
177
      outboundFlowState = outboundFlow.createState(this, streamId);
1✔
178
    }
1✔
179

180
    @Override
181
    @GuardedBy("lock")
182
    public void deframeFailed(Throwable cause) {
183
      cancel(ErrorCode.INTERNAL_ERROR, Status.fromThrowable(cause));
×
184
    }
×
185

186
    @Override
187
    @GuardedBy("lock")
188
    public void bytesRead(int processedBytes) {
189
      processedWindow -= processedBytes;
1✔
190
      if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
191
        int delta = initialWindowSize - processedWindow;
1✔
192
        window += delta;
1✔
193
        processedWindow += delta;
1✔
194
        frameWriter.windowUpdate(streamId, delta);
1✔
195
        frameWriter.flush();
1✔
196
      }
197
    }
1✔
198

199
    @Override
200
    @GuardedBy("lock")
201
    public void runOnTransportThread(final Runnable r) {
202
      synchronized (lock) {
1✔
203
        r.run();
1✔
204
      }
1✔
205
    }
1✔
206

207
    /**
208
     * Must be called with holding the transport lock.
209
     */
210
    @Override
211
    public void inboundDataReceived(okio.Buffer frame, int dataLength, int paddingLength,
212
                                    boolean endOfStream) {
213
      synchronized (lock) {
1✔
214
        PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
1✔
215
        if (endOfStream) {
1✔
216
          this.receivedEndOfStream = true;
1✔
217
        }
218
        window -= dataLength + paddingLength;
1✔
219
        processedWindow -= paddingLength;
1✔
220
        super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
1✔
221
      }
1✔
222
    }
1✔
223

224
    /** Must be called with holding the transport lock. */
225
    @Override
226
    public void inboundRstReceived(Status status) {
227
      PerfMark.event("OkHttpServerTransport$FrameHandler.rstStream", tag);
1✔
228
      transportReportStatus(status);
1✔
229
    }
1✔
230

231
    /** Must be called with holding the transport lock. */
232
    @Override
233
    public boolean hasReceivedEndOfStream() {
234
      synchronized (lock) {
1✔
235
        return receivedEndOfStream;
1✔
236
      }
237
    }
238

239
    /** Must be called with holding the transport lock. */
240
    @Override
241
    public int inboundWindowAvailable() {
242
      synchronized (lock) {
1✔
243
        return window;
1✔
244
      }
245
    }
246

247
    @GuardedBy("lock")
248
    private void sendBuffer(Buffer buffer, boolean flush) {
249
      if (cancelSent) {
1✔
250
        return;
×
251
      }
252
      // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
253
      // properly chunked.
254
      outboundFlow.data(false, outboundFlowState, buffer, flush);
1✔
255
    }
1✔
256

257
    @GuardedBy("lock")
258
    private void sendHeaders(List<Header> responseHeaders) {
259
      frameWriter.synReply(false, streamId, responseHeaders);
1✔
260
      frameWriter.flush();
1✔
261
    }
1✔
262

263
    @GuardedBy("lock")
264
    private void sendTrailers(List<Header> responseTrailers) {
265
      outboundFlow.notifyWhenNoPendingData(
1✔
266
          outboundFlowState, () -> sendTrailersAfterFlowControlled(responseTrailers));
1✔
267
    }
1✔
268

269
    private void sendTrailersAfterFlowControlled(List<Header> responseTrailers) {
270
      synchronized (lock) {
1✔
271
        frameWriter.synReply(true, streamId, responseTrailers);
1✔
272
        if (!receivedEndOfStream) {
1✔
273
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
274
        }
275
        transport.streamClosed(streamId, /*flush=*/ true);
1✔
276
        complete();
1✔
277
      }
1✔
278
    }
1✔
279

280
    @GuardedBy("lock")
281
    private void cancel(ErrorCode http2Error, Status reason) {
282
      if (cancelSent) {
1✔
283
        return;
1✔
284
      }
285
      cancelSent = true;
1✔
286
      frameWriter.rstStream(streamId, http2Error);
1✔
287
      transportReportStatus(reason);
1✔
288
      transport.streamClosed(streamId, /*flush=*/ true);
1✔
289
    }
1✔
290

291
    @Override
292
    public OutboundFlowController.StreamState getOutboundFlowState() {
293
      return outboundFlowState;
1✔
294
    }
295
  }
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc