• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #566

25 Jun 2025 11:19PM UTC coverage: 90.014% (-0.07%) from 90.088%
#566

push

github

web-flow
Use a corePoolSize of 1 for the scheduler (#127)

1 of 1 new or added line in 1 file covered. (100.0%)

8 existing lines in 4 files now uncovered.

7581 of 8422 relevant lines covered (90.01%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.6
/methanol/src/main/java/com/github/mizosoft/methanol/internal/flow/AbstractSubscription.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.flow;
24

25
import static com.github.mizosoft.methanol.internal.Validate.castNonNull;
26
import static com.github.mizosoft.methanol.internal.flow.FlowSupport.getAndAddDemand;
27
import static com.github.mizosoft.methanol.internal.flow.FlowSupport.subtractAndGetDemand;
28
import static java.util.Objects.requireNonNull;
29

30
import java.lang.System.Logger;
31
import java.lang.System.Logger.Level;
32
import java.lang.invoke.MethodHandles;
33
import java.lang.invoke.VarHandle;
34
import java.util.concurrent.Executor;
35
import java.util.concurrent.Flow.Subscriber;
36
import java.util.concurrent.Flow.Subscription;
37
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
38

39
/**
40
 * An abstract {@link Subscription} that implements most of the machinery for execution and
41
 * backpressure control.
42
 */
43
@SuppressWarnings("unused") // VarHandle indirection.
44
public abstract class AbstractSubscription<T> implements Subscription {
45
  private static final Logger logger = System.getLogger(AbstractSubscription.class.getName());
1✔
46

47
  /** Consumer is running or about to run. */
48
  private static final int RUNNING = 0x1;
49

50
  /** Consumer must continue to process potentially missed signals. */
51
  private static final int KEEP_ALIVE = 0x2;
52

53
  /** Subscriber::onSubscribe has been invoked. */
54
  private static final int SUBSCRIBED = 0x4;
55

56
  /** There's a pending error that must be passed downstream if not cancelled. */
57
  private static final int ERROR = 0x8;
58

59
  /** The subscription is cancelled. */
60
  private static final int CANCELLED = 0x10;
61

62
  private static final VarHandle SYNC;
63
  private static final VarHandle PENDING_EXCEPTION;
64
  private static final VarHandle DEMAND;
65

66
  static {
67
    var lookup = MethodHandles.lookup();
1✔
68
    try {
69
      SYNC = lookup.findVarHandle(AbstractSubscription.class, "sync", int.class);
1✔
70
      DEMAND = lookup.findVarHandle(AbstractSubscription.class, "demand", long.class);
1✔
71
      PENDING_EXCEPTION =
1✔
72
          lookup.findVarHandle(AbstractSubscription.class, "pendingException", Throwable.class);
1✔
73
    } catch (NoSuchFieldException | IllegalAccessException e) {
×
74
      throw new ExceptionInInitializerError(e);
×
75
    }
1✔
76
  }
1✔
77

78
  private final Subscriber<? super T> downstream;
79
  private final Executor executor;
80
  private volatile int sync;
81
  private volatile long demand;
82
  private volatile @MonotonicNonNull Throwable pendingException;
83

84
  protected AbstractSubscription(Subscriber<? super T> downstream, Executor executor) {
1✔
85
    this.downstream = requireNonNull(downstream);
1✔
86
    this.executor = requireNonNull(executor);
1✔
87
  }
1✔
88

89
  @Override
90
  public final void request(long n) {
91
    if (n > 0) {
1✔
92
      getAndAddDemand(this, DEMAND, n);
1✔
93
      fireOrKeepAlive();
1✔
94
    } else {
95
      fireOrKeepAliveOnError(FlowSupport.illegalRequest());
1✔
96
    }
97
  }
1✔
98

99
  @Override
100
  public final void cancel() {
101
    if ((getAndBitwiseOrSync(CANCELLED) & CANCELLED) == 0) {
1✔
102
      guardedAbort(true);
1✔
103
      consumePendingException();
1✔
104
    }
105
  }
1✔
106

107
  public void fireOrKeepAlive() {
108
    if ((sync & KEEP_ALIVE) == 0
1✔
109
        && (getAndBitwiseOrSync(RUNNING | KEEP_ALIVE) & (RUNNING | CANCELLED)) == 0) {
1✔
110
      fire();
1✔
111
    }
112
  }
1✔
113

114
  public void fireOrKeepAliveOnNext() {
115
    if (demand > 0) {
1✔
116
      fireOrKeepAlive();
1✔
117
    }
118
  }
1✔
119

120
  public void fireOrKeepAliveOnError(Throwable exception) {
121
    requireNonNull(exception);
1✔
122

123
    // Make sure exceptions are reported even if they can't be passed on downstream. This is
124
    // maintained by two sides: producer (this method) and consumer (terminal methods like cancel(),
125
    // cancelOnError(...) and cancelOnComplete(...)).
126
    boolean produced;
127
    if ((produced = PENDING_EXCEPTION.compareAndSet(this, null, exception))
1✔
128
        && (getAndBitwiseOrSync(RUNNING | KEEP_ALIVE | ERROR) & (RUNNING | CANCELLED)) == 0) {
1✔
129
      fire();
1✔
130
    } else if (!produced) {
1✔
131
      FlowSupport.onDroppedException(exception);
1✔
132
    }
133
  }
1✔
134

135
  private void fire() {
136
    try {
137
      executor.execute(this::drain);
1✔
138
    } catch (RuntimeException | Error e) {
1✔
139
      // This is a problem because we cannot call downstream here as that would ruin the execution
140
      // context guarantee. SubmissionPublisher's behaviour here is followed (cancel & rethrow).
141
      logger.log(Level.ERROR, "Couldn't execute subscription's signaller task", e);
1✔
142
      cancel();
1✔
143
      throw e;
1✔
144
    }
1✔
145
  }
1✔
146

147
  /**
148
   * Emits at most {@code emit} items to downstream using {@link #submitOnNext( Subscriber, Object)}
149
   * as long as it returns {@code true}. The actual number of emitted items is returned, may be 0 in
150
   * case of cancellation or if no items are emitted, perhaps due to lack thereof, or if {@code
151
   * emit} itself is zero. If the underlying source is finished, the subscriber is completed with
152
   * {@link #cancelOnComplete(Subscriber)}.
153
   */
154
  protected abstract long emit(Subscriber<? super T> downstream, long emit);
155

156
  private long guardedEmit(Subscriber<? super T> downstream, long emit) {
157
    try {
158
      return emit(downstream, emit);
1✔
159
    } catch (Throwable t) {
1✔
160
      cancelOnError(downstream, t, true);
1✔
161
      return -1;
1✔
162
    }
163
  }
164

165
  /**
166
   * Releases resources held by this subscription. {@code flowInterrupted} tells whether
167
   * cancellation was due to flow interruption by downstream (e.g. calling {@code cancel()} or
168
   * throwing from {@code onNext} or {@code onSubscribe}), or due to ending the normal flow of
169
   * signals (onSubscribe -> onNext* -> (onError | onComplete)).
170
   */
171
  protected void abort(boolean flowInterrupted) {}
1✔
172

173
  private void guardedAbort(boolean flowInterrupted) {
174
    try {
175
      abort(flowInterrupted);
1✔
176
    } catch (Throwable t) {
1✔
177
      logger.log(Level.WARNING, "Exception thrown during subscription cancellation", t);
1✔
178
    }
1✔
179
  }
1✔
180

181
  private void consumePendingException() {
182
    var exception =
1✔
183
        (Throwable) PENDING_EXCEPTION.getAndSet(this, ConsumedPendingException.INSTANCE);
1✔
184
    if (exception != null && exception != ConsumedPendingException.INSTANCE) {
1✔
UNCOV
185
      FlowSupport.onDroppedException(exception);
×
186
    }
187
  }
1✔
188

189
  /**
190
   * Returns {@code true} if this subscription is cancelled. {@code false} result is immediately
191
   * outdated. Can be used by implementation to halt producing items in case the subscription was
192
   * asynchronously cancelled.
193
   */
194
  protected final boolean isCancelled() {
195
    return (sync & CANCELLED) != 0;
1✔
196
  }
197

198
  /**
199
   * Returns {@code true} if the subscriber is to be completed exceptionally. {@code false} result
200
   * is immediately outdated. Can be used by implementation to halt producing items in case the
201
   * subscription was asynchronously signalled with an error.
202
   */
203
  protected final boolean hasPendingErrors() {
204
    return (sync & ERROR) != 0;
1✔
205
  }
206

207
  /**
208
   * Calls downstream's {@code onError} with the given exception after cancelling this subscription.
209
   * {@code flowInterrupted} tells whether the error interrupted the normal flow of signals.
210
   */
211
  protected final void cancelOnError(
212
      Subscriber<? super T> downstream, Throwable exception, boolean flowInterrupted) {
213
    if ((getAndBitwiseOrSync(CANCELLED) & CANCELLED) == 0) {
1✔
214
      guardedAbort(flowInterrupted);
1✔
215
      if (flowInterrupted) { // Otherwise drain() has already consumed the pending exception.
1✔
216
        consumePendingException();
1✔
217
      }
218

219
      try {
220
        downstream.onError(exception);
1✔
221
      } catch (Throwable t) {
×
222
        t.addSuppressed(exception);
×
223
        logger.log(Level.WARNING, "Exception thrown by subscriber's onError", t);
×
224
      }
1✔
225
    } else {
226
      FlowSupport.onDroppedException(exception);
×
227
    }
228
  }
1✔
229

230
  /** Calls downstream's {@code onComplete} after cancelling this subscription. */
231
  protected final void cancelOnComplete(Subscriber<? super T> downstream) {
232
    if ((getAndBitwiseOrSync(CANCELLED) & CANCELLED) == 0) {
1✔
233
      guardedAbort(false);
1✔
234
      consumePendingException();
1✔
235
      try {
236
        downstream.onComplete();
1✔
237
      } catch (Throwable t) {
×
238
        logger.log(
×
239
            Level.WARNING, () -> "Exception thrown by subscriber's onComplete: " + downstream, t);
×
240
      }
1✔
241
    }
242
  }
1✔
243

244
  /**
245
   * Submits given item to the downstream, returning {@code false} and cancelling on failure. {@code
246
   * false} is also returned if the subscription is already cancelled or has pending errors. On such
247
   * cases, caller should stop emitting items.
248
   */
249
  protected final boolean submitOnNext(Subscriber<? super T> downstream, T item) {
250
    if ((sync & (ERROR | CANCELLED)) == 0) {
1✔
251
      try {
252
        downstream.onNext(item);
1✔
253
        return true;
1✔
254
      } catch (Throwable t) {
1✔
255
        cancelOnError(downstream, t, true);
1✔
256
      }
257
    }
258
    return false;
1✔
259
  }
260

261
  private void drain() {
262
    int s;
263
    var d = downstream;
1✔
264
    subscribeOnDrain(d);
1✔
265
    for (long x = 0L, r = demand; ((s = sync) & CANCELLED) == 0; ) {
1✔
266
      long emitted;
267
      int unsetBit;
268
      if ((s & ERROR) != 0) {
1✔
269
        var exception =
1✔
270
            (Throwable) PENDING_EXCEPTION.getAndSet(this, ConsumedPendingException.INSTANCE);
1✔
271
        cancelOnError(d, castNonNull(exception), false);
1✔
272
      } else if ((emitted = guardedEmit(d, r - x)) > 0L) {
1✔
273
        x += emitted;
1✔
274
      } else if (emitted < 0) {
1✔
275
        return; // Emit failed and the subscriber is completed exceptionally.
1✔
276
      } else if (x > 0) {
1✔
277
        // Flush satisfied demand.
278
        r = subtractAndGetDemand(this, DEMAND, x);
1✔
279
        x = 0L;
1✔
280
      } else if (r == (r = demand) // Check no new demand has arrived.
1✔
281
          && SYNC.compareAndSet(
1✔
282
              this, s, s & ~(unsetBit = (s & KEEP_ALIVE) == 0 ? RUNNING : KEEP_ALIVE))
1✔
283
          && unsetBit == RUNNING) { // Exit or consume KEEP_ALIVE.
284
        return;
1✔
285
      }
286
    }
1✔
287
  }
1✔
288

289
  private void subscribeOnDrain(Subscriber<? super T> downstream) {
290
    if ((sync & SUBSCRIBED) == 0
1✔
291
        && (getAndBitwiseOrSync(SUBSCRIBED) & (SUBSCRIBED | CANCELLED)) == 0) {
1✔
292
      try {
293
        downstream.onSubscribe(this);
1✔
294
      } catch (Throwable t) {
1✔
295
        cancelOnError(downstream, t, true);
1✔
296
      }
1✔
297
    }
298
  }
1✔
299

300
  private int getAndBitwiseOrSync(int bits) {
301
    return (int) SYNC.getAndBitwiseOr(this, bits);
1✔
302
  }
303

304
  protected long currentDemand() {
305
    return demand;
1✔
306
  }
307

308
  private static final class ConsumedPendingException extends Exception {
309
    @SuppressWarnings("StaticAssignmentOfThrowable") // Not thrown, just used for CAS control.
310
    static final ConsumedPendingException INSTANCE = new ConsumedPendingException();
1✔
311

312
    private ConsumedPendingException() {
313
      super("", null, false, false);
1✔
314
    }
1✔
315
  }
316
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc