• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19796

02 May 2025 08:31AM UTC coverage: 84.45% (-0.04%) from 84.488%
#19796

push

github

web-flow
.github/workflows: Bump action major versions (v1.66.x backport) (#12044)

* .github/workflows: Bump action major versions from Node16 to Node20 (#11476)

GitHub began the Node16 deprecation process a year ago [1][2]. This
commit updates all workflows to use the latest Node20 actions.

[1]: https://github.blog/changelog/2023-09-22-github-actions-transitioning-from-node-16-to-node-20/
[2]: https://github.blog/changelog/2024-03-07-github-actions-all-actions-will-run-on-node20-instead-of-node16-by-default/

* Update appengine-web.xml

Use Java 17.

---------

Co-authored-by: Eng Zer Jun <engzerjun@gmail.com>

33236 of 39356 relevant lines covered (84.45%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/../core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import io.grpc.Decompressor;
22
import io.perfmark.Link;
23
import io.perfmark.PerfMark;
24
import io.perfmark.TaskCloseable;
25
import java.io.Closeable;
26
import java.io.InputStream;
27
import java.util.ArrayDeque;
28
import java.util.Queue;
29
import javax.annotation.concurrent.GuardedBy;
30

31
/**
32
 * A deframer that moves decoding between the transport and app threads based on which is more
33
 * efficient at that moment.
34
 */
35
final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
36
  private interface Op {
37
    void run(boolean isDeframerOnTransportThread);
38
  }
39

40
  private final MessageDeframer.Listener transportListener;
41
  private final ApplicationThreadDeframerListener appListener;
42
  private final MigratingDeframerListener migratingListener;
43
  private final ApplicationThreadDeframerListener.TransportExecutor transportExecutor;
44
  private final MessageDeframer deframer;
45
  private final DeframeMessageProducer messageProducer = new DeframeMessageProducer();
×
46

47
  private final Object lock = new Object();
×
48
  /**
49
   * {@code true} means decoding on transport thread.
50
   *
51
   * <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise
52
   * deframerOnTransportThread=false.
53
   */
54
  @GuardedBy("lock")
55
  private boolean deframerOnTransportThread;
56
  @GuardedBy("lock")
×
57
  private final Queue<Op> opQueue = new ArrayDeque<>();
58
  @GuardedBy("lock")
59
  private boolean messageProducerEnqueued;
60

61
  public MigratingThreadDeframer(
62
      MessageDeframer.Listener listener,
63
      ApplicationThreadDeframerListener.TransportExecutor transportExecutor,
64
      MessageDeframer deframer) {
×
65
    this.transportListener =
×
66
        new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener"));
×
67
    this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor");
×
68
    this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor);
×
69
    // Starts on app thread
70
    this.migratingListener = new MigratingDeframerListener(appListener);
×
71
    deframer.setListener(migratingListener);
×
72
    this.deframer = deframer;
×
73
  }
×
74

75
  @Override
76
  public void setMaxInboundMessageSize(int messageSize) {
77
    deframer.setMaxInboundMessageSize(messageSize);
×
78
  }
×
79

80
  @Override
81
  public void setDecompressor(Decompressor decompressor) {
82
    deframer.setDecompressor(decompressor);
×
83
  }
×
84

85
  @Override
86
  public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
87
    deframer.setFullStreamDecompressor(fullStreamDecompressor);
×
88
  }
×
89

90
  private boolean runWhereAppropriate(Op op) {
91
    return runWhereAppropriate(op, true);
×
92
  }
93

94
  private boolean runWhereAppropriate(Op op, boolean currentThreadIsTransportThread) {
95
    boolean deframerOnTransportThreadCopy;
96
    boolean alreadyEnqueued;
97
    synchronized (lock) {
×
98
      deframerOnTransportThreadCopy = deframerOnTransportThread;
×
99
      alreadyEnqueued = messageProducerEnqueued;
×
100
      if (!deframerOnTransportThreadCopy) {
×
101
        opQueue.offer(op);
×
102
        messageProducerEnqueued = true;
×
103
      }
104
    }
×
105
    if (deframerOnTransportThreadCopy) {
×
106
      op.run(/*isDeframerOnTransportThread=*/true);
×
107
      return true;
×
108
    } else {
109
      if (!alreadyEnqueued) {
×
110
        if (currentThreadIsTransportThread) {
×
111
          try (TaskCloseable ignore =
×
112
                   PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
×
113
            transportListener.messagesAvailable(messageProducer);
×
114
          }
×
115
        } else {
116
          final Link link = PerfMark.linkOut();
×
117
          // SLOW path. This is the "normal" thread-hopping approach for request() when _not_ using
118
          // MigratingThreadDeframer
119
          transportExecutor.runOnTransportThread(new Runnable() {
×
120
            @Override public void run() {
121
              try (TaskCloseable ignore =
×
122
                       PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
×
123
                PerfMark.linkIn(link);
×
124
                transportListener.messagesAvailable(messageProducer);
×
125
              }
126
            }
×
127
          });
128
        }
129
      }
130
      return false;
×
131
    }
132
  }
133

134
  // May be called from an arbitrary thread
135
  @Override
136
  public void request(final int numMessages) {
137
    class RequestOp implements Op {
×
138
      @Override public void run(boolean isDeframerOnTransportThread) {
139
        if (isDeframerOnTransportThread) {
×
140
          final Link link = PerfMark.linkOut();
×
141
          // We may not be currently on the transport thread, so jump over to it and then do the
142
          // necessary processing
143
          transportExecutor.runOnTransportThread(new Runnable() {
×
144
            @Override public void run() {
145
              try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
×
146
                PerfMark.linkIn(link);
×
147
                // Since processing continues from transport thread while this runnable was
148
                // enqueued, the state may have changed since we ran runOnTransportThread. So we
149
                // must make sure deframerOnTransportThread==true
150
                requestFromTransportThread(numMessages);
×
151
              }
152
            }
×
153
          });
154
          return;
×
155
        }
156
        try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
×
157
          deframer.request(numMessages);
×
158
        } catch (Throwable t) {
×
159
          appListener.deframeFailed(t);
×
160
          deframer.close(); // unrecoverable state
×
161
        }
×
162
      }
×
163
    }
164

165
    runWhereAppropriate(new RequestOp(), false);
×
166
  }
×
167

168
  private void requestFromTransportThread(final int numMessages) {
169
    class RequestAgainOp implements Op {
×
170
      @Override public void run(boolean isDeframerOnTransportThread) {
171
        if (!isDeframerOnTransportThread) {
×
172
          // State changed. Go back and try again
173
          request(numMessages);
×
174
          return;
×
175
        }
176
        try {
177
          deframer.request(numMessages);
×
178
        } catch (Throwable t) {
×
179
          appListener.deframeFailed(t);
×
180
          deframer.close(); // unrecoverable state
×
181
        }
×
182
        if (!deframer.hasPendingDeliveries()) {
×
183
          synchronized (lock) {
×
184
            PerfMark.event("MigratingThreadDeframer.deframerOnApplicationThread");
×
185
            migratingListener.setDelegate(appListener);
×
186
            deframerOnTransportThread = false;
×
187
          }
×
188
        }
189
      }
×
190
    }
191

192
    runWhereAppropriate(new RequestAgainOp());
×
193
  }
×
194

195
  @Override
196
  public void deframe(final ReadableBuffer data) {
197
    class DeframeOp implements Op, Closeable {
×
198
      @Override public void run(boolean isDeframerOnTransportThread) {
199
        try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.deframe")) {
×
200
          if (isDeframerOnTransportThread) {
×
201
            deframer.deframe(data);
×
202
            return;
×
203
          }
204

205
          try {
206
            deframer.deframe(data);
×
207
          } catch (Throwable t) {
×
208
            appListener.deframeFailed(t);
×
209
            deframer.close(); // unrecoverable state
×
210
          }
×
211
        }
212
      }
×
213

214
      @Override public void close() {
215
        data.close();
×
216
      }
×
217
    }
218

219
    runWhereAppropriate(new DeframeOp());
×
220
  }
×
221

222
  @Override
223
  public void closeWhenComplete() {
224
    class CloseWhenCompleteOp implements Op {
×
225
      @Override public void run(boolean isDeframerOnTransportThread) {
226
        deframer.closeWhenComplete();
×
227
      }
×
228
    }
229

230
    runWhereAppropriate(new CloseWhenCompleteOp());
×
231
  }
×
232

233
  @Override
234
  public void close() {
235
    class CloseOp implements Op {
×
236
      @Override public void run(boolean isDeframerOnTransportThread) {
237
        deframer.close();
×
238
      }
×
239
    }
240

241
    if (!runWhereAppropriate(new CloseOp())) {
×
242
      deframer.stopDelivery();
×
243
    }
244
  }
×
245

246
  class DeframeMessageProducer implements StreamListener.MessageProducer, Closeable {
×
247
    @Override
248
    public InputStream next() {
249
      while (true) {
250
        InputStream is = appListener.messageReadQueuePoll();
×
251
        if (is != null) {
×
252
          return is;
×
253
        }
254
        Op op;
255
        synchronized (lock) {
×
256
          op = opQueue.poll();
×
257
          if (op == null) {
×
258
            if (deframer.hasPendingDeliveries()) {
×
259
              PerfMark.event("MigratingThreadDeframer.deframerOnTransportThread");
×
260
              migratingListener.setDelegate(transportListener);
×
261
              deframerOnTransportThread = true;
×
262
            }
263
            messageProducerEnqueued = false;
×
264
            return null;
×
265
          }
266
        }
×
267
        op.run(/*isDeframerOnTransportThread=*/false);
×
268
      }
×
269
    }
270

271
    @Override
272
    public void close() {
273
      while (true) {
274
        Op op;
275
        synchronized (lock) {
×
276
          do {
277
            op = opQueue.poll();
×
278
          } while (!(op == null || op instanceof Closeable));
×
279
          if (op == null) {
×
280
            messageProducerEnqueued = false;
×
281
            return;
×
282
          }
283
        }
×
284
        GrpcUtil.closeQuietly((Closeable) op);
×
285
      }
×
286
    }
287
  }
288

289
  static class MigratingDeframerListener extends ForwardingDeframerListener {
290
    private MessageDeframer.Listener delegate;
291

292
    public MigratingDeframerListener(MessageDeframer.Listener delegate) {
×
293
      setDelegate(delegate);
×
294
    }
×
295

296
    @Override
297
    protected MessageDeframer.Listener delegate() {
298
      return delegate;
×
299
    }
300

301
    public void setDelegate(MessageDeframer.Listener delegate) {
302
      this.delegate = checkNotNull(delegate, "delegate");
×
303
    }
×
304
  }
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc