• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19673

31 Jan 2025 12:10AM CUT coverage: 88.605% (+0.02%) from 88.586%
#19673

push

github

ejona86
alts: Add ClientCall support to AltsContextUtil

This adds a createFrom(Attributes) to mirror the check(Attributes) added
in ba8ab79. It also adds conveniences for ClientCall for both
createFrom() and check(). This allows getting peer information from
ClientCall and CallCredentials.RequestInfo, as was already available
from ServerCall.

The tests were reworked to test the Attribute-based methods and then
only basic tests for client/server.

Fixes #11042

33769 of 38112 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.08
/../servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.servlet;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.servlet.ServletServerStream.toHexString;
21
import static java.util.logging.Level.FINE;
22
import static java.util.logging.Level.FINEST;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.errorprone.annotations.CheckReturnValue;
26
import io.grpc.InternalLogId;
27
import io.grpc.servlet.ServletServerStream.ServletTransportState;
28
import java.io.IOException;
29
import java.time.Duration;
30
import java.util.Queue;
31
import java.util.concurrent.ConcurrentLinkedQueue;
32
import java.util.concurrent.atomic.AtomicReference;
33
import java.util.concurrent.locks.LockSupport;
34
import java.util.function.BiFunction;
35
import java.util.function.BooleanSupplier;
36
import java.util.logging.Level;
37
import java.util.logging.Logger;
38
import javax.annotation.Nullable;
39
import javax.servlet.AsyncContext;
40
import javax.servlet.ServletOutputStream;
41

42
/** Handles write actions from the container thread and the application thread. */
43
final class AsyncServletOutputStreamWriter {
44

45
  /**
46
   * Memory boundary for write actions.
47
   *
48
   * <pre>
49
   * WriteState curState = writeState.get();  // mark a boundary
50
   * doSomething();  // do something within the boundary
51
   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
52
   * if (successful) {
53
   *   // state has not changed since
54
   *   return;
55
   * } else {
56
   *   // state is changed by another thread while doSomething(), need recompute
57
   * }
58
   * </pre>
59
   *
60
   * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
61
   * application thread (calling {@code runOrBuffer()}) that read and update the
62
   * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
63
   * only runOrBuffer() may turn it from true to false.
64
   */
65
  private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
1✔
66

67
  private final Log log;
68
  private final BiFunction<byte[], Integer, ActionItem> writeAction;
69
  private final ActionItem flushAction;
70
  private final ActionItem completeAction;
71
  private final BooleanSupplier isReady;
72

73
  /**
74
   * New write actions will be buffered into this queue if the servlet output stream is not ready or
75
   * the queue is not drained.
76
   */
77
  // SPSC queue would do
78
  private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
1✔
79
  // for a theoretical race condition that onWritePossible() is called immediately after isReady()
80
  // returns false and before writeState.compareAndSet()
81
  @Nullable
82
  private volatile Thread parkingThread;
83

84
  AsyncServletOutputStreamWriter(
85
      AsyncContext asyncContext,
86
      ServletTransportState transportState,
87
      InternalLogId logId) throws IOException {
1✔
88
    Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
1✔
89
    this.log = new Log() {
1✔
90
      @Override
91
      public boolean isLoggable(Level level) {
92
        return logger.isLoggable(level);
1✔
93
      }
94

95
      @Override
96
      public void fine(String str, Object... params) {
97
        if (logger.isLoggable(FINE)) {
1✔
98
          logger.log(FINE, "[" + logId + "]" + str, params);
×
99
        }
100
      }
1✔
101

102
      @Override
103
      public void finest(String str, Object... params) {
104
        if (logger.isLoggable(FINEST)) {
1✔
105
          logger.log(FINEST, "[" + logId + "] " + str, params);
×
106
        }
107
      }
1✔
108
    };
109

110
    ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
1✔
111
    this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
1✔
112
      outputStream.write(bytes, 0, numBytes);
1✔
113
      transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
1✔
114
      if (log.isLoggable(Level.FINEST)) {
1✔
115
        log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
×
116
      }
117
    };
1✔
118
    this.flushAction = () -> {
1✔
119
      log.finest("flushBuffer");
1✔
120
      asyncContext.getResponse().flushBuffer();
1✔
121
    };
1✔
122
    this.completeAction = () -> {
1✔
123
      log.fine("call is completing");
1✔
124
      transportState.runOnTransportThread(
1✔
125
          () -> {
126
            transportState.complete();
1✔
127
            asyncContext.complete();
1✔
128
            log.fine("call completed");
1✔
129
          });
1✔
130
    };
1✔
131
    this.isReady = () -> outputStream.isReady();
1✔
132
  }
1✔
133

134
  /**
135
   * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
136
   *
137
   * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
138
   * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
139
   */
140
  @VisibleForTesting
141
  AsyncServletOutputStreamWriter(
142
      BiFunction<byte[], Integer, ActionItem> writeAction,
143
      ActionItem flushAction,
144
      ActionItem completeAction,
145
      BooleanSupplier isReady,
146
      Log log) {
×
147
    this.writeAction = writeAction;
×
148
    this.flushAction = flushAction;
×
149
    this.completeAction = completeAction;
×
150
    this.isReady = isReady;
×
151
    this.log = log;
×
152
  }
×
153

154
  /** Called from application thread. */
155
  void writeBytes(byte[] bytes, int numBytes) throws IOException {
156
    runOrBuffer(writeAction.apply(bytes, numBytes));
1✔
157
  }
1✔
158

159
  /** Called from application thread. */
160
  void flush() throws IOException {
161
    runOrBuffer(flushAction);
1✔
162
  }
1✔
163

164
  /** Called from application thread. */
165
  void complete() {
166
    try {
167
      runOrBuffer(completeAction);
1✔
168
    } catch (IOException ignore) {
×
169
      // actually completeAction does not throw IOException
170
    }
1✔
171
  }
1✔
172

173
  /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
174
  void onWritePossible() throws IOException {
175
    log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
1✔
176
    assureReadyAndDrainedTurnsFalse();
1✔
177
    while (isReady.getAsBoolean()) {
1✔
178
      WriteState curState = writeState.get();
1✔
179

180
      ActionItem actionItem = writeChain.poll();
1✔
181
      if (actionItem != null) {
1✔
182
        actionItem.run();
1✔
183
        continue;
1✔
184
      }
185

186
      if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
1✔
187
        // state has not changed since.
188
        log.finest(
1✔
189
            "onWritePossible: EXIT. All data available now is sent out and the servlet output"
190
                + " stream is still ready");
191
        return;
1✔
192
      }
193
      // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
194
      // again
195
    }
×
196
    log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
1✔
197
  }
1✔
198

199
  private void assureReadyAndDrainedTurnsFalse() {
200
    // readyAndDrained should have been set to false already.
201
    // Just in case due to a race condition readyAndDrained is still true at this moment and is
202
    // being set to false by runOrBuffer() concurrently.
203
    while (writeState.get().readyAndDrained) {
1✔
204
      parkingThread = Thread.currentThread();
1✔
205
      // Try to sleep for an extremely long time to avoid writeState being changed at exactly
206
      // the time when sleep time expires (in extreme scenario, such as #9917).
207
      LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
×
208
    }
209
    parkingThread = null;
1✔
210
  }
1✔
211

212
  /**
213
   * Either execute the write action directly, or buffer the action and let the container thread
214
   * drain it.
215
   *
216
   * <p>Called from application thread.
217
   */
218
  private void runOrBuffer(ActionItem actionItem) throws IOException {
219
    WriteState curState = writeState.get();
1✔
220
    if (curState.readyAndDrained) { // write to the outputStream directly
1✔
221
      actionItem.run();
1✔
222
      if (actionItem == completeAction) {
1✔
223
        return;
1✔
224
      }
225
      if (!isReady.getAsBoolean()) {
1✔
226
        boolean successful =
1✔
227
            writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
1✔
228
        LockSupport.unpark(parkingThread);
1✔
229
        checkState(successful, "Bug: curState is unexpectedly changed by another thread");
1✔
230
        log.finest("the servlet output stream becomes not ready");
1✔
231
      }
1✔
232
    } else { // buffer to the writeChain
233
      writeChain.offer(actionItem);
1✔
234
      if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
1✔
235
        checkState(
×
236
            writeState.get().readyAndDrained,
×
237
            "Bug: onWritePossible() should have changed readyAndDrained to true, but not");
238
        ActionItem lastItem = writeChain.poll();
×
239
        if (lastItem != null) {
×
240
          checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
×
241
          runOrBuffer(lastItem);
×
242
        }
243
      } // state has not changed since
244
    }
245
  }
1✔
246

247
  /** Write actions, e.g. writeBytes, flush, complete. */
248
  @FunctionalInterface
249
  @VisibleForTesting
250
  interface ActionItem {
251
    void run() throws IOException;
252
  }
253

254
  @VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
255
  interface Log {
256
    default boolean isLoggable(Level level) {
257
      return false; 
×
258
    }
259

260
    default void fine(String str, Object...params) {}
×
261

262
    default void finest(String str, Object...params) {}
×
263
  }
264

265
  private static final class WriteState {
266

267
    static final WriteState DEFAULT = new WriteState(false);
1✔
268

269
    /**
270
     * The servlet output stream is ready and the writeChain is empty.
271
     *
272
     * <p>readyAndDrained turns from false to true when:
273
     * {@code onWritePossible()} exits while currently there is no more data to write, but the last
274
     * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
275
     *
276
     * <p>readyAndDrained turns from true to false when:
277
     * {@code runOrBuffer()} exits while either the action item is written directly to the
278
     * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
279
     * right after that returns false, or the action item is buffered into the writeChain.
280
     */
281
    final boolean readyAndDrained;
282

283
    WriteState(boolean readyAndDrained) {
1✔
284
      this.readyAndDrained = readyAndDrained;
1✔
285
    }
1✔
286

287
    /**
288
     * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
289
     * runOrBuffer()} can set it to false.
290
     */
291
    @CheckReturnValue
292
    WriteState withReadyAndDrained(boolean readyAndDrained) {
293
      return new WriteState(readyAndDrained);
1✔
294
    }
295
  }
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc