• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19354

11 Jul 2024 06:45PM CUT coverage: 88.434% (-0.01%) from 88.448%
#19354

push

github

larry-safran
Bump version to 1.64.3-SNAPSHOT

32067 of 36261 relevant lines covered (88.43%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.08
/../servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.servlet.ServletServerStream.toHexString;
21
import static java.util.logging.Level.FINE;
22
import static java.util.logging.Level.FINEST;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import io.grpc.InternalLogId;
26
import io.grpc.servlet.ServletServerStream.ServletTransportState;
27
import java.io.IOException;
28
import java.time.Duration;
29
import java.util.Queue;
30
import java.util.concurrent.ConcurrentLinkedQueue;
31
import java.util.concurrent.atomic.AtomicReference;
32
import java.util.concurrent.locks.LockSupport;
33
import java.util.function.BiFunction;
34
import java.util.function.BooleanSupplier;
35
import java.util.logging.Level;
36
import java.util.logging.Logger;
37
import javax.annotation.CheckReturnValue;
38
import javax.annotation.Nullable;
39
import javax.servlet.AsyncContext;
40
import javax.servlet.ServletOutputStream;
41

42
/** Handles write actions from the container thread and the application thread. */
43
final class AsyncServletOutputStreamWriter {
44

45
  /**
46
   * Memory boundary for write actions.
47
   *
48
   * <pre>
49
   * WriteState curState = writeState.get();  // mark a boundary
50
   * doSomething();  // do something within the boundary
51
   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
52
   * if (successful) {
53
   *   // state has not changed since
54
   *   return;
55
   * } else {
56
   *   // state is changed by another thread while doSomething(), need recompute
57
   * }
58
   * </pre>
59
   *
60
   * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
61
   * application thread (calling {@code runOrBuffer()}) that read and update the
62
   * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
63
   * only runOrBuffer() may turn it from true to false.
64
   */
65
  private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
1✔
66

67
  private final Log log;
68
  private final BiFunction<byte[], Integer, ActionItem> writeAction;
69
  private final ActionItem flushAction;
70
  private final ActionItem completeAction;
71
  private final BooleanSupplier isReady;
72

73
  /**
74
   * New write actions will be buffered into this queue if the servlet output stream is not ready or
75
   * the queue is not drained.
76
   */
77
  // SPSC queue would do
78
  private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
1✔
79
  // for a theoretical race condition that onWritePossible() is called immediately after isReady()
80
  // returns false and before writeState.compareAndSet()
81
  @Nullable
82
  private volatile Thread parkingThread;
83

84
  AsyncServletOutputStreamWriter(
85
      AsyncContext asyncContext,
86
      ServletTransportState transportState,
87
      InternalLogId logId) throws IOException {
1✔
88
    Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
1✔
89
    this.log = new Log() {
1✔
90
      @Override
91
      public boolean isLoggable(Level level) {
92
        return logger.isLoggable(level);
1✔
93
      }
94

95
      @Override
96
      public void fine(String str, Object... params) {
97
        if (logger.isLoggable(FINE)) {
1✔
98
          logger.log(FINE, "[" + logId + "]" + str, params);
×
99
        }
100
      }
1✔
101

102
      @Override
103
      public void finest(String str, Object... params) {
104
        if (logger.isLoggable(FINEST)) {
1✔
105
          logger.log(FINEST, "[" + logId + "] " + str, params);
×
106
        }
107
      }
1✔
108
    };
109

110
    ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
1✔
111
    this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
1✔
112
      outputStream.write(bytes, 0, numBytes);
1✔
113
      transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
1✔
114
      if (log.isLoggable(Level.FINEST)) {
1✔
115
        log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
×
116
      }
117
    };
1✔
118
    this.flushAction = () -> {
1✔
119
      log.finest("flushBuffer");
1✔
120
      asyncContext.getResponse().flushBuffer();
1✔
121
    };
1✔
122
    this.completeAction = () -> {
1✔
123
      log.fine("call is completing");
1✔
124
      transportState.runOnTransportThread(
1✔
125
          () -> {
126
            transportState.complete();
1✔
127
            asyncContext.complete();
1✔
128
            log.fine("call completed");
1✔
129
          });
1✔
130
    };
1✔
131
    this.isReady = () -> outputStream.isReady();
1✔
132
  }
1✔
133

134
  /**
135
   * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
136
   *
137
   * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
138
   * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
139
   */
140
  @VisibleForTesting
141
  AsyncServletOutputStreamWriter(
142
      BiFunction<byte[], Integer, ActionItem> writeAction,
143
      ActionItem flushAction,
144
      ActionItem completeAction,
145
      BooleanSupplier isReady,
146
      Log log) {
×
147
    this.writeAction = writeAction;
×
148
    this.flushAction = flushAction;
×
149
    this.completeAction = completeAction;
×
150
    this.isReady = isReady;
×
151
    this.log = log;
×
152
  }
×
153

154
  /** Called from application thread. */
155
  void writeBytes(byte[] bytes, int numBytes) throws IOException {
156
    runOrBuffer(writeAction.apply(bytes, numBytes));
1✔
157
  }
1✔
158

159
  /** Called from application thread. */
160
  void flush() throws IOException {
161
    runOrBuffer(flushAction);
1✔
162
  }
1✔
163

164
  /** Called from application thread. */
165
  void complete() {
166
    try {
167
      runOrBuffer(completeAction);
1✔
168
    } catch (IOException ignore) {
×
169
      // actually completeAction does not throw IOException
170
    }
1✔
171
  }
1✔
172

173
  /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
174
  void onWritePossible() throws IOException {
175
    log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
1✔
176
    assureReadyAndDrainedTurnsFalse();
1✔
177
    while (isReady.getAsBoolean()) {
1✔
178
      WriteState curState = writeState.get();
1✔
179

180
      ActionItem actionItem = writeChain.poll();
1✔
181
      if (actionItem != null) {
1✔
182
        actionItem.run();
1✔
183
        continue;
1✔
184
      }
185

186
      if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
1✔
187
        // state has not changed since.
188
        log.finest(
1✔
189
            "onWritePossible: EXIT. All data available now is sent out and the servlet output"
190
                + " stream is still ready");
191
        return;
1✔
192
      }
193
      // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
194
      // again
195
    }
×
196
    log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
1✔
197
  }
1✔
198

199
  private void assureReadyAndDrainedTurnsFalse() {
200
    // readyAndDrained should have been set to false already.
201
    // Just in case due to a race condition readyAndDrained is still true at this moment and is
202
    // being set to false by runOrBuffer() concurrently.
203
    while (writeState.get().readyAndDrained) {
1✔
204
      parkingThread = Thread.currentThread();
1✔
205
      // Try to sleep for an extremely long time to avoid writeState being changed at exactly
206
      // the time when sleep time expires (in extreme scenario, such as #9917).
207
      LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
×
208
    }
209
    parkingThread = null;
1✔
210
  }
1✔
211

212
  /**
213
   * Either execute the write action directly, or buffer the action and let the container thread
214
   * drain it.
215
   *
216
   * <p>Called from application thread.
217
   */
218
  private void runOrBuffer(ActionItem actionItem) throws IOException {
219
    WriteState curState = writeState.get();
1✔
220
    if (curState.readyAndDrained) { // write to the outputStream directly
1✔
221
      actionItem.run();
1✔
222
      if (actionItem == completeAction) {
1✔
223
        return;
1✔
224
      }
225
      if (!isReady.getAsBoolean()) {
1✔
226
        boolean successful =
1✔
227
            writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
1✔
228
        LockSupport.unpark(parkingThread);
1✔
229
        checkState(successful, "Bug: curState is unexpectedly changed by another thread");
1✔
230
        log.finest("the servlet output stream becomes not ready");
1✔
231
      }
1✔
232
    } else { // buffer to the writeChain
233
      writeChain.offer(actionItem);
1✔
234
      if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
1✔
235
        checkState(
×
236
            writeState.get().readyAndDrained,
×
237
            "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
238
        ActionItem lastItem = writeChain.poll();
×
239
        if (lastItem != null) {
×
240
          checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
×
241
          runOrBuffer(lastItem);
×
242
        }
243
      } // state has not changed since
244
    }
245
  }
1✔
246

247
  /** Write actions, e.g. writeBytes, flush, complete. */
248
  @FunctionalInterface
249
  @VisibleForTesting
250
  interface ActionItem {
251
    void run() throws IOException;
252
  }
253

254
  @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
255
  interface Log {
256
    default boolean isLoggable(Level level) {
257
      return false; 
×
258
    }
259

260
    default void fine(String str, Object...params) {}
×
261

262
    default void finest(String str, Object...params) {}
×
263
  }
264

265
  private static final class WriteState {
266

267
    static final WriteState DEFAULT = new WriteState(false);
1✔
268

269
    /**
270
     * The servlet output stream is ready and the writeChain is empty.
271
     *
272
     * <p>readyAndDrained turns from false to true when:
273
     * {@code onWritePossible()} exits while currently there is no more data to write, but the last
274
     * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
275
     *
276
     * <p>readyAndDrained turns from true to false when:
277
     * {@code runOrBuffer()} exits while either the action item is written directly to the
278
     * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
279
     * right after that returns false, or the action item is buffered into the writeChain.
280
     */
281
    final boolean readyAndDrained;
282

283
    WriteState(boolean readyAndDrained) {
1✔
284
      this.readyAndDrained = readyAndDrained;
1✔
285
    }
1✔
286

287
    /**
288
     * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
289
     * runOrBuffer()} can set it to false.
290
     */
291
    @CheckReturnValue
292
    WriteState withReadyAndDrained(boolean readyAndDrained) {
293
      return new WriteState(readyAndDrained);
1✔
294
    }
295
  }
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc