• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #561

15 May 2025 12:18AM UTC coverage: 90.017% (+0.04%) from 89.982%
#561

push

github

web-flow
Use a library-wide ScheduledExecutorService instead of CompletableFuture::delayedExecutor (#124)

24 of 24 new or added lines in 9 files covered. (100.0%)

1 existing line in 1 file now uncovered.

7583 of 8424 relevant lines covered (90.02%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.88
/methanol/src/main/java/com/github/mizosoft/methanol/internal/concurrent/SerialExecutor.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.concurrent;
24

25
import static java.util.Objects.requireNonNull;
26

27
import com.github.mizosoft.methanol.internal.Utils;
28
import java.lang.invoke.MethodHandles;
29
import java.lang.invoke.VarHandle;
30
import java.util.concurrent.ConcurrentLinkedQueue;
31
import java.util.concurrent.Executor;
32
import java.util.concurrent.ForkJoinPool;
33
import java.util.concurrent.RejectedExecutionException;
34

35
/** An executor that ensures submitted tasks are executed serially. */
36
public final class SerialExecutor implements Executor {
37
  /** Drain task started or about to start execution. Retained till drain exits. */
38
  private static final int RUN = 1;
39

40
  /** Drain task should keep running to recheck for incoming tasks it may have missed. */
41
  private static final int KEEP_ALIVE = 2;
42

43
  /** Don't accept more tasks. */
44
  private static final int SHUTDOWN = 4;
45

46
  private static final VarHandle SYNC;
47

48
  // Resolves the VarHandle through which the `sync` field is updated atomically (compare-and-set
  // and bitwise and/or) by the methods of this class.
  static {
49
    try {
50
      SYNC = MethodHandles.lookup().findVarHandle(SerialExecutor.class, "sync", int.class);
1✔
51
    } catch (NoSuchFieldException | IllegalAccessException e) {
×
52
      // A failed lookup is surfaced as a class-initialization failure.
      throw new ExceptionInInitializerError(e);
×
53
    }
1✔
54
  }
1✔
55

56
  private final ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
1✔
57

58
  private final Executor delegate;
59

60
  @SuppressWarnings("unused") // VarHandle indirection.
61
  private volatile int sync;
62

63
  /**
   * Creates a serial executor whose drain task is submitted to the given delegate.
   *
   * @param delegate the executor that the drain task is submitted to
   * @throws NullPointerException if {@code delegate} is null
   */
  public SerialExecutor(Executor delegate) {
1✔
64
    this.delegate = requireNonNull(delegate);
1✔
65
  }
1✔
66

67
  /**
   * Adds the task to the queue, then makes sure it will be picked up: either by submitting a new
   * drain to the delegate (when the RUN bit is not set) or by setting KEEP_ALIVE so the drain
   * that holds the RUN bit rechecks the queue before exiting.
   *
   * @param task the task to run
   * @throws RejectedExecutionException if the SHUTDOWN bit is set when this method is entered, or
   *     if the delegate rejects the drain and the task could be removed from the queue again
   * @throws NullPointerException if {@code task} is null (checked after the shutdown check)
   */
  @Override
68
  public void execute(Runnable task) {
69
    if (isShutdownBitSet()) {
1✔
70
      throw new RejectedExecutionException("shutdown");
1✔
71
    }
72

73
    // Wrap the task so this particular submission can be identified if it must be removed later.
    var decoratedTask = new RunnableDecorator(task);
1✔
74
    taskQueue.add(decoratedTask);
1✔
75

76
    // CAS loop: each iteration re-reads `sync`; a failed compareAndSet falls through and retries.
    while (true) {
77
      int s = sync;
1✔
78

79
      // If drain has been asked to recheck for tasks, but hasn't yet rechecked after adding the
80
      // new task, then it will surely see the added task.
81
      if (isKeepAliveBitSet(s)) {
1✔
82
        return;
1✔
83
      }
84

85
      // RUN not set: claim it and submit a drain. RUN set: ask that drain to recheck the queue.
      if (!isRunningBitSet(s)) {
1✔
86
        if (SYNC.compareAndSet(this, s, s | RUN)) {
1✔
87
          tryStart(decoratedTask);
1✔
88
          return;
1✔
89
        }
90
      } else if (SYNC.compareAndSet(this, s, (s | KEEP_ALIVE))) {
1✔
91
        return;
1✔
92
      }
UNCOV
93
    }
×
94
  }
95

96
  /**
   * Submits the drain task to the delegate. If the delegate throws, the RUN and KEEP_ALIVE bits
   * are cleared so that a later {@code execute} call can submit a drain again.
   *
   * <p>A {@link RejectedExecutionException} is rethrown only if {@code task} could be removed from
   * the queue; if it is no longer in the queue the rejection is swallowed. Any other {@code
   * RuntimeException} or {@code Error} is always rethrown.
   *
   * @param task the decorated task whose submission triggered this drain
   */
  private void tryStart(RunnableDecorator task) {
97
    // TODO consider retrying/offloading to an async executor (e.g. common pool).
98
    try {
99
      delegate.execute(this::drain);
1✔
100
    } catch (RuntimeException | Error e) {
1✔
101
      SYNC.getAndBitwiseAnd(this, ~(RUN | KEEP_ALIVE));
1✔
102
      // NOTE(review): for a non-RejectedExecutionException the `||` short-circuits, so the task is
      // left in the queue while the exception propagates to the caller — confirm this is intended.
      if (!(e instanceof RejectedExecutionException) || taskQueue.remove(task)) {
1✔
103
        throw e;
1✔
104
      }
105
    }
1✔
106
  }
1✔
107

108
  /**
   * Sets the SHUTDOWN bit, after which {@code execute} throws {@link RejectedExecutionException}
   * for calls that observe it. This method itself does not remove queued tasks or touch the RUN
   * and KEEP_ALIVE bits.
   */
  public void shutdown() {
109
    SYNC.getAndBitwiseOr(this, SHUTDOWN);
1✔
110
  }
1✔
111

112
  /**
   * Runs queued tasks one after another on the calling thread until the queue is empty and no
   * recheck has been requested, then releases the RUN bit.
   *
   * <p>If a task throws, RUN and KEEP_ALIVE are cleared, a reschedule is attempted through the
   * common pool when tasks remain, and the throwable is rethrown to whoever is running the drain.
   */
  private void drain() {
113
    // Remembers whether an interrupt was observed (and cleared) before running any task.
    boolean interrupted = false;
1✔
114
    while (true) {
115
      Runnable task;
116
      while ((task = taskQueue.poll()) != null) {
1✔
117
        try {
118
          // Thread.interrupted() clears the interrupt status, so the task starts with it cleared.
          interrupted |= Thread.interrupted();
1✔
119
          task.run();
1✔
120
        } catch (Throwable t) {
1✔
121
          // Before propagating, try to reschedule ourselves if we still have work. This is done
122
          // asynchronously in common FJ pool to rethrow immediately (delegate is not guaranteed to
123
          // execute tasks asynchronously).
124
          SYNC.getAndBitwiseAnd(this, ~(RUN | KEEP_ALIVE));
1✔
125
          if (!taskQueue.isEmpty()) {
1✔
126
            try {
127
              // Submitting a no-op through execute() re-enters the RUN/KEEP_ALIVE handshake.
              // NOTE(review): execute() throws if SHUTDOWN is set by then; that failure happens in
              // the pool thread and is not caught here, so remaining tasks would stay queued.
              ForkJoinPool.commonPool().execute(() -> execute(() -> {}));
1✔
128
            } catch (RuntimeException | Error e) {
×
129
              // Not much we can do here.
130
              t.addSuppressed(e);
×
131
            }
1✔
132
          }
133
          // NOTE(review): `interrupted` is not restored on this path, so an interrupt cleared
          // above is lost when a task throws — confirm this is intended.
          throw t;
1✔
134
        }
1✔
135
      }
136

137
      // Exit or consume KEEP_ALIVE bit.
138
      int s = sync;
1✔
139
      // Clear KEEP_ALIVE first if it is set (and loop to re-poll); otherwise clear RUN and exit.
      int unsetBit = (s & KEEP_ALIVE) != 0 ? KEEP_ALIVE : RUN;
1✔
140
      // A failed weakCompareAndSet (contention or spurious failure) just repeats the outer loop.
      if (SYNC.weakCompareAndSet(this, s, s & ~unsetBit) && unsetBit == RUN) {
1✔
141
        if (interrupted) {
1✔
142
          // Restore the interrupt status that was cleared before running tasks.
          Thread.currentThread().interrupt();
×
143
        }
144
        return;
1✔
145
      }
146
    }
1✔
147
  }
148

149
  /** Returns whether the RUN bit is set in the current value of {@code sync}. */
  boolean isRunningBitSet() {
150
    return isRunningBitSet(sync);
×
151
  }
152

153
  /** Returns whether the SHUTDOWN bit is set in the current value of {@code sync}. */
  boolean isShutdownBitSet() {
154
    return isShutdownBitSet(sync);
1✔
155
  }
156

157
  /** Returns whether the KEEP_ALIVE bit is set in the current value of {@code sync}. */
  boolean isKeepAliveBitSet() {
158
    return isKeepAliveBitSet(sync);
×
159
  }
160

161
  /**
   * Returns a debug string with the delegate and the three state flags. Each flag is obtained by
   * a separate read of {@code sync}, so the reported values are not guaranteed to come from a
   * single snapshot of the state.
   */
  @Override
162
  public String toString() {
163
    return Utils.toStringIdentityPrefix(this)
×
164
        + "[delegate="
165
        + delegate
166
        + ", running="
167
        + isRunningBitSet()
×
168
        + ", keepAlive="
169
        + isKeepAliveBitSet()
×
170
        + ", shutdown="
171
        + isShutdownBitSet()
×
172
        + "]";
173
  }
174

175
  /** Returns whether the RUN bit is set in the given state value. */
  private static boolean isRunningBitSet(int s) {
176
    return (s & RUN) != 0;
1✔
177
  }
178

179
  /** Returns whether the SHUTDOWN bit is set in the given state value. */
  private static boolean isShutdownBitSet(int s) {
180
    return (s & SHUTDOWN) != 0;
1✔
181
  }
182

183
  /** Returns whether the KEEP_ALIVE bit is set in the given state value. */
  private static boolean isKeepAliveBitSet(int s) {
184
    return (s & KEEP_ALIVE) != 0;
1✔
185
  }
186

187
  /**
188
   * Associates an identity with each task passed to {@link #execute(Runnable)} so it is
189
   * deterministically removed from the task queue when the delegate executor rejects the drain
190
   * task.
191
   */
192
  private static final class RunnableDecorator implements Runnable {
    // equals/hashCode are not overridden, so queue removal matches this exact wrapper instance
    // even if the same underlying Runnable was submitted more than once.
193
    private final Runnable delegate;
194

195
    /** Wraps the given task; throws NullPointerException if it is null. */
    RunnableDecorator(Runnable delegate) {
1✔
196
      this.delegate = requireNonNull(delegate);
1✔
197
    }
1✔
198

199
    /** Runs the wrapped task, letting anything it throws propagate. */
    @Override
200
    public void run() {
201
      delegate.run();
1✔
202
    }
1✔
203

204
    /** Returns the wrapped task's string form. */
    @Override
205
    public String toString() {
206
      return delegate.toString();
×
207
    }
208
  }
209
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc