• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19853

06 Jun 2025 03:43PM UTC coverage: 88.611% (-0.03%) from 88.636%
#19853

push

github

ejona86
xds: Use tracing GC in XdsDepManager

Reference counting doesn't release cycles, so swap to a tracing garbage
collector. This greatly simplifies the code as well, as diffing is no
longer necessary. (If vanilla reference counting was used, diffing
wouldn't have been necessary either as you just increment all the new
objects and decrement the old ones. But that doesn't work when you use a set
instead of an integer.)

34700 of 39160 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/../core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.errorprone.annotations.concurrent.GuardedBy;
22
import io.grpc.Decompressor;
23
import io.perfmark.Link;
24
import io.perfmark.PerfMark;
25
import io.perfmark.TaskCloseable;
26
import java.io.Closeable;
27
import java.io.InputStream;
28
import java.util.ArrayDeque;
29
import java.util.Queue;
30

31
/**
32
 * A deframer that moves decoding between the transport and app threads based on which is more
33
 * efficient at that moment.
34
 */
35
final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
36
  private interface Op {
37
    void run(boolean isDeframerOnTransportThread);
38
  }
39

40
  private final MessageDeframer.Listener transportListener;
41
  private final ApplicationThreadDeframerListener appListener;
42
  private final MigratingDeframerListener migratingListener;
43
  private final ApplicationThreadDeframerListener.TransportExecutor transportExecutor;
44
  private final MessageDeframer deframer;
45
  private final DeframeMessageProducer messageProducer = new DeframeMessageProducer();
×
46

47
  private final Object lock = new Object();
×
48
  /**
49
   * {@code true} means decoding on transport thread.
50
   *
51
   * <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise
52
   * deframerOnTransportThread=false.
53
   */
54
  @GuardedBy("lock")
55
  private boolean deframerOnTransportThread;
56
  @GuardedBy("lock")
×
57
  private final Queue<Op> opQueue = new ArrayDeque<>();
58
  @GuardedBy("lock")
59
  private boolean messageProducerEnqueued;
60

61
  /**
   * Wires the listener chain around {@code deframer} and installs the migrating listener on it.
   *
   * @param listener listener to notify; must not be {@code null}
   * @param transportExecutor executor used to run work on the transport thread; must not be
   *     {@code null}
   * @param deframer the deframer that does the actual decoding
   */
  public MigratingThreadDeframer(
      MessageDeframer.Listener listener,
      ApplicationThreadDeframerListener.TransportExecutor transportExecutor,
      MessageDeframer deframer) {
    MessageDeframer.Listener checkedListener = checkNotNull(listener, "listener");
    this.transportListener = new SquelchLateMessagesAvailableDeframerListener(checkedListener);
    this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor");
    this.appListener =
        new ApplicationThreadDeframerListener(this.transportListener, transportExecutor);
    // Decoding begins on the app thread, so the app listener is the initial delegate.
    this.migratingListener = new MigratingDeframerListener(this.appListener);
    deframer.setListener(this.migratingListener);
    this.deframer = deframer;
  }
×
74

75
  @Override
76
  public void setMaxInboundMessageSize(int messageSize) {
77
    deframer.setMaxInboundMessageSize(messageSize);
×
78
  }
×
79

80
  @Override
81
  public void setDecompressor(Decompressor decompressor) {
82
    deframer.setDecompressor(decompressor);
×
83
  }
×
84

85
  @Override
86
  public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
87
    deframer.setFullStreamDecompressor(fullStreamDecompressor);
×
88
  }
×
89

90
  private boolean runWhereAppropriate(Op op) {
91
    return runWhereAppropriate(op, true);
×
92
  }
93

94
  private boolean runWhereAppropriate(Op op, boolean currentThreadIsTransportThread) {
95
    boolean deframerOnTransportThreadCopy;
96
    boolean alreadyEnqueued;
97
    synchronized (lock) {
×
98
      deframerOnTransportThreadCopy = deframerOnTransportThread;
×
99
      alreadyEnqueued = messageProducerEnqueued;
×
100
      if (!deframerOnTransportThreadCopy) {
×
101
        opQueue.offer(op);
×
102
        messageProducerEnqueued = true;
×
103
      }
104
    }
×
105
    if (deframerOnTransportThreadCopy) {
×
106
      op.run(/*isDeframerOnTransportThread=*/true);
×
107
      return true;
×
108
    } else {
109
      if (!alreadyEnqueued) {
×
110
        if (currentThreadIsTransportThread) {
×
111
          try (TaskCloseable ignore =
×
112
                   PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
×
113
            transportListener.messagesAvailable(messageProducer);
×
114
          }
×
115
        } else {
116
          final Link link = PerfMark.linkOut();
×
117
          // SLOW path. This is the "normal" thread-hopping approach for request() when _not_ using
118
          // MigratingThreadDeframer
119
          transportExecutor.runOnTransportThread(new Runnable() {
×
120
            @Override public void run() {
121
              try (TaskCloseable ignore =
×
122
                       PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) {
×
123
                PerfMark.linkIn(link);
×
124
                transportListener.messagesAvailable(messageProducer);
×
125
              }
126
            }
×
127
          });
128
        }
129
      }
130
      return false;
×
131
    }
132
  }
133

134
  // May be called from an arbitrary thread
135
  @Override
136
  public void request(final int numMessages) {
137
    class RequestOp implements Op {
×
138
      @Override public void run(boolean isDeframerOnTransportThread) {
139
        if (isDeframerOnTransportThread) {
×
140
          final Link link = PerfMark.linkOut();
×
141
          // We may not be currently on the transport thread, so jump over to it and then do the
142
          // necessary processing
143
          transportExecutor.runOnTransportThread(new Runnable() {
×
144
            @Override public void run() {
145
              try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
×
146
                PerfMark.linkIn(link);
×
147
                // Since processing continues from transport thread while this runnable was
148
                // enqueued, the state may have changed since we ran runOnTransportThread. So we
149
                // must make sure deframerOnTransportThread==true
150
                requestFromTransportThread(numMessages);
×
151
              }
152
            }
×
153
          });
154
          return;
×
155
        }
156
        try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) {
×
157
          deframer.request(numMessages);
×
158
        } catch (Throwable t) {
×
159
          appListener.deframeFailed(t);
×
160
          deframer.close(); // unrecoverable state
×
161
        }
×
162
      }
×
163
    }
164

165
    runWhereAppropriate(new RequestOp(), false);
×
166
  }
×
167

168
  private void requestFromTransportThread(final int numMessages) {
169
    class RequestAgainOp implements Op {
×
170
      @Override public void run(boolean isDeframerOnTransportThread) {
171
        if (!isDeframerOnTransportThread) {
×
172
          // State changed. Go back and try again
173
          request(numMessages);
×
174
          return;
×
175
        }
176
        try {
177
          deframer.request(numMessages);
×
178
        } catch (Throwable t) {
×
179
          appListener.deframeFailed(t);
×
180
          deframer.close(); // unrecoverable state
×
181
        }
×
182
        if (!deframer.hasPendingDeliveries()) {
×
183
          synchronized (lock) {
×
184
            PerfMark.event("MigratingThreadDeframer.deframerOnApplicationThread");
×
185
            migratingListener.setDelegate(appListener);
×
186
            deframerOnTransportThread = false;
×
187
          }
×
188
        }
189
      }
×
190
    }
191

192
    runWhereAppropriate(new RequestAgainOp());
×
193
  }
×
194

195
  @Override
196
  public void deframe(final ReadableBuffer data) {
197
    class DeframeOp implements Op, Closeable {
×
198
      @Override public void run(boolean isDeframerOnTransportThread) {
199
        try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.deframe")) {
×
200
          if (isDeframerOnTransportThread) {
×
201
            deframer.deframe(data);
×
202
            return;
×
203
          }
204

205
          try {
206
            deframer.deframe(data);
×
207
          } catch (Throwable t) {
×
208
            appListener.deframeFailed(t);
×
209
            deframer.close(); // unrecoverable state
×
210
          }
×
211
        }
212
      }
×
213

214
      @Override public void close() {
215
        data.close();
×
216
      }
×
217
    }
218

219
    runWhereAppropriate(new DeframeOp());
×
220
  }
×
221

222
  @Override
223
  public void closeWhenComplete() {
224
    class CloseWhenCompleteOp implements Op {
×
225
      @Override public void run(boolean isDeframerOnTransportThread) {
226
        deframer.closeWhenComplete();
×
227
      }
×
228
    }
229

230
    runWhereAppropriate(new CloseWhenCompleteOp());
×
231
  }
×
232

233
  @Override
234
  public void close() {
235
    class CloseOp implements Op {
×
236
      @Override public void run(boolean isDeframerOnTransportThread) {
237
        deframer.close();
×
238
      }
×
239
    }
240

241
    if (!runWhereAppropriate(new CloseOp())) {
×
242
      deframer.stopDelivery();
×
243
    }
244
  }
×
245

246
  class DeframeMessageProducer implements StreamListener.MessageProducer, Closeable {
×
247
    @Override
248
    public InputStream next() {
249
      while (true) {
250
        InputStream is = appListener.messageReadQueuePoll();
×
251
        if (is != null) {
×
252
          return is;
×
253
        }
254
        Op op;
255
        synchronized (lock) {
×
256
          op = opQueue.poll();
×
257
          if (op == null) {
×
258
            if (deframer.hasPendingDeliveries()) {
×
259
              PerfMark.event("MigratingThreadDeframer.deframerOnTransportThread");
×
260
              migratingListener.setDelegate(transportListener);
×
261
              deframerOnTransportThread = true;
×
262
            }
263
            messageProducerEnqueued = false;
×
264
            return null;
×
265
          }
266
        }
×
267
        op.run(/*isDeframerOnTransportThread=*/false);
×
268
      }
×
269
    }
270

271
    @Override
272
    public void close() {
273
      while (true) {
274
        Op op;
275
        synchronized (lock) {
×
276
          do {
277
            op = opQueue.poll();
×
278
          } while (!(op == null || op instanceof Closeable));
×
279
          if (op == null) {
×
280
            messageProducerEnqueued = false;
×
281
            return;
×
282
          }
283
        }
×
284
        GrpcUtil.closeQuietly((Closeable) op);
×
285
      }
×
286
    }
287
  }
288

289
  static class MigratingDeframerListener extends ForwardingDeframerListener {
290
    private MessageDeframer.Listener delegate;
291

292
    public MigratingDeframerListener(MessageDeframer.Listener delegate) {
×
293
      setDelegate(delegate);
×
294
    }
×
295

296
    @Override
297
    protected MessageDeframer.Listener delegate() {
298
      return delegate;
×
299
    }
300

301
    public void setDelegate(MessageDeframer.Listener delegate) {
302
      this.delegate = checkNotNull(delegate, "delegate");
×
303
    }
×
304
  }
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc