• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18940

12 Dec 2023 10:50PM CUT coverage: 88.309% (+0.005%) from 88.304%
#18940

push

github

web-flow
core: de-expermentalize pick first config parsing (#10531) (#10742)

Co-authored-by: Terry Wilson <tmwilson@google.com>

30336 of 34352 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.02
/../servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.servlet.ServletServerStream.toHexString;
21
import static java.util.logging.Level.FINE;
22
import static java.util.logging.Level.FINEST;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import io.grpc.InternalLogId;
26
import io.grpc.servlet.ServletServerStream.ServletTransportState;
27
import java.io.IOException;
28
import java.time.Duration;
29
import java.util.Queue;
30
import java.util.concurrent.ConcurrentLinkedQueue;
31
import java.util.concurrent.atomic.AtomicReference;
32
import java.util.concurrent.locks.LockSupport;
33
import java.util.function.BiFunction;
34
import java.util.function.BooleanSupplier;
35
import java.util.logging.Logger;
36
import javax.annotation.CheckReturnValue;
37
import javax.annotation.Nullable;
38
import javax.servlet.AsyncContext;
39
import javax.servlet.ServletOutputStream;
40

41
/** Handles write actions from the container thread and the application thread. */
42
final class AsyncServletOutputStreamWriter {
43

44
  /**
45
   * Memory boundary for write actions.
46
   *
47
   * <pre>
48
   * WriteState curState = writeState.get();  // mark a boundary
49
   * doSomething();  // do something within the boundary
50
   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
51
   * if (successful) {
52
   *   // state has not changed since
53
   *   return;
54
   * } else {
55
   *   // state is changed by another thread while doSomething(), need recompute
56
   * }
57
   * </pre>
58
   *
59
   * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
60
   * application thread (calling {@code runOrBuffer()}) that read and update the
61
   * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
62
   * only runOrBuffer() may turn it from true to false.
63
   */
64
  private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
1✔
65

66
  private final Log log;
67
  private final BiFunction<byte[], Integer, ActionItem> writeAction;
68
  private final ActionItem flushAction;
69
  private final ActionItem completeAction;
70
  private final BooleanSupplier isReady;
71

72
  /**
73
   * New write actions will be buffered into this queue if the servlet output stream is not ready or
74
   * the queue is not drained.
75
   */
76
  // SPSC queue would do
77
  private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
1✔
78
  // for a theoretical race condition that onWritePossible() is called immediately after isReady()
79
  // returns false and before writeState.compareAndSet()
80
  @Nullable
81
  private volatile Thread parkingThread;
82

83
  AsyncServletOutputStreamWriter(
84
      AsyncContext asyncContext,
85
      ServletTransportState transportState,
86
      InternalLogId logId) throws IOException {
1✔
87
    Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
1✔
88
    this.log = new Log() {
1✔
89
      @Override
90
      public void fine(String str, Object... params) {
91
        if (logger.isLoggable(FINE)) {
1✔
92
          logger.log(FINE, "[" + logId + "]" + str, params);
×
93
        }
94
      }
1✔
95

96
      @Override
97
      public void finest(String str, Object... params) {
98
        if (logger.isLoggable(FINEST)) {
1✔
99
          logger.log(FINEST, "[" + logId + "] " + str, params);
×
100
        }
101
      }
1✔
102
    };
103

104
    ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
1✔
105
    this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
1✔
106
      outputStream.write(bytes, 0, numBytes);
1✔
107
      transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
1✔
108
      log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
1✔
109
    };
1✔
110
    this.flushAction = () -> {
1✔
111
      log.finest("flushBuffer");
1✔
112
      asyncContext.getResponse().flushBuffer();
1✔
113
    };
1✔
114
    this.completeAction = () -> {
1✔
115
      log.fine("call is completing");
1✔
116
      transportState.runOnTransportThread(
1✔
117
          () -> {
118
            transportState.complete();
1✔
119
            asyncContext.complete();
1✔
120
            log.fine("call completed");
1✔
121
          });
1✔
122
    };
1✔
123
    this.isReady = () -> outputStream.isReady();
1✔
124
  }
1✔
125

126
  /**
127
   * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
128
   *
129
   * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
130
   * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
131
   */
132
  @VisibleForTesting
133
  AsyncServletOutputStreamWriter(
134
      BiFunction<byte[], Integer, ActionItem> writeAction,
135
      ActionItem flushAction,
136
      ActionItem completeAction,
137
      BooleanSupplier isReady,
138
      Log log) {
1✔
139
    this.writeAction = writeAction;
1✔
140
    this.flushAction = flushAction;
1✔
141
    this.completeAction = completeAction;
1✔
142
    this.isReady = isReady;
1✔
143
    this.log = log;
1✔
144
  }
1✔
145

146
  /** Called from application thread. */
147
  void writeBytes(byte[] bytes, int numBytes) throws IOException {
148
    runOrBuffer(writeAction.apply(bytes, numBytes));
1✔
149
  }
1✔
150

151
  /** Called from application thread. */
152
  void flush() throws IOException {
153
    runOrBuffer(flushAction);
1✔
154
  }
1✔
155

156
  /** Called from application thread. */
157
  void complete() {
158
    try {
159
      runOrBuffer(completeAction);
1✔
160
    } catch (IOException ignore) {
×
161
      // actually completeAction does not throw IOException
162
    }
1✔
163
  }
1✔
164

165
  /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
166
  void onWritePossible() throws IOException {
167
    log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
1✔
168
    assureReadyAndDrainedTurnsFalse();
1✔
169
    while (isReady.getAsBoolean()) {
1✔
170
      WriteState curState = writeState.get();
1✔
171

172
      ActionItem actionItem = writeChain.poll();
1✔
173
      if (actionItem != null) {
1✔
174
        actionItem.run();
1✔
175
        continue;
1✔
176
      }
177

178
      if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
1✔
179
        // state has not changed since.
180
        log.finest(
1✔
181
            "onWritePossible: EXIT. All data available now is sent out and the servlet output"
182
                + " stream is still ready");
183
        return;
1✔
184
      }
185
      // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
186
      // again
187
    }
×
188
    log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
1✔
189
  }
1✔
190

191
  private void assureReadyAndDrainedTurnsFalse() {
192
    // readyAndDrained should have been set to false already.
193
    // Just in case due to a race condition readyAndDrained is still true at this moment and is
194
    // being set to false by runOrBuffer() concurrently.
195
    while (writeState.get().readyAndDrained) {
1✔
196
      parkingThread = Thread.currentThread();
1✔
197
      // Try to sleep for an extremely long time to avoid writeState being changed at exactly
198
      // the time when sleep time expires (in extreme scenario, such as #9917).
199
      LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
×
200
    }
201
    parkingThread = null;
1✔
202
  }
1✔
203

204
  /**
205
   * Either execute the write action directly, or buffer the action and let the container thread
206
   * drain it.
207
   *
208
   * <p>Called from application thread.
209
   */
210
  private void runOrBuffer(ActionItem actionItem) throws IOException {
211
    WriteState curState = writeState.get();
1✔
212
    if (curState.readyAndDrained) { // write to the outputStream directly
1✔
213
      actionItem.run();
1✔
214
      if (actionItem == completeAction) {
1✔
215
        return;
1✔
216
      }
217
      if (!isReady.getAsBoolean()) {
1✔
218
        boolean successful =
1✔
219
            writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
1✔
220
        LockSupport.unpark(parkingThread);
1✔
221
        checkState(successful, "Bug: curState is unexpectedly changed by another thread");
1✔
222
        log.finest("the servlet output stream becomes not ready");
1✔
223
      }
1✔
224
    } else { // buffer to the writeChain
225
      writeChain.offer(actionItem);
1✔
226
      if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
1✔
227
        checkState(
×
228
            writeState.get().readyAndDrained,
×
229
            "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
230
        ActionItem lastItem = writeChain.poll();
×
231
        if (lastItem != null) {
×
232
          checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
×
233
          runOrBuffer(lastItem);
×
234
        }
235
      } // state has not changed since
236
    }
237
  }
1✔
238

239
  /** Write actions, e.g. writeBytes, flush, complete. */
240
  @FunctionalInterface
241
  @VisibleForTesting
242
  interface ActionItem {
243
    void run() throws IOException;
244
  }
245

246
  @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
247
  interface Log {
248
    default void fine(String str, Object...params) {}
×
249

250
    default void finest(String str, Object...params) {}
×
251
  }
252

253
  private static final class WriteState {
254

255
    static final WriteState DEFAULT = new WriteState(false);
1✔
256

257
    /**
258
     * The servlet output stream is ready and the writeChain is empty.
259
     *
260
     * <p>readyAndDrained turns from false to true when:
261
     * {@code onWritePossible()} exits while currently there is no more data to write, but the last
262
     * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
263
     *
264
     * <p>readyAndDrained turns from true to false when:
265
     * {@code runOrBuffer()} exits while either the action item is written directly to the
266
     * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
267
     * right after that returns false, or the action item is buffered into the writeChain.
268
     */
269
    final boolean readyAndDrained;
270

271
    WriteState(boolean readyAndDrained) {
1✔
272
      this.readyAndDrained = readyAndDrained;
1✔
273
    }
1✔
274

275
    /**
276
     * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
277
     * runOrBuffer()} can set it to false.
278
     */
279
    @CheckReturnValue
280
    WriteState withReadyAndDrained(boolean readyAndDrained) {
281
      return new WriteState(readyAndDrained);
1✔
282
    }
283
  }
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc