• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #577

19 Aug 2025 04:34PM UTC coverage: 87.897%. First build
#577

Pull #132

github

mizosoft
Setup covered & test-contributing projects properly
Pull Request #132: Properly setup coveralls

2325 of 2818 branches covered (82.51%)

Branch coverage included in aggregate %.

18 of 135 new or added lines in 3 files covered. (13.33%)

7639 of 8518 relevant lines covered (89.68%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.04
/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol;
24

25
import static com.github.mizosoft.methanol.internal.Validate.requireArgument;
26
import static com.github.mizosoft.methanol.internal.Validate.requireState;
27
import static java.util.Objects.requireNonNull;
28

29
import com.github.mizosoft.methanol.internal.Utils;
30
import com.github.mizosoft.methanol.internal.flow.AbstractQueueSubscription;
31
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import java.io.Flushable;
34
import java.io.IOException;
35
import java.io.InterruptedIOException;
36
import java.io.OutputStream;
37
import java.lang.invoke.MethodHandles;
38
import java.lang.invoke.VarHandle;
39
import java.net.http.HttpRequest.BodyPublisher;
40
import java.net.http.HttpRequest.BodyPublishers;
41
import java.nio.ByteBuffer;
42
import java.nio.channels.AsynchronousCloseException;
43
import java.nio.channels.Channels;
44
import java.nio.channels.ClosedChannelException;
45
import java.nio.channels.WritableByteChannel;
46
import java.util.concurrent.ConcurrentLinkedQueue;
47
import java.util.concurrent.Flow.Publisher;
48
import java.util.concurrent.Flow.Subscriber;
49
import java.util.concurrent.Semaphore;
50
import java.util.concurrent.atomic.AtomicBoolean;
51
import java.util.concurrent.locks.Lock;
52
import java.util.concurrent.locks.ReentrantLock;
53
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
54
import org.checkerframework.checker.nullness.qual.Nullable;
55

56
/**
57
 * A {@code BodyPublisher} that allows streaming the body's content through an {@code OutputStream}
58
 * or a {@code WritableByteChannel}. It is recommended to use {@link
59
 * MoreBodyPublishers#ofOutputStream} or {@link MoreBodyPublishers#ofWritableByteChannel} instead of
60
 * directly using this class.
61
 *
62
 * <p>After writing is finished, the publisher must be closed to complete the request (either by
63
 * calling {@link #close()} or closing the {@code OutputStream} or the {@code WritableByteChannel},
64
 * or using a try-with-resources construct). Additionally, {@link #closeExceptionally(Throwable)}
65
 * can be used to fail the request in case an error is encountered while writing.
66
 *
67
 * <p>Note that ({@link #contentLength()} always returns {@code -1}). If the content length is known
68
 * prior to writing, {@link BodyPublishers#fromPublisher(Publisher, long)} can be used to attach the
69
 * known length to this publisher.
70
 */
71
public final class WritableBodyPublisher implements BodyPublisher, Flushable, AutoCloseable {
72
  private static final VarHandle STATE;
73

74
  static {
75
    try {
76
      STATE =
77
          MethodHandles.lookup().findVarHandle(WritableBodyPublisher.class, "state", State.class);
1✔
78
    } catch (NoSuchFieldException | IllegalAccessException e) {
×
79
      throw new ExceptionInInitializerError(e);
×
80
    }
1✔
81
  }
82

83
  private static final ByteBuffer CLOSED_SENTINEL = ByteBuffer.allocate(0);
1✔
84

85
  /** A lock that serializes writing to {@code pipe}. */
86
  private final Lock writeLock = new ReentrantLock();
1✔
87

88
  private final ConcurrentLinkedQueue<ByteBuffer> pipe = new ConcurrentLinkedQueue<>();
1✔
89
  private final AtomicBoolean subscribed = new AtomicBoolean();
1✔
90
  private final int bufferSize;
91
  private final Semaphore queuedBytes;
92

93
  @SuppressWarnings("FieldMayBeFinal") // VarHandle indirection.
1✔
94
  private State state = Initial.INSTANCE;
95

96
  private interface State {}
97

98
  private enum Initial implements State {
1✔
99
    INSTANCE
1✔
100
  }
101

102
  private static final class Subscribed implements State {
103
    final SubscriptionImpl subscription;
104

105
    Subscribed(SubscriptionImpl subscription) {
1✔
106
      this.subscription = requireNonNull(subscription);
1✔
107
    }
1✔
108
  }
109

110
  private static final class Closed implements State {
111
    static final Closed NORMALLY = new Closed(null);
1✔
112

113
    final @Nullable Throwable exception;
114

115
    Closed(@Nullable Throwable exception) {
1✔
116
      this.exception = exception;
1✔
117
    }
1✔
118

119
    static Closed normally() {
120
      return NORMALLY;
1✔
121
    }
122

123
    static Closed exceptionally(Throwable exception) {
124
      return new Closed(requireNonNull(exception));
1✔
125
    }
126
  }
127

128
  @GuardedBy("writeLock")
129
  private @Nullable ByteBuffer sinkBuffer;
130

131
  private @MonotonicNonNull WritableByteChannel lazyChannel;
132
  private @MonotonicNonNull OutputStream lazyOutputStream;
133

134
  private volatile boolean submittedSentinel;
135

136
  private WritableBodyPublisher(int bufferSize, int maxBufferedBytes) {
1✔
137
    requireArgument(bufferSize > 0, "non-positive bufferSize");
1!
138
    requireArgument(maxBufferedBytes > 0, "non-positive maxBufferedBytes");
1!
139
    requireArgument(maxBufferedBytes >= bufferSize, "maxBufferedBytes < bufferSize");
1!
140
    this.bufferSize = bufferSize;
1✔
141
    this.queuedBytes = new Semaphore(maxBufferedBytes);
1✔
142
  }
1✔
143

144
  /** Returns a {@code WritableByteChannel} for writing this body's content. */
145
  public WritableByteChannel byteChannel() {
146
    var channel = lazyChannel;
1✔
147
    if (channel == null) {
1✔
148
      channel = new SinkChannel();
1✔
149
      lazyChannel = channel;
1✔
150
    }
151
    return channel;
1✔
152
  }
153

154
  /** Returns a {@code OutputStream} for writing this body's content. */
155
  public OutputStream outputStream() {
156
    var outputStream = lazyOutputStream;
1✔
157
    if (outputStream == null) {
1✔
158
      outputStream = new SinkOutputStream(byteChannel());
1✔
159
      lazyOutputStream = outputStream;
1✔
160
    }
161
    return outputStream;
1✔
162
  }
163

164
  /**
165
   * Unless already closed, causes the subscribed (or yet to subscribe) client to fail with the
166
   * given exception.
167
   */
168
  public void closeExceptionally(Throwable exception) {
169
    State prevState;
170
    while (true) {
171
      var currentState = state;
1✔
172
      if (currentState instanceof Closed) {
1!
173
        FlowSupport.onDroppedException(exception);
×
174
        return;
×
175
      }
176

177
      prevState = currentState;
1✔
178

179
      // If subscribed, we can pass the exception to the subscription & skip storing it.
180
      if (currentState instanceof Subscribed
1✔
181
          && STATE.compareAndSet(this, currentState, Closed.normally())) {
1!
182
        break;
1✔
183
      }
184

185
      // If not subscribed, we must store the exception so the future subscriber consumes it.
186
      if (currentState == Initial.INSTANCE
1!
187
          && STATE.compareAndSet(this, currentState, Closed.exceptionally(exception))) {
1!
188
        break;
1✔
189
      }
190
    }
×
191

192
    if (prevState instanceof Subscribed) {
1✔
193
      ((Subscribed) prevState).subscription.fireOrKeepAliveOnError(exception);
1✔
194
    }
195
  }
1✔
196

197
  /**
198
   * Unless already closed, causes the subscribed (or yet to subscribe) client to be completed after
199
   * the content written so far has been consumed.
200
   */
201
  @Override
202
  public void close() {
203
    State prevState;
204
    while (true) {
205
      var currentState = state;
1✔
206
      if (currentState instanceof Closed) {
1✔
207
        return;
1✔
208
      }
209

210
      prevState = currentState;
1✔
211
      if (STATE.compareAndSet(this, currentState, Closed.normally())) {
1!
212
        break;
1✔
213
      }
214
    }
×
215

216
    submitSentinel();
1✔
217
    if (prevState instanceof Subscribed) {
1✔
218
      ((Subscribed) prevState).subscription.fireOrKeepAlive();
1✔
219
    }
220
  }
1✔
221

222
  /**
223
   * Returns {@code true} if this publisher is closed by either {@link #close} or {@link
224
   * #closeExceptionally}.
225
   */
226
  public boolean isClosed() {
227
    return state instanceof Closed;
1✔
228
  }
229

230
  /**
231
   * Makes any buffered content available for consumption by the downstream.
232
   *
233
   * @throws IllegalStateException if closed
234
   */
235
  @Override
236
  public void flush() {
237
    requireState(!isClosed(), "closed");
1✔
238
    fireOrKeepAliveOnNextIf(flushBuffer());
1✔
239
  }
1✔
240

241
  @Override
242
  public long contentLength() {
243
    return -1;
1✔
244
  }
245

246
  @Override
247
  public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
248
    if (subscribed.compareAndSet(false, true)) {
1✔
249
      var subscription = new SubscriptionImpl(subscriber);
1✔
250
      if (STATE.compareAndSet(this, Initial.INSTANCE, new Subscribed(subscription))) {
1✔
251
        subscription.fireOrKeepAlive();
1✔
252
      } else {
253
        var currentState = state;
1✔
254
        if (currentState instanceof Closed) {
1!
255
          var exception = ((Closed) currentState).exception;
1✔
256
          if (exception != null) {
1✔
257
            subscription.fireOrKeepAliveOnError(exception);
1✔
258
          } else {
259
            // As close() submits the sentinel value AFTER setting the state, we must double-check
260
            // the sentinel has been submitted before starting the subscription. Otherwise, the
261
            // subscription might miss the sentinel submitted by close().
262
            submitSentinel();
1✔
263
            subscription.fireOrKeepAlive();
1✔
264
          }
265
        }
266
      }
267
    } else {
1✔
268
      FlowSupport.rejectMulticast(subscriber);
1✔
269
    }
270
  }
1✔
271

272
  private void fireOrKeepAliveOnNextIf(boolean condition) {
273
    State currentState;
274
    if (condition && (currentState = state) instanceof Subscribed) {
1✔
275
      ((Subscribed) currentState).subscription.fireOrKeepAliveOnNext();
1✔
276
    }
277
  }
1✔
278

279
  private boolean flushBuffer() {
280
    writeLock.lock();
1✔
281
    try {
282
      return unguardedFlushBuffer();
1✔
283
    } finally {
284
      writeLock.unlock();
1✔
285
    }
286
  }
287

288
  @GuardedBy("writeLock")
289
  private boolean unguardedFlushBuffer() {
290
    var buffer = sinkBuffer;
1✔
291
    if (buffer != null && buffer.position() > 0) {
1!
292
      sinkBuffer = null;
1✔
293
      pipe.add(buffer.flip().asReadOnlyBuffer());
1✔
294
      return true;
1✔
295
    }
296
    return false;
1✔
297
  }
298

299
  private void submitSentinel() {
300
    if (!submittedSentinel) {
1✔
301
      writeLock.lock();
1✔
302
      try {
303
        if (!submittedSentinel) {
1!
304
          submittedSentinel = true;
1✔
305
          flushBuffer();
1✔
306
          pipe.add(CLOSED_SENTINEL);
1✔
307
        }
308
      } finally {
309
        writeLock.unlock();
1✔
310
      }
311
    }
312
  }
1✔
313

314
  /** Returns a new {@code WritableBodyPublisher}. */
315
  public static WritableBodyPublisher create() {
316
    return new WritableBodyPublisher(
1✔
317
        Utils.BUFFER_SIZE, Math.multiplyExact(FlowSupport.prefetch(), Utils.BUFFER_SIZE));
1✔
318
  }
319

320
  /**
321
   * Returns a new {@code WritableBodyPublisher} with the given buffer size (number of bytes to
322
   * buffer before making them available to the HTTP client).
323
   */
324
  public static WritableBodyPublisher create(int bufferSize) {
325
    return new WritableBodyPublisher(
1✔
326
        bufferSize, Math.multiplyExact(FlowSupport.prefetch(), Utils.BUFFER_SIZE));
1✔
327
  }
328

329
  /**
330
   * Returns a new {@code WritableBodyPublisher} with the given buffer size (number of bytes to
331
   * buffer before making them available to the HTTP client) and max number of bytes that can be
332
   * queued before the writer is blocked. It is recommended that {@code maxQueuedBytes} is a
333
   * multiple of {@code bufferSize}.
334
   */
335
  public static WritableBodyPublisher create(int bufferSize, int maxQueuedBytes) {
NEW
336
    return new WritableBodyPublisher(bufferSize, maxQueuedBytes);
×
337
  }
338

339
  private final class SinkChannel implements WritableByteChannel {
340
    SinkChannel() {}
1✔
341

342
    @Override
343
    public int write(ByteBuffer src) throws InterruptedIOException, ClosedChannelException {
344
      requireNonNull(src);
1✔
345
      if (isClosed()) {
1✔
346
        throw new ClosedChannelException();
1✔
347
      }
348
      if (!src.hasRemaining()) {
1!
349
        return 0;
×
350
      }
351

352
      int written = 0;
1✔
353
      boolean signalsAvailable = false;
1✔
354
      writeLock.lock();
1✔
355
      try {
356
        var buffer = sinkBuffer;
1✔
357
        do {
358
          if (buffer == null) {
1✔
359
            buffer = ByteBuffer.allocate(bufferSize);
1✔
360
          }
361
          written += Utils.copyRemaining(src, buffer);
1✔
362
          if (!buffer.hasRemaining()) {
1✔
363
            var readableBuffer = buffer.flip().asReadOnlyBuffer();
1✔
364
            try {
365
              queuedBytes.acquire(readableBuffer.remaining());
1✔
NEW
366
            } catch (InterruptedException e) {
×
NEW
367
              throw Utils.toInterruptedIOException(e);
×
368
            }
1✔
369
            pipe.add(readableBuffer);
1✔
370
            signalsAvailable = true;
1✔
371
            buffer = null;
1✔
372
          }
373
        } while (src.hasRemaining() && isOpen());
1!
374

375
        if (isClosed()) { // Check if asynchronously closed.
1!
376
          sinkBuffer = null;
×
377
          if (written <= 0) { // Only report if no bytes were written.
×
378
            throw new AsynchronousCloseException();
×
379
          }
380
          return written;
×
381
        } else {
382
          sinkBuffer = buffer; // Save the last allocated buffer till it becomes full.
1✔
383
        }
384
      } finally {
385
        writeLock.unlock();
1✔
386
      }
387
      fireOrKeepAliveOnNextIf(signalsAvailable);
1✔
388
      return written;
1✔
389
    }
390

391
    @Override
392
    public boolean isOpen() {
393
      return !isClosed();
1✔
394
    }
395

396
    @Override
397
    public void close() {
398
      WritableBodyPublisher.this.close();
1✔
399
    }
1✔
400
  }
401

402
  /**
403
   * WritableByteChannel adapter for an OutputStream. Most work is done by Channels::newOutputStream
404
   * but overridden to forward flush() and close() to this publisher.
405
   */
406
  private final class SinkOutputStream extends OutputStream {
407
    private final OutputStream out;
408

409
    SinkOutputStream(WritableByteChannel channel) {
1✔
410
      this.out = Channels.newOutputStream(channel);
1✔
411
    }
1✔
412

413
    @Override
414
    public void write(int b) throws IOException {
415
      out.write(b);
1✔
416
    }
1✔
417

418
    @Override
419
    public void write(byte[] b, int off, int len) throws IOException {
420
      out.write(b, off, len);
1✔
421
    }
1✔
422

423
    @Override
424
    public void flush() throws IOException {
425
      try {
426
        WritableBodyPublisher.this.flush();
1✔
427
      } catch (IllegalStateException e) {
1✔
428
        // Throw a more appropriate exception for an OutputStream.
429
        throw new IOException("closed", e);
1✔
430
      }
1✔
431
    }
1✔
432

433
    @Override
434
    public void close() throws IOException {
435
      out.close();
1✔
436
    }
1✔
437
  }
438

439
  private final class SubscriptionImpl extends AbstractQueueSubscription<ByteBuffer> {
440
    SubscriptionImpl(Subscriber<? super ByteBuffer> downstream) {
1✔
441
      super(downstream, FlowSupport.SYNC_EXECUTOR, pipe, CLOSED_SENTINEL);
1✔
442
    }
1✔
443

444
    @Override
445
    public ByteBuffer poll() {
446
      var buffer = super.poll();
1✔
447
      if (buffer != null) {
1✔
448
        queuedBytes.release(buffer.remaining());
1✔
449
      }
450
      return buffer;
1✔
451
    }
452

453
    @Override
454
    protected void abort(boolean flowInterrupted) {
455
      pipe.clear();
1✔
456

457
      // Make sure state also reflects unexpected cancellations. This also allows a possibly active
458
      // writer to detect asynchronous closure.
459
      if (flowInterrupted) {
1✔
460
        while (true) {
461
          var currentState = state;
1✔
462
          if (!(currentState instanceof Subscribed)
1✔
463
              || STATE.compareAndSet(WritableBodyPublisher.this, currentState, Closed.normally())) {
1!
464
            return;
1✔
465
          }
466
        }
×
467
      }
468
    }
1✔
469
  }
470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc