• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #598

10 Sep 2025 12:10PM UTC coverage: 89.059% (+0.02%) from 89.044%
#598

Pull #133

github

mizosoft
Try increasing timeout
Pull Request #133: Bound WritableBodyPublisher memory usage

2349 of 2822 branches covered (83.24%)

Branch coverage included in aggregate %.

138 of 152 new or added lines in 11 files covered. (90.79%)

1 existing line in 1 file now uncovered.

7704 of 8466 relevant lines covered (91.0%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.2
/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java
1
/*
2
 * Copyright (c) 2025 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol;
24

25
import static com.github.mizosoft.methanol.internal.Validate.requireArgument;
26
import static com.github.mizosoft.methanol.internal.Validate.requireState;
27
import static java.util.Objects.requireNonNull;
28

29
import com.github.mizosoft.methanol.internal.Utils;
30
import com.github.mizosoft.methanol.internal.flow.AbstractPollableSubscription;
31
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import java.io.Flushable;
34
import java.io.IOException;
35
import java.io.InterruptedIOException;
36
import java.io.OutputStream;
37
import java.io.UncheckedIOException;
38
import java.lang.invoke.MethodHandles;
39
import java.lang.invoke.VarHandle;
40
import java.net.http.HttpRequest.BodyPublisher;
41
import java.net.http.HttpRequest.BodyPublishers;
42
import java.nio.ByteBuffer;
43
import java.nio.channels.AsynchronousCloseException;
44
import java.nio.channels.Channels;
45
import java.nio.channels.ClosedChannelException;
46
import java.nio.channels.WritableByteChannel;
47
import java.util.ArrayDeque;
48
import java.util.Queue;
49
import java.util.concurrent.Flow.Publisher;
50
import java.util.concurrent.Flow.Subscriber;
51
import java.util.concurrent.atomic.AtomicBoolean;
52
import java.util.concurrent.locks.Condition;
53
import java.util.concurrent.locks.Lock;
54
import java.util.concurrent.locks.ReentrantLock;
55
import java.util.function.BooleanSupplier;
56
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
57
import org.checkerframework.checker.nullness.qual.Nullable;
58

59
/**
60
 * A {@code BodyPublisher} that allows streaming the body's content through an {@code OutputStream}
61
 * or a {@code WritableByteChannel}. It is recommended to use {@link
62
 * MoreBodyPublishers#ofOutputStream} or {@link MoreBodyPublishers#ofWritableByteChannel} instead of
63
 * directly using this class.
64
 *
65
 * <p>After writing is finished, the publisher must be closed to complete the request (either by
66
 * calling {@link #close()} or closing the {@code OutputStream} or the {@code WritableByteChannel},
67
 * or using a try-with-resources construct). Additionally, {@link #closeExceptionally(Throwable)}
68
 * can be used to fail the request in case an error is encountered while writing.
69
 *
70
 * <p>Note that {@link #contentLength()} always returns {@code -1}. If the content length is known
71
 * prior to writing, {@link BodyPublishers#fromPublisher(Publisher, long)} can be used to attach the
72
 * known length to this publisher.
73
 */
74
public final class WritableBodyPublisher implements BodyPublisher, Flushable, AutoCloseable {
75
  private static final VarHandle STATE;
76

77
  static {
78
    try {
79
      STATE =
80
          MethodHandles.lookup().findVarHandle(WritableBodyPublisher.class, "state", State.class);
1✔
81
    } catch (NoSuchFieldException | IllegalAccessException e) {
×
82
      throw new ExceptionInInitializerError(e);
×
83
    }
1✔
84
  }
1✔
85

86
  private static final int SIGNALS_AVAILABLE_BIT = 1 << 31;
87

88
  private final AtomicBoolean subscribed = new AtomicBoolean();
1✔
89
  private final Stream stream;
90

91
  @SuppressWarnings("FieldMayBeFinal") // VarHandle indirection.
1✔
92
  private State state = Initial.INSTANCE;
93

94
  private interface State {}
95

96
  private enum Initial implements State {
1✔
97
    INSTANCE
1✔
98
  }
99

100
  private static final class Subscribed implements State {
101
    final SubscriptionImpl subscription;
102

103
    Subscribed(SubscriptionImpl subscription) {
1✔
104
      this.subscription = requireNonNull(subscription);
1✔
105
    }
1✔
106
  }
107

108
  private static final class Closed implements State {
109
    static final Closed NORMALLY = new Closed(null);
1✔
110

111
    final @Nullable Throwable exception;
112

113
    Closed(@Nullable Throwable exception) {
1✔
114
      this.exception = exception;
1✔
115
    }
1✔
116

117
    static Closed normally() {
118
      return NORMALLY;
1✔
119
    }
120

121
    static Closed exceptionally(Throwable exception) {
122
      return new Closed(requireNonNull(exception));
1✔
123
    }
124
  }
125

126
  private @MonotonicNonNull WritableByteChannel lazyChannel;
127
  private @MonotonicNonNull OutputStream lazyOutputStream;
128

129
  /**
   * Creates a publisher backed by a {@code Stream} that accumulates written bytes into buffers of
   * {@code bufferSize} bytes and makes writers wait while queueing another buffer would exceed
   * {@code memoryQuota} queued bytes.
   *
   * @param bufferSize capacity of each sink buffer in bytes; must be positive
   * @param memoryQuota bound on the number of queued bytes; must be positive and at least {@code
   *     bufferSize}
   * @param tracker callback notified as buffers are queued and dequeued; must not be null
   */
  private WritableBodyPublisher(int bufferSize, int memoryQuota, QueuedMemoryTracker tracker) {
    requireArgument(bufferSize > 0, "Non-positive bufferSize");
    // Name the parameter that is actually being validated, consistently with the check below.
    requireArgument(memoryQuota > 0, "Non-positive memoryQuota");
    requireArgument(memoryQuota >= bufferSize, "memoryQuota < bufferSize");
    this.stream = new Stream(bufferSize, memoryQuota, this::isClosed, requireNonNull(tracker));
  }
1✔
135

136
  /** Returns a {@code WritableByteChannel} for writing this body's content. */
137
  public WritableByteChannel byteChannel() {
138
    var channel = lazyChannel;
1✔
139
    if (channel == null) {
1✔
140
      channel = new SinkChannel();
1✔
141
      lazyChannel = channel;
1✔
142
    }
143
    return channel;
1✔
144
  }
145

146
  /** Returns a {@code OutputStream} for writing this body's content. */
147
  public OutputStream outputStream() {
148
    var outputStream = lazyOutputStream;
1✔
149
    if (outputStream == null) {
1✔
150
      outputStream = new SinkOutputStream(byteChannel());
1✔
151
      lazyOutputStream = outputStream;
1✔
152
    }
153
    return outputStream;
1✔
154
  }
155

156
  /**
157
   * Unless already closed, causes the subscribed (or yet to subscribe) client to fail with the
158
   * given exception.
159
   */
160
  public void closeExceptionally(Throwable exception) {
161
    State prevState;
162
    while (true) {
163
      var currentState = state;
1✔
164
      if (currentState instanceof Closed) {
1!
165
        FlowSupport.onDroppedException(exception);
×
166
        return;
×
167
      }
168

169
      prevState = currentState;
1✔
170

171
      // If subscribed, we can pass the exception to the subscription & skip storing it.
172
      if (currentState instanceof Subscribed
1✔
173
          && STATE.compareAndSet(this, currentState, Closed.normally())) {
1!
174
        break;
1✔
175
      }
176

177
      // If not subscribed, we must store the exception so the future subscriber consumes it.
178
      if (currentState == Initial.INSTANCE
1!
179
          && STATE.compareAndSet(this, currentState, Closed.exceptionally(exception))) {
1!
180
        break;
1✔
181
      }
182
    }
×
183

184
    stream.close(true);
1✔
185
    if (prevState instanceof Subscribed) {
1✔
186
      ((Subscribed) prevState).subscription.fireOrKeepAliveOnError(exception);
1✔
187
    }
188
  }
1✔
189

190
  /**
191
   * Unless already closed, causes the subscribed (or yet to subscribe) client to be completed after
192
   * the content written so far has been consumed.
193
   */
194
  @Override
195
  public void close() {
196
    State prevState;
197
    while (true) {
198
      var currentState = state;
1✔
199
      if (currentState instanceof Closed) {
1✔
200
        return;
1✔
201
      }
202

203
      prevState = currentState;
1✔
204
      if (STATE.compareAndSet(this, currentState, Closed.normally())) {
1!
205
        break;
1✔
206
      }
207
    }
×
208

209
    stream.close(false);
1✔
210
    if (prevState instanceof Subscribed) {
1✔
211
      ((Subscribed) prevState).subscription.fireOrKeepAlive();
1✔
212
    }
213
  }
1✔
214

215
  /**
216
   * Returns {@code true} if this publisher is closed by either {@link #close} or {@link
217
   * #closeExceptionally}.
218
   */
219
  public boolean isClosed() {
220
    return state instanceof Closed;
1✔
221
  }
222

223
  /**
224
   * Makes any buffered content available for consumption by downstream.
225
   *
226
   * @throws IllegalStateException if closed
227
   * @throws UncheckedIOException if an interruption occurs while waiting
228
   */
229
  @Override
230
  public void flush() {
231
    requireState(!isClosed(), "Closed");
1✔
232
    try {
233
      fireOrKeepAliveOnNextIf(stream.flush());
1✔
NEW
234
    } catch (InterruptedIOException e) {
×
NEW
235
      throw new UncheckedIOException(e);
×
236
    }
1✔
237
  }
1✔
238

239
  @Override
240
  public long contentLength() {
241
    return -1;
1✔
242
  }
243

244
  @Override
245
  public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
246
    if (subscribed.compareAndSet(false, true)) {
1✔
247
      var subscription = new SubscriptionImpl(subscriber);
1✔
248
      if (STATE.compareAndSet(this, Initial.INSTANCE, new Subscribed(subscription))) {
1✔
249
        subscription.fireOrKeepAlive();
1✔
250
      } else {
251
        var currentState = state;
1✔
252
        if (currentState instanceof Closed) {
1!
253
          var exception = ((Closed) currentState).exception;
1✔
254
          if (exception != null) {
1✔
255
            subscription.fireOrKeepAliveOnError(exception);
1✔
256
          } else {
257
            subscription.fireOrKeepAlive();
1✔
258
          }
259
        }
260
      }
261
    } else {
1✔
262
      FlowSupport.rejectMulticast(subscriber);
1✔
263
    }
264
  }
1✔
265

266
  @SuppressWarnings("AssignmentExpression")
267
  private void fireOrKeepAliveOnNextIf(boolean condition) {
268
    State currentState;
269
    if (condition && (currentState = state) instanceof Subscribed) {
1✔
270
      ((Subscribed) currentState).subscription.fireOrKeepAliveOnNext();
1✔
271
    }
272
  }
1✔
273

274
  /** Returns a new {@code WritableBodyPublisher}. */
275
  public static WritableBodyPublisher create() {
276
    return create(Utils.BUFFER_SIZE, NoopQueuedMemoryTracker.INSTANCE);
1✔
277
  }
278

279
  /**
280
   * Returns a new {@code WritableBodyPublisher} with the given buffer size (number of bytes to
281
   * buffer before making them available to the HTTP client).
282
   */
283
  public static WritableBodyPublisher create(int bufferSize) {
284
    return create(bufferSize, NoopQueuedMemoryTracker.INSTANCE);
1✔
285
  }
286

287
  // For testing.
288
  interface QueuedMemoryTracker {
289
    void queued(int size);
290

291
    void dequeued(int size);
292
  }
293

294
  private enum NoopQueuedMemoryTracker implements QueuedMemoryTracker {
1✔
295
    INSTANCE;
1✔
296

297
    @Override
298
    public void queued(int size) {}
1✔
299

300
    @Override
301
    public void dequeued(int size) {}
1✔
302
  }
303

304
  static WritableBodyPublisher create(int bufferSize, QueuedMemoryTracker queuedMemoryTracker) {
305
    return new WritableBodyPublisher(
1✔
306
        bufferSize, queuedMemoryQuotaFor(bufferSize), queuedMemoryTracker);
1✔
307
  }
308

309
  static int queuedMemoryQuotaFor(int bufferSize) {
310
    return Math.multiplyExact(FlowSupport.prefetch(), bufferSize);
1✔
311
  }
312

313
  private final class SinkChannel implements WritableByteChannel {
314
    SinkChannel() {}
1✔
315

316
    @Override
317
    public int write(ByteBuffer src) throws IOException {
318
      int written = stream.write(src);
1✔
319
      fireOrKeepAliveOnNextIf((written & SIGNALS_AVAILABLE_BIT) != 0);
1✔
320
      return written & ~SIGNALS_AVAILABLE_BIT;
1✔
321
    }
322

323
    @Override
324
    public boolean isOpen() {
325
      return !isClosed();
1!
326
    }
327

328
    @Override
329
    public void close() {
330
      WritableBodyPublisher.this.close();
1✔
331
    }
1✔
332
  }
333

334
  /**
335
   * OutputStream adapter for a WritableByteChannel. Most work is done by Channels::newOutputStream
336
   * but overridden to forward flush() and close() to this publisher.
337
   */
338
  private final class SinkOutputStream extends OutputStream {
339
    private final OutputStream out;
340

341
    SinkOutputStream(WritableByteChannel channel) {
1✔
342
      this.out = Channels.newOutputStream(channel);
1✔
343
    }
1✔
344

345
    @Override
346
    public void write(int b) throws IOException {
347
      out.write(b);
1✔
348
    }
1✔
349

350
    @Override
351
    public void write(byte[] b, int off, int len) throws IOException {
352
      out.write(b, off, len);
1✔
353
    }
1✔
354

355
    @Override
356
    public void flush() throws IOException {
357
      try {
358
        WritableBodyPublisher.this.flush();
1✔
359
      } catch (IllegalStateException e) {
1✔
360
        // Throw a more appropriate exception for an OutputStream.
361
        throw new IOException("Closed", e);
1✔
362
      }
1✔
363
    }
1✔
364

365
    @Override
366
    public void close() {
367
      WritableBodyPublisher.this.close();
1✔
368
    }
1✔
369
  }
370

371
  private final class SubscriptionImpl extends AbstractPollableSubscription<ByteBuffer> {
372
    SubscriptionImpl(Subscriber<? super ByteBuffer> downstream) {
1✔
373
      super(downstream, FlowSupport.SYNC_EXECUTOR);
1✔
374
    }
1✔
375

376
    @Override
377
    protected @Nullable ByteBuffer poll() {
378
      return stream.poll();
1✔
379
    }
380

381
    @Override
382
    protected boolean isComplete() {
383
      return stream.isComplete();
1✔
384
    }
385

386
    @Override
387
    protected void abort(boolean flowInterrupted) {
388
      // Make sure we reflect unexpected cancellations (e.g. subscriber failures) on the stream.
389
      while (true) {
390
        var currentState = state;
1✔
391
        if (currentState instanceof Closed) {
1✔
392
          return;
1✔
393
        }
394

395
        if (STATE.compareAndSet(WritableBodyPublisher.this, currentState, Closed.normally())) {
1!
396
          stream.close(flowInterrupted);
1✔
397
          return;
1✔
398
        }
NEW
399
      }
×
400
    }
401
  }
402

403
  private static final class Stream {
404
    private final Lock lock = new ReentrantLock();
1✔
405
    private final Condition hasQueuedMemoryQuota = lock.newCondition();
1✔
406

407
    /** Buffers that have been written and are waiting to be polled, in the order they were queued. */
408
    @GuardedBy("lock")
1✔
409
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
410

411
    private final QueuedMemoryTracker queuedMemoryTracker;
412

413
    /** Checks whether the publisher is closed. Used for checking asynchronous closure. */
414
    private final BooleanSupplier isClosed;
415

416
    private final int queuedMemoryQuota;
417
    private final int bufferSize;
418

419
    @GuardedBy("lock")
420
    private int queuedMemory;
421

422
    @GuardedBy("lock")
423
    private @Nullable ByteBuffer sinkBuffer;
424

425
    Stream(
426
        int bufferSize,
427
        int queuedMemoryQuota,
428
        BooleanSupplier isClosed,
429
        QueuedMemoryTracker queuedMemoryTracker) {
1✔
430
      this.bufferSize = bufferSize;
1✔
431
      this.queuedMemoryQuota = queuedMemoryQuota;
1✔
432
      this.isClosed = isClosed;
1✔
433
      this.queuedMemoryTracker = queuedMemoryTracker;
1✔
434
    }
1✔
435

436
    int write(ByteBuffer src) throws IOException {
437
      if (isClosed.getAsBoolean()) {
1✔
438
        throw new ClosedChannelException();
1✔
439
      }
440
      if (!src.hasRemaining()) {
1!
NEW
441
        return 0;
×
442
      }
443

444
      int written = 0;
1✔
445
      boolean signalsAvailable = false;
1✔
446
      lock.lock();
1✔
447
      try {
448
        do {
449
          if (sinkBuffer == null) {
1✔
450
            sinkBuffer = ByteBuffer.allocate(bufferSize);
1✔
451
          }
452

453
          written += Utils.copyRemaining(src, sinkBuffer);
1✔
454
          if (!sinkBuffer.hasRemaining()) {
1✔
455
            var readableBuffer = sinkBuffer.flip().asReadOnlyBuffer();
1✔
456
            try {
457
              // Block for queue space.
458
              while (queuedMemory > queuedMemoryQuota - readableBuffer.remaining()) {
1✔
459
                hasQueuedMemoryQuota.await();
1✔
460
              }
NEW
461
            } catch (InterruptedException e) {
×
NEW
462
              throw Utils.toInterruptedIOException(e);
×
463
            }
1✔
464

465
            // We might've been closed while waiting for space.
466
            if (isClosed.getAsBoolean()) {
1!
NEW
467
              signalsAvailable = false;
×
NEW
468
              break;
×
469
            }
470

471
            sinkBuffer = null;
1✔
472
            queue.add(readableBuffer);
1✔
473
            queuedMemory += readableBuffer.remaining();
1✔
474
            queuedMemoryTracker.queued(readableBuffer.remaining());
1✔
475
            signalsAvailable = true;
1✔
476
          }
477
        } while (src.hasRemaining() && !isClosed.getAsBoolean());
1!
478

479
        if (isClosed.getAsBoolean()) { // Check if asynchronously closed.
1✔
480
          signalsAvailable = false;
1✔
481
          if (written <= 0) { // Only report if no bytes were written.
1!
NEW
482
            throw new AsynchronousCloseException();
×
483
          }
484
        }
485
        return written | (signalsAvailable ? SIGNALS_AVAILABLE_BIT : 0);
1✔
486
      } finally {
487
        lock.unlock();
1✔
488
      }
489
    }
490

491
    boolean flush() throws InterruptedIOException {
492
      lock.lock();
1✔
493
      try {
494
        return flush(false);
1✔
495
      } finally {
496
        lock.unlock();
1✔
497
      }
498
    }
499

500
    @GuardedBy("lock")
501
    private boolean flush(boolean closing) throws InterruptedIOException {
502
      var buffer = sinkBuffer;
1✔
503
      if (buffer == null || buffer.position() == 0) {
1!
504
        return false;
1✔
505
      }
506

507
      // Only wait for space if we're not closing. Otherwise, this will be the last buffer to push
508
      // so there is no need to thwart.
509
      var remainingSlice = buffer.slice();
1✔
510
      var readableBuffer = buffer.flip().asReadOnlyBuffer();
1✔
511
      if (!closing) {
1✔
512
        try {
513
          while (queuedMemory > queuedMemoryQuota - readableBuffer.remaining()) {
1✔
514
            hasQueuedMemoryQuota.await();
1✔
515
          }
NEW
516
        } catch (InterruptedException e) {
×
NEW
517
          throw Utils.toInterruptedIOException(e);
×
518
        }
1✔
519
      }
520

521
      if (closing) {
1✔
522
        sinkBuffer = null;
1✔
523
      } else if (buffer.position() < buffer.limit()) {
1!
524
        sinkBuffer = remainingSlice; // Keep the remaining free space.
1✔
525
      }
526

527
      queue.add(readableBuffer);
1✔
528
      queuedMemory += readableBuffer.remaining();
1✔
529
      queuedMemoryTracker.queued(readableBuffer.remaining());
1✔
530
      return true;
1✔
531
    }
532

533
    void close(boolean flowInterrupted) {
534
      lock.lock();
1✔
535
      try {
536
        if (flowInterrupted) {
1✔
537
          sinkBuffer = null;
1✔
538
          queue.clear();
1✔
539
        } else {
540
          try {
541
            flush(true);
1✔
NEW
542
          } catch (InterruptedIOException e) {
×
NEW
543
            throw new AssertionError(e); // Can't happen since we'll not be waiting.
×
544
          }
1✔
545
        }
546

547
        // Wake up waiters if any.
548
        queuedMemory = 0;
1✔
549
        hasQueuedMemoryQuota.signalAll();
1✔
550
      } finally {
551
        lock.unlock();
1✔
552
      }
553
    }
1✔
554

555
    @Nullable ByteBuffer poll() {
556
      lock.lock();
1✔
557
      try {
558
        var buffer = queue.poll();
1✔
559
        if (buffer != null) {
1✔
560
          queuedMemory -= buffer.remaining();
1✔
561
          queuedMemoryTracker.dequeued(buffer.remaining());
1✔
562
          hasQueuedMemoryQuota.signalAll();
1✔
563
        }
564
        return buffer;
1✔
565
      } finally {
566
        lock.unlock();
1✔
567
      }
568
    }
569

570
    boolean isComplete() {
571
      // Here we utilize the fact that once the stream is closed (through the publisher), no more
572
      // writes are expected. So we're complete if the queue is empty after detecting closure.
573
      if (!isClosed.getAsBoolean()) {
1✔
574
        return false;
1✔
575
      }
576

577
      lock.lock();
1✔
578
      try {
579
        return queue.isEmpty();
1✔
580
      } finally {
581
        lock.unlock();
1✔
582
      }
583
    }
584
  }
585
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc