• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #590

09 Sep 2025 03:38PM UTC coverage: 88.988% (-0.07%) from 89.053%
#590

Pull #133

github

mizosoft
Try increasing timeout
Pull Request #133: Bound WritableBodyPublisher memory usage

2345 of 2822 branches covered (83.1%)

Branch coverage included in aggregate %.

144 of 160 new or added lines in 12 files covered. (90.0%)

7 existing lines in 3 files now uncovered.

7700 of 8466 relevant lines covered (90.95%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.96
/methanol/src/main/java/com/github/mizosoft/methanol/internal/flow/FlowSupport.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.flow;
24

25
import static java.util.Objects.requireNonNull;
26

27
import java.lang.System.Logger;
28
import java.lang.System.Logger.Level;
29
import java.lang.invoke.VarHandle;
30
import java.util.concurrent.Executor;
31
import java.util.concurrent.Flow;
32
import java.util.concurrent.Flow.Publisher;
33
import java.util.concurrent.Flow.Subscriber;
34
import java.util.stream.Collectors;
35

36
/**
 * Helpers for implementing reactive streams subscriptions and the like.
 *
 * <p>This is a static utility holder: it exposes shared no-op/empty flow primitives, demand
 * arithmetic over {@link VarHandle}-accessed {@code long} fields, the prefetch configuration read
 * from system properties, and helpers for rejecting subscribers and reporting dropped exceptions.
 */
public class FlowSupport {
  private static final Logger logger = System.getLogger(FlowSupport.class.getName());

  /** System property holding the prefetch count; see {@link #loadPrefetch()}. */
  static final String PREFETCH_PROP = "com.github.mizosoft.methanol.flow.prefetch";

  /** System property holding the prefetch factor percentage; see {@link #loadPrefetchFactor()}. */
  static final String PREFETCH_FACTOR_PROP = "com.github.mizosoft.methanol.flow.prefetchFactor";

  /** Have a humble number as we're dealing with 16 KB buffers. */
  static final int DEFAULT_PREFETCH = 8;

  static final int DEFAULT_PREFETCH_FACTOR = 50; // Request more when half are consumed.

  private static final int PREFETCH = loadPrefetch();
  private static final int PREFETCH_THRESHOLD = (int) (PREFETCH * (loadPrefetchFactor() / 100f));

  /** Maximum number of drop-site stack frames included in a dropped-exception log message. */
  private static final int DROPPED_EXCEPTION_STACK_TRACE_LIMIT = 10;

  /** A subscription that does nothing. */
  public static final Flow.Subscription NOOP_SUBSCRIPTION =
      new Flow.Subscription() {
        @Override
        public void request(long n) {}

        @Override
        public void cancel() {}
      };

  /**
   * Shared publisher that hands the subscriber {@link #NOOP_SUBSCRIPTION} and then completes it
   * immediately. If {@code onSubscribe} throws, the failure is forwarded to {@code onError} and
   * {@code onComplete} is not called.
   */
  private static final Publisher<?> EMPTY_PUBLISHER =
      (Publisher<Object>)
          subscriber -> {
            requireNonNull(subscriber);
            try {
              subscriber.onSubscribe(NOOP_SUBSCRIPTION);
            } catch (Throwable t) {
              subscriber.onError(t);
              return;
            }
            subscriber.onComplete();
          };

  /** An executor that executes the runnable in the calling thread. */
  public static final Executor SYNC_EXECUTOR = SyncExecutor.INSTANCE;

  private enum SyncExecutor implements Executor {
    INSTANCE;

    @Override
    public void execute(Runnable command) {
      command.run();
    }

    @Override
    public String toString() {
      return SyncExecutor.class.getSimpleName();
    }
  }

  private FlowSupport() {} // non-instantiable

  /**
   * Reads the prefetch count from the {@value #PREFETCH_PROP} system property, falling back to
   * {@value #DEFAULT_PREFETCH} if the property is absent or not positive.
   */
  static int loadPrefetch() {
    int prefetch = Integer.getInteger(PREFETCH_PROP, DEFAULT_PREFETCH);
    return prefetch > 0 ? prefetch : DEFAULT_PREFETCH;
  }

  /**
   * Reads the prefetch factor (a percentage) from the {@value #PREFETCH_FACTOR_PROP} system
   * property, falling back to {@value #DEFAULT_PREFETCH_FACTOR} if the property is absent or
   * outside {@code [0, 100]}.
   */
  static int loadPrefetchFactor() {
    int prefetchFactor = Integer.getInteger(PREFETCH_FACTOR_PROP, DEFAULT_PREFETCH_FACTOR);
    return (prefetchFactor >= 0 && prefetchFactor <= 100)
        ? prefetchFactor
        : DEFAULT_PREFETCH_FACTOR;
  }

  /**
   * Returns an {@code IllegalArgumentException} to signal if the subscriber requests a non-positive
   * number of items.
   */
  public static IllegalArgumentException illegalRequest() {
    return new IllegalArgumentException("non-positive subscription request");
  }

  /** Returns the prefetch property or a default of {@value #DEFAULT_PREFETCH}. */
  public static int prefetch() {
    return PREFETCH;
  }

  /**
   * Returns the prefetch threshold, which is {@link #prefetch()} scaled by the prefetch factor
   * percentage and truncated to an {@code int}. With the default prefetch and factor, this is
   * {@value #DEFAULT_PREFETCH} {@code / 2}.
   */
  public static int prefetchThreshold() {
    return PREFETCH_THRESHOLD;
  }

  /**
   * Adds the given count to demand, making sure it doesn't exceed {@code Long.MAX_VALUE}.
   *
   * @param owner the object holding the demand field
   * @param demand a handle to a {@code long} field of {@code owner}
   * @param n the count to add; overflow is detected through a negative sum, so this is expected to
   *     be positive
   * @return the demand value observed right before the successful update
   */
  public static long getAndAddDemand(Object owner, VarHandle demand, long n) {
    while (true) {
      long currentDemand = (long) demand.getVolatile(owner);
      long updatedDemand = currentDemand + n;
      if (updatedDemand < 0) { // overflow
        updatedDemand = Long.MAX_VALUE;
      }
      // Retry from a fresh read if another thread changed the demand in the meantime.
      if (demand.compareAndSet(owner, currentDemand, updatedDemand)) {
        return currentDemand;
      }
    }
  }

  /**
   * Subtracts given count from demand. Caller must ensure the result won't be negative.
   *
   * @param owner the object holding the demand field
   * @param demand a handle to a {@code long} field of {@code owner}
   * @param n the count to subtract
   * @return the demand value after subtraction
   */
  public static long subtractAndGetDemand(Object owner, VarHandle demand, long n) {
    return (long) demand.getAndAdd(owner, -n) - n;
  }

  /**
   * Returns a shared publisher that passes {@link #NOOP_SUBSCRIPTION} to its subscriber and then
   * completes it immediately without emitting any items.
   */
  @SuppressWarnings("unchecked") // The publisher never emits items, so any T is safe.
  public static <T> Publisher<T> emptyPublisher() {
    return (Publisher<T>) EMPTY_PUBLISHER;
  }

  /**
   * Rejects the given subscriber with an {@code IllegalStateException} stating that multiple
   * subscribers aren't supported.
   */
  public static void rejectMulticast(Subscriber<?> subscriber) {
    reject(subscriber, new IllegalStateException("Multiple subscribers not supported"));
  }

  /**
   * Rejects the given subscriber by passing it {@link #NOOP_SUBSCRIPTION} and then signalling
   * {@code onError} with the given cause. If {@code onSubscribe} itself throws, the thrown
   * exception is recorded as suppressed by {@code cause} and {@code onError} is still invoked.
   *
   * @param subscriber the subscriber to reject
   * @param cause the error to deliver
   */
  public static void reject(Subscriber<?> subscriber, Throwable cause) {
    try {
      subscriber.onSubscribe(FlowSupport.NOOP_SUBSCRIPTION);
    } catch (Throwable t) {
      // A throwable can't suppress itself; addSuppressed would throw IllegalArgumentException and
      // mask the rejection if the subscriber happened to rethrow the very same instance.
      if (t != cause) {
        cause.addSuppressed(t);
      }
    } finally {
      subscriber.onError(cause);
    }
  }

  // TODO allow the user to install a hook instead of always logging

  /**
   * Logs the given exception at {@code WARNING} level, along with up to {@value
   * #DROPPED_EXCEPTION_STACK_TRACE_LIMIT} stack frames of the code that dropped it. Nothing is
   * computed if {@code WARNING} isn't loggable.
   *
   * @param exception the exception that couldn't be delivered
   */
  public static void onDroppedException(Throwable exception) {
    if (!logger.isLoggable(Level.WARNING)) {
      return;
    }

    // Walk the stack here rather than inside a lazily evaluated message supplier: a supplier is
    // invoked from within the logger, so the frame window would start with the supplier and
    // logging frames instead of the drop site.
    String frameSeparator = System.lineSeparator() + "\tat ";
    String dropSite =
        StackWalker.getInstance()
            .walk(
                frames ->
                    frames
                        .skip(1) // This method's own frame says nothing about the drop site.
                        .limit(DROPPED_EXCEPTION_STACK_TRACE_LIMIT)
                        .map(StackWalker.StackFrame::toString)
                        .collect(Collectors.joining(frameSeparator)));
    logger.log(Level.WARNING, "Dropped exception: " + frameSeparator + dropSite, exception);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc