• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #587

08 Sep 2025 03:21PM UTC coverage: 88.972% (-0.06%) from 89.031%
#587

Pull #135

github

mizosoft
Explicitly set US locale on DateTimeFormatters
Pull Request #135: Explicitly set US locale on DateTimeFormatters

2321 of 2796 branches covered (83.01%)

Branch coverage included in aggregate %.

8 of 8 new or added lines in 1 file covered. (100.0%)

5 existing lines in 3 files now uncovered.

7635 of 8394 relevant lines covered (90.96%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.37
/methanol/src/main/java/com/github/mizosoft/methanol/internal/cache/CacheWritingPublisher.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.cache;
24

25
import static java.util.Objects.requireNonNull;
26

27
import com.github.mizosoft.methanol.internal.cache.Store.Editor;
28
import com.github.mizosoft.methanol.internal.cache.Store.EntryWriter;
29
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
30
import com.github.mizosoft.methanol.internal.flow.Upstream;
31
import java.lang.System.Logger;
32
import java.lang.System.Logger.Level;
33
import java.lang.invoke.MethodHandles;
34
import java.lang.invoke.VarHandle;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.Collections;
38
import java.util.List;
39
import java.util.concurrent.ConcurrentLinkedQueue;
40
import java.util.concurrent.Executor;
41
import java.util.concurrent.Flow.Publisher;
42
import java.util.concurrent.Flow.Subscriber;
43
import java.util.concurrent.Flow.Subscription;
44
import java.util.concurrent.atomic.AtomicBoolean;
45
import java.util.stream.Collectors;
46
import org.checkerframework.checker.nullness.qual.NonNull;
47
import org.checkerframework.checker.nullness.qual.Nullable;
48

49
/**
50
 * A {@code Publisher} that writes the body stream into cache while simultaneously forwarding it to
51
 * downstream. Forwarding data downstream and writing it run at different paces. Consequently,
52
 * writing may lag behind downstream consumption, and may, by default, proceed in background after
53
 * downstream has been completed. If the {@code
54
 * com.github.mizosoft.methanol.internal.cache.CacheWritingPublisher.waitForCommit} is true,
55
 * downstream isn't completed unless all data is written and the edit is committed. If an error
56
 * occurs while writing, the edit is silently discarded.
57
 */
58
public final class CacheWritingPublisher implements Publisher<List<ByteBuffer>> {
59
  private static final Logger logger = System.getLogger(CacheWritingPublisher.class.getName());
1✔
60

61
  private static final int SOFT_MAX_BULK_WRITE_SIZE = 8;
62

63
  private final Publisher<List<ByteBuffer>> upstream;
64
  private final Editor editor;
65
  private final ByteBuffer metadata;
66
  private final Executor executor;
67
  private final Listener listener;
68

69
  /** Whether to wait till the cache entry is committed before completing downstream. */
70
  private final boolean waitForCommit;
71

72
  private final AtomicBoolean subscribed = new AtomicBoolean();
1✔
73

74
  public CacheWritingPublisher(
75
      Publisher<List<ByteBuffer>> upstream, Editor editor, ByteBuffer metadata, Executor executor) {
76
    this(upstream, editor, metadata, executor, DisabledListener.INSTANCE, false);
1✔
77
  }
1✔
78

79
  public CacheWritingPublisher(
80
      Publisher<List<ByteBuffer>> upstream,
81
      Editor editor,
82
      ByteBuffer metadata,
83
      Executor executor,
84
      Listener listener) {
85
    this(upstream, editor, metadata, executor, listener, false);
1✔
86
  }
1✔
87

88
  public CacheWritingPublisher(
89
      Publisher<List<ByteBuffer>> upstream,
90
      Editor editor,
91
      ByteBuffer metadata,
92
      Executor executor,
93
      Listener listener,
94
      boolean waitForCommit) {
1✔
95
    this.upstream = requireNonNull(upstream);
1✔
96
    this.editor = requireNonNull(editor);
1✔
97
    this.metadata = requireNonNull(metadata);
1✔
98
    this.executor = requireNonNull(executor);
1✔
99
    this.listener = requireNonNull(listener);
1✔
100
    this.waitForCommit = waitForCommit;
1✔
101
  }
1✔
102

103
  @Override
104
  public void subscribe(Subscriber<? super List<ByteBuffer>> subscriber) {
105
    requireNonNull(subscriber);
1✔
106
    if (subscribed.compareAndSet(false, true)) {
1✔
107
      upstream.subscribe(
1✔
108
          new CacheWritingSubscriber(
109
              subscriber, editor, metadata, executor, listener, waitForCommit));
110
    } else {
111
      FlowSupport.rejectMulticast(subscriber);
1✔
112
    }
113
  }
1✔
114

115
  public interface Listener {
116
    void onWriteSuccess();
117

118
    void onWriteFailure(Throwable exception);
119

120
    default Listener guarded() {
121
      return new Listener() {
1✔
122
        @Override
123
        public void onWriteSuccess() {
124
          try {
125
            Listener.this.onWriteSuccess();
1✔
126
          } catch (Throwable e) {
×
127
            logger.log(Level.WARNING, "Exception thrown by Listener::onWriteSuccess", e);
×
128
          }
1✔
129
        }
1✔
130

131
        @Override
132
        public void onWriteFailure(Throwable exception) {
133
          try {
134
            Listener.this.onWriteFailure(exception);
1✔
135
          } catch (Throwable e) {
×
136
            logger.log(Level.WARNING, "Exception thrown by Listener::onWriteFailure", e);
×
137
          }
1✔
138
        }
1✔
139
      };
140
    }
141

142
    static Listener disabled() {
143
      return DisabledListener.INSTANCE;
1✔
144
    }
145
  }
146

147
  private enum DisabledListener implements Listener {
1✔
148
    INSTANCE;
1✔
149

150
    @Override
151
    public void onWriteSuccess() {}
1✔
152

153
    @Override
154
    public void onWriteFailure(Throwable exception) {}
1✔
155
  }
156

157
  private static final class CacheWritingSubscriber implements Subscriber<List<ByteBuffer>> {
158
    private final CacheWritingSubscription downstreamSubscription;
159

160
    CacheWritingSubscriber(
161
        Subscriber<? super List<ByteBuffer>> downstream,
162
        Editor editor,
163
        ByteBuffer metadata,
164
        Executor executor,
165
        Listener listener,
166
        boolean waitForCommit) {
1✔
167
      downstreamSubscription =
1✔
168
          new CacheWritingSubscription(
169
              downstream, editor, metadata, executor, listener, waitForCommit);
170
    }
1✔
171

172
    @Override
173
    public void onSubscribe(Subscription subscription) {
174
      downstreamSubscription.onSubscribe(requireNonNull(subscription));
1✔
175
    }
1✔
176

177
    @Override
178
    public void onNext(List<ByteBuffer> item) {
179
      downstreamSubscription.onNext(requireNonNull(item));
1✔
180
    }
1✔
181

182
    @Override
183
    public void onError(Throwable throwable) {
184
      downstreamSubscription.onError(requireNonNull(throwable));
1✔
185
    }
1✔
186

187
    @Override
188
    public void onComplete() {
189
      downstreamSubscription.onComplete();
1✔
190
    }
1✔
191
  }
192

193
  static final class CacheWritingSubscription implements Subscription {
194
    private static final VarHandle DOWNSTREAM;
195
    private static final VarHandle STATE;
196

197
    static {
198
      try {
199
        var lookup = MethodHandles.lookup();
1✔
200
        DOWNSTREAM =
1✔
201
            lookup.findVarHandle(CacheWritingSubscription.class, "downstream", Subscriber.class);
1✔
202
        STATE = lookup.findVarHandle(CacheWritingSubscription.class, "state", WritingState.class);
1✔
203
      } catch (NoSuchFieldException | IllegalAccessException e) {
×
204
        throw new ExceptionInInitializerError(e);
×
205
      }
1✔
206
    }
1✔
207

208
    @SuppressWarnings("FieldMayBeFinal") // VarHandle indirection.
209
    private volatile @Nullable Subscriber<? super List<ByteBuffer>> downstream;
210

211
    private final Editor editor;
212
    private final ByteBuffer metadata;
213
    private final EntryWriter writer;
214
    private final Executor executor;
215
    private final Listener listener;
216
    private final boolean waitForCommit;
217
    private final Upstream upstream = new Upstream();
1✔
218
    private final ConcurrentLinkedQueue<List<ByteBuffer>> writeQueue =
1✔
219
        new ConcurrentLinkedQueue<>();
220

221
    @SuppressWarnings("FieldMayBeFinal") // VarHandle indirection.
1✔
222
    private volatile WritingState state = WritingState.IDLE;
223

224
    private enum WritingState {
1✔
225
      IDLE,
1✔
226
      WRITING,
1✔
227
      COMMITTING,
1✔
228
      DONE
1✔
229
    }
230

231
    /**
232
     * Set to true when onComplete() is called, after then the edit is to be committed as soon as
233
     * writeQueue becomes empty.
234
     */
235
    private volatile boolean receivedBodyCompletion;
236

237
    CacheWritingSubscription(
238
        @NonNull Subscriber<? super List<ByteBuffer>> downstream,
239
        Editor editor,
240
        ByteBuffer metadata,
241
        Executor executor,
242
        Listener listener,
243
        boolean waitForCommit) {
1✔
244
      this.downstream = downstream;
1✔
245
      this.editor = editor;
1✔
246
      this.metadata = metadata;
1✔
247
      this.executor = executor;
1✔
248
      this.writer = editor.writer();
1✔
249
      this.listener = listener.guarded(); // Ensure the listener doesn't throw.
1✔
250
      this.waitForCommit = waitForCommit;
1✔
251
    }
1✔
252

253
    @Override
254
    public void request(long n) {
255
      upstream.request(n);
1✔
256
    }
1✔
257

258
    @Override
259
    public void cancel() {
260
      getAndClearDownstream();
1✔
261
      upstream.cancel();
1✔
262

263
      // If we've received the entire body there's no need to discard the edit as it can continue in
264
      // background till committing.
265
      if (!receivedBodyCompletion) {
1✔
266
        discardEdit();
1✔
267
      }
268
    }
1✔
269

270
    void onSubscribe(Subscription subscription) {
271
      if (upstream.setOrCancel(subscription)) {
1!
272
        var subscriber = downstream;
1✔
273
        if (subscriber != null) {
1!
274
          subscriber.onSubscribe(this);
1✔
275
        } else {
276
          logger.log(
×
277
              Level.WARNING,
278
              "Bad reactive-streams implementation: downstream is disposed (completed, errored) before calling onSubscribe");
279
        }
280
      }
281
    }
1✔
282

283
    void onNext(List<ByteBuffer> buffers) {
284
      if (state != WritingState.DONE) {
1✔
285
        // Create independent buffers for writing to cache.
286
        writeQueue.add(
1✔
287
            buffers.stream().map(ByteBuffer::duplicate).collect(Collectors.toUnmodifiableList()));
1✔
288
        tryScheduleWrite(false);
1✔
289
      }
290

291
      var subscriber = downstream;
1✔
292
      if (subscriber != null) {
1✔
293
        subscriber.onNext(buffers);
1✔
294
      }
295
    }
1✔
296

297
    void onError(Throwable exception) {
298
      upstream.clear();
1✔
299
      writeQueue.clear();
1✔
300
      discardEdit();
1✔
301
      listener.onWriteFailure(exception);
1✔
302
      var subscriber = getAndClearDownstream();
1✔
303
      if (subscriber != null) {
1!
304
        subscriber.onError(exception);
1✔
305
      } else {
306
        FlowSupport.onDroppedException(exception);
×
307
      }
308
    }
1✔
309

310
    void onComplete() {
311
      upstream.clear();
1✔
312
      receivedBodyCompletion = true;
1✔
313
      tryScheduleWrite(false);
1✔
314
      if (!waitForCommit || state == WritingState.DONE) {
1✔
315
        completeDownstream();
1✔
316
      }
317
    }
1✔
318

319
    private void completeDownstream() {
320
      var subscriber = getAndClearDownstream();
1✔
321
      if (subscriber != null) {
1✔
322
        subscriber.onComplete();
1✔
323
      }
324
    }
1✔
325

326
    @SuppressWarnings("unchecked")
327
    private Subscriber<? super List<ByteBuffer>> getAndClearDownstream() {
328
      return (Subscriber<? super List<ByteBuffer>>) DOWNSTREAM.getAndSet(this, null);
1✔
329
    }
330

331
    /**
332
     * @param maintainWritingState whether the write is to be scheduled directly after a previous
333
     *     write is completed, allowing to leave the WRITING state as is
334
     */
335
    @SuppressWarnings({"FutureReturnValueIgnored", "StatementWithEmptyBody"})
336
    private boolean tryScheduleWrite(boolean maintainWritingState) {
337
      List<ByteBuffer> buffers = null;
1✔
338
      boolean queueWasEmpty;
339
      while (true) {
340
        if ((buffers != null || (buffers = writeQueue.peek()) != null)
1!
341
            && ((maintainWritingState && state == WritingState.WRITING)
342
                || STATE.compareAndSet(this, WritingState.IDLE, WritingState.WRITING))) {
1✔
343
          writeQueue.poll(); // Consume.
1✔
344

345
          // Take this chance to write as much as we can up to a soft limit. The limit might be
346
          // exceeded, although expectedly slightly.
347
          List<ByteBuffer> moreBuffers;
348
          boolean polledMoreBuffers = false;
1✔
349
          while (buffers.size() < SOFT_MAX_BULK_WRITE_SIZE
1✔
350
              && (moreBuffers = writeQueue.poll()) != null) {
1✔
351
            if (!polledMoreBuffers) {
1✔
352
              polledMoreBuffers = true;
1✔
353
              buffers = new ArrayList<>(buffers);
1✔
354
            }
355
            buffers.addAll(moreBuffers);
1✔
356
          }
357
          if (polledMoreBuffers) {
1✔
358
            buffers = Collections.unmodifiableList(buffers);
1✔
359
          }
360

361
          try {
362
            writer.write(buffers, executor).whenComplete((__, ex) -> onWriteCompletion(ex));
1✔
363
            return true;
1✔
364
          } catch (Throwable t) { // write might throw.
×
365
            discardEdit();
×
366
            listener.onWriteFailure(t);
×
367
            completeDownstreamOnDiscardedEdit();
×
368
            return false;
×
369
          }
370
        } else if ((queueWasEmpty = (buffers == null))
1✔
371
            && receivedBodyCompletion
372
            && (buffers = writeQueue.peek()) == null // Might've missed items before completion.
1!
373
            && STATE.compareAndSet(
1✔
374
                this,
375
                maintainWritingState ? WritingState.WRITING : WritingState.IDLE,
1✔
376
                WritingState.COMMITTING)) {
377
          try {
378
            editor
1✔
379
                .commit(metadata, executor)
1✔
380
                .whenComplete(
1✔
381
                    (__, ex) -> {
382
                      state = WritingState.DONE;
1✔
383
                      closeEditor();
1✔
384
                      if (ex != null) {
1✔
385
                        listener.onWriteFailure(ex);
1✔
386
                      } else {
387
                        listener.onWriteSuccess();
1✔
388
                      }
389
                      completeDownstreamOnCommittedEdit();
1✔
390
                    });
1✔
391
            return true;
1✔
392
          } catch (Throwable t) { // commit might throw.
1✔
393
            discardEdit();
1✔
394
            listener.onWriteFailure(t);
1✔
395
            completeDownstreamOnDiscardedEdit();
1✔
396
            return false;
1✔
397
          }
398
        } else if (queueWasEmpty && buffers != null) {
1!
399
          // Picked up new buffers after perceiving completion, retry.
400
        } else {
401
          return false;
1✔
402
        }
403
      }
404
    }
405

406
    private void onWriteCompletion(@Nullable Throwable exception) {
407
      if (exception != null) {
1✔
408
        discardEdit();
1✔
409
        listener.onWriteFailure(exception);
1✔
410
        completeDownstreamOnDiscardedEdit();
1✔
411
      } else if (!tryScheduleWrite(true)
1✔
412
          && STATE.compareAndSet(this, WritingState.WRITING, WritingState.IDLE)) {
1✔
413
        // There might be signals missed just before CASing to IDLE.
414
        tryScheduleWrite(false);
1✔
415
      }
416
    }
1✔
417

418
    private void discardEdit() {
419
      while (true) {
420
        var currentState = state;
1✔
421
        if (currentState == WritingState.COMMITTING || currentState == WritingState.DONE) {
1✔
422
          return;
1✔
423
        } else if (STATE.compareAndSet(this, currentState, WritingState.DONE)) {
1!
424
          writeQueue.clear();
1✔
425
          closeEditor();
1✔
426
          return;
1✔
427
        }
UNCOV
428
      }
×
429
    }
430

431
    private void closeEditor() {
432
      try {
433
        editor.close();
1✔
434
      } catch (Throwable t) {
1✔
435
        logger.log(Level.WARNING, "Exception thrown when closing the editor", t);
1✔
436
      }
1✔
437
    }
1✔
438

439
    private void completeDownstreamOnCommittedEdit() {
440
      if (waitForCommit) {
1✔
441
        completeDownstream();
1✔
442
      }
443
    }
1✔
444

445
    private void completeDownstreamOnDiscardedEdit() {
446
      if (waitForCommit && receivedBodyCompletion) {
1✔
447
        completeDownstream();
1✔
448
      }
449
    }
1✔
450
  }
451
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc