• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18874

26 Oct 2023 06:34PM CUT coverage: 88.249% (-0.009%) from 88.258%
#18874

push

github-actions

web-flow
xds: Log ORCA UNIMPLEMENTED error to subchannel logger (#10625) (#10631)

Logging to the static instance would result in application logs filling
up if the Orca service is not available.
We'd like to have the logging on the subchannelLogger, so we make it
visible on demand.

Also succeed Orca logging test if log message present. Using
contains over containsExactly seems more reasonable.

Co-authored-by: Yannick Epstein <yannick.epstein@gmail.com>

30294 of 34328 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/../servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.servlet.ServletServerStream.toHexString;
21
import static java.util.logging.Level.FINE;
22
import static java.util.logging.Level.FINEST;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import io.grpc.InternalLogId;
26
import io.grpc.servlet.ServletServerStream.ServletTransportState;
27
import java.io.IOException;
28
import java.time.Duration;
29
import java.util.Queue;
30
import java.util.concurrent.ConcurrentLinkedQueue;
31
import java.util.concurrent.atomic.AtomicReference;
32
import java.util.concurrent.locks.LockSupport;
33
import java.util.function.BiFunction;
34
import java.util.function.BooleanSupplier;
35
import java.util.logging.Logger;
36
import javax.annotation.CheckReturnValue;
37
import javax.annotation.Nullable;
38
import javax.servlet.AsyncContext;
39
import javax.servlet.ServletOutputStream;
40

41
/** Handles write actions from the container thread and the application thread. */
42
final class AsyncServletOutputStreamWriter {
43

44
  /**
45
   * Memory boundary for write actions.
46
   *
47
   * <pre>
48
   * WriteState curState = writeState.get();  // mark a boundary
49
   * doSomething();  // do something within the boundary
50
   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
51
   * if (successful) {
52
   *   // state has not changed since
53
   *   return;
54
   * } else {
55
   *   // state is changed by another thread while doSomething(), need recompute
56
   * }
57
   * </pre>
58
   *
59
   * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
60
   * application thread (calling {@code runOrBuffer()}) that read and update the
61
   * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
62
   * only runOrBuffer() may turn it from true to false.
63
   */
64
  private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
1✔
65

66
  private final Log log;
67
  private final BiFunction<byte[], Integer, ActionItem> writeAction;
68
  private final ActionItem flushAction;
69
  private final ActionItem completeAction;
70
  private final BooleanSupplier isReady;
71

72
  /**
73
   * New write actions will be buffered into this queue if the servlet output stream is not ready or
74
   * the queue is not drained.
75
   */
76
  // SPSC queue would do
77
  private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
1✔
78
  // for a theoretical race condition that onWritePossible() is called immediately after isReady()
79
  // returns false and before writeState.compareAndSet()
80
  @Nullable
81
  private volatile Thread parkingThread;
82

83
  AsyncServletOutputStreamWriter(
84
      AsyncContext asyncContext,
85
      ServletTransportState transportState,
86
      InternalLogId logId) throws IOException {
1✔
87
    Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
1✔
88
    this.log = new Log() {
1✔
89
      @Override
90
      public void fine(String str, Object... params) {
91
        if (logger.isLoggable(FINE)) {
1✔
92
          logger.log(FINE, "[" + logId + "]" + str, params);
×
93
        }
94
      }
1✔
95

96
      @Override
97
      public void finest(String str, Object... params) {
98
        if (logger.isLoggable(FINEST)) {
1✔
99
          logger.log(FINEST, "[" + logId + "] " + str, params);
×
100
        }
101
      }
1✔
102
    };
103

104
    ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
1✔
105
    this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
1✔
106
      outputStream.write(bytes, 0, numBytes);
1✔
107
      transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
1✔
108
      log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
1✔
109
    };
1✔
110
    this.flushAction = () -> {
1✔
111
      log.finest("flushBuffer");
1✔
112
      asyncContext.getResponse().flushBuffer();
1✔
113
    };
1✔
114
    this.completeAction = () -> {
1✔
115
      log.fine("call is completing");
1✔
116
      transportState.runOnTransportThread(
1✔
117
          () -> {
118
            transportState.complete();
1✔
119
            asyncContext.complete();
1✔
120
            log.fine("call completed");
1✔
121
          });
1✔
122
    };
1✔
123
    this.isReady = () -> outputStream.isReady();
1✔
124
  }
1✔
125

126
  /**
127
   * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
128
   *
129
   * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
130
   * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
131
   */
132
  @VisibleForTesting
133
  AsyncServletOutputStreamWriter(
134
      BiFunction<byte[], Integer, ActionItem> writeAction,
135
      ActionItem flushAction,
136
      ActionItem completeAction,
137
      BooleanSupplier isReady,
138
      Log log) {
×
139
    this.writeAction = writeAction;
×
140
    this.flushAction = flushAction;
×
141
    this.completeAction = completeAction;
×
142
    this.isReady = isReady;
×
143
    this.log = log;
×
144
  }
×
145

146
  /** Called from application thread. */
147
  void writeBytes(byte[] bytes, int numBytes) throws IOException {
148
    runOrBuffer(writeAction.apply(bytes, numBytes));
1✔
149
  }
1✔
150

151
  /** Called from application thread. */
152
  void flush() throws IOException {
153
    runOrBuffer(flushAction);
1✔
154
  }
1✔
155

156
  /** Called from application thread. */
157
  void complete() {
158
    try {
159
      runOrBuffer(completeAction);
1✔
160
    } catch (IOException ignore) {
×
161
      // actually completeAction does not throw IOException
162
    }
1✔
163
  }
1✔
164

165
  /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
166
  void onWritePossible() throws IOException {
167
    log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
1✔
168
    assureReadyAndDrainedTurnsFalse();
1✔
169
    while (isReady.getAsBoolean()) {
1✔
170
      WriteState curState = writeState.get();
1✔
171

172
      ActionItem actionItem = writeChain.poll();
1✔
173
      if (actionItem != null) {
1✔
174
        actionItem.run();
1✔
175
        continue;
1✔
176
      }
177

178
      if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
1✔
179
        // state has not changed since.
180
        log.finest(
1✔
181
            "onWritePossible: EXIT. All data available now is sent out and the servlet output"
182
                + " stream is still ready");
183
        return;
1✔
184
      }
185
      // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
186
      // again
187
    }
×
188
    log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
1✔
189
  }
1✔
190

191
  private void assureReadyAndDrainedTurnsFalse() {
192
    // readyAndDrained should have been set to false already.
193
    // Just in case due to a race condition readyAndDrained is still true at this moment and is
194
    // being set to false by runOrBuffer() concurrently.
195
    while (writeState.get().readyAndDrained) {
1✔
196
      parkingThread = Thread.currentThread();
1✔
197
      // Try to sleep for an extremely long time to avoid writeState being changed at exactly
198
      // the time when sleep time expires (in extreme scenario, such as #9917).
199
      LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
×
200
    }
201
    parkingThread = null;
1✔
202
  }
1✔
203

204
  /**
205
   * Either execute the write action directly, or buffer the action and let the container thread
206
   * drain it.
207
   *
208
   * <p>Called from application thread.
209
   */
210
  private void runOrBuffer(ActionItem actionItem) throws IOException {
211
    WriteState curState = writeState.get();
1✔
212
    if (curState.readyAndDrained) { // write to the outputStream directly
1✔
213
      actionItem.run();
1✔
214
      if (actionItem == completeAction) {
1✔
215
        return;
1✔
216
      }
217
      if (!isReady.getAsBoolean()) {
1✔
218
        boolean successful =
1✔
219
            writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
1✔
220
        LockSupport.unpark(parkingThread);
1✔
221
        checkState(successful, "Bug: curState is unexpectedly changed by another thread");
1✔
222
        log.finest("the servlet output stream becomes not ready");
1✔
223
      }
1✔
224
    } else { // buffer to the writeChain
225
      writeChain.offer(actionItem);
1✔
226
      if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
1✔
227
        checkState(
×
228
            writeState.get().readyAndDrained,
×
229
            "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
230
        ActionItem lastItem = writeChain.poll();
×
231
        if (lastItem != null) {
×
232
          checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
×
233
          runOrBuffer(lastItem);
×
234
        }
235
      } // state has not changed since
236
    }
237
  }
1✔
238

239
  /** Write actions, e.g. writeBytes, flush, complete. */
240
  @FunctionalInterface
241
  @VisibleForTesting
242
  interface ActionItem {
243
    void run() throws IOException;
244
  }
245

246
  @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
247
  interface Log {
248
    default void fine(String str, Object...params) {}
×
249

250
    default void finest(String str, Object...params) {}
×
251
  }
252

253
  private static final class WriteState {
254

255
    static final WriteState DEFAULT = new WriteState(false);
1✔
256

257
    /**
258
     * The servlet output stream is ready and the writeChain is empty.
259
     *
260
     * <p>readyAndDrained turns from false to true when:
261
     * {@code onWritePossible()} exits while currently there is no more data to write, but the last
262
     * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
263
     *
264
     * <p>readyAndDrained turns from true to false when:
265
     * {@code runOrBuffer()} exits while either the action item is written directly to the
266
     * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
267
     * right after that returns false, or the action item is buffered into the writeChain.
268
     */
269
    final boolean readyAndDrained;
270

271
    WriteState(boolean readyAndDrained) {
1✔
272
      this.readyAndDrained = readyAndDrained;
1✔
273
    }
1✔
274

275
    /**
276
     * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
277
     * runOrBuffer()} can set it to false.
278
     */
279
    @CheckReturnValue
280
    WriteState withReadyAndDrained(boolean readyAndDrained) {
281
      return new WriteState(readyAndDrained);
1✔
282
    }
283
  }
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc