• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mizosoft / methanol / #605

09 Oct 2025 12:32PM UTC coverage: 88.705% (+0.1%) from 88.592%
#605

push

github

mizosoft
Use a specific interceptor implementation instead of a new interface

2374 of 2876 branches covered (82.55%)

Branch coverage included in aggregate %.

144 of 182 new or added lines in 3 files covered. (79.12%)

52 existing lines in 4 files now uncovered.

7843 of 8642 relevant lines covered (90.75%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.54
/methanol/src/main/java/com/github/mizosoft/methanol/internal/extensions/HttpResponsePublisher.java
1
/*
2
 * Copyright (c) 2024 Moataz Hussein
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to deal
6
 * in the Software without restriction, including without limitation the rights
7
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
 * copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in all
12
 * copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20
 * SOFTWARE.
21
 */
22

23
package com.github.mizosoft.methanol.internal.extensions;
24

25
import static java.util.Objects.requireNonNull;
26

27
import com.github.mizosoft.methanol.internal.flow.AbstractQueueSubscription;
28
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
29
import com.google.errorprone.annotations.concurrent.GuardedBy;
30
import java.net.http.HttpClient;
31
import java.net.http.HttpRequest;
32
import java.net.http.HttpResponse;
33
import java.net.http.HttpResponse.BodyHandler;
34
import java.net.http.HttpResponse.BodySubscriber;
35
import java.net.http.HttpResponse.PushPromiseHandler;
36
import java.net.http.HttpResponse.ResponseInfo;
37
import java.util.concurrent.CompletableFuture;
38
import java.util.concurrent.Executor;
39
import java.util.concurrent.Flow.Publisher;
40
import java.util.concurrent.Flow.Subscriber;
41
import java.util.concurrent.atomic.AtomicBoolean;
42
import java.util.concurrent.locks.Lock;
43
import java.util.concurrent.locks.ReentrantLock;
44
import java.util.function.Function;
45
import org.checkerframework.checker.nullness.qual.Nullable;
46

47
/**
48
 * A publisher of {@link HttpResponse<T> responses} resulting from sending a given request and
49
 * optionally accepting incoming push promises, if any.
50
 */
51
public final class HttpResponsePublisher<T> implements Publisher<HttpResponse<T>> {
52
  private final AtomicBoolean subscribed = new AtomicBoolean();
1✔
53

54
  private final HttpClient client;
55
  private final HttpRequest request;
56
  private final BodyHandler<T> bodyHandler;
57
  private final @Nullable Function<HttpRequest, @Nullable BodyHandler<T>> pushPromiseHandler;
58
  private final Executor executor;
59

60
  /**
61
   * Creates a new {@code HttpResponsePublisher}. If {@code pushPromiseMapper} is {@code null}, all
62
   * push promises are rejected.
63
   */
64
  public HttpResponsePublisher(
65
      HttpClient client,
66
      HttpRequest request,
67
      BodyHandler<T> bodyHandler,
68
      @Nullable Function<HttpRequest, @Nullable BodyHandler<T>> pushPromiseMapper,
69
      Executor executor) {
1✔
70
    this.client = requireNonNull(client);
1✔
71
    this.request = requireNonNull(request);
1✔
72
    this.bodyHandler = requireNonNull(bodyHandler);
1✔
73
    this.pushPromiseHandler = pushPromiseMapper;
1✔
74
    this.executor = requireNonNull(executor);
1✔
75
  }
1✔
76

77
  @Override
78
  public void subscribe(Subscriber<? super HttpResponse<T>> subscriber) {
79
    if (subscribed.compareAndSet(false, true)) {
1✔
80
      new SubscriptionImpl<>(subscriber, this).fireOrKeepAlive();
1✔
81
    } else {
82
      FlowSupport.rejectMulticast(subscriber);
1✔
83
    }
84
  }
1✔
85

86
  private static final class SubscriptionImpl<V>
87
      extends AbstractQueueSubscription<HttpResponse<V>> {
88
    private final Lock lock = new ReentrantLock();
1✔
89

90
    private final HttpClient client;
91
    private final HttpRequest initialRequest;
92
    private final BodyHandler<V> handler;
93
    private final @Nullable PushPromiseHandler<V> pushPromiseHandler;
94

95
    /** The number of currently ongoing requests (original request plus push promises, if any). */
96
    @GuardedBy("lock")
97
    private int ongoing;
98

99
    /**
100
     * Whether the main response body has been received. After which we won't be expecting any push
101
     * promises.
102
     */
103
    @GuardedBy("lock")
104
    private boolean isInitialResponseBodyComplete;
105

106
    /**
107
     * Whether we've sent the initial request, which is delayed till we receive positive demand for
108
     * the first time. This doesn't need any synchronization as there's no contention.
109
     */
110
    private boolean isInitialRequestSent;
111

112
    SubscriptionImpl(
113
        Subscriber<? super HttpResponse<V>> downstream, HttpResponsePublisher<V> publisher) {
114
      super(downstream, publisher.executor);
1✔
115
      this.client = publisher.client;
1✔
116
      this.initialRequest = publisher.request;
1✔
117
      this.handler = publisher.bodyHandler;
1✔
118
      this.pushPromiseHandler =
1✔
119
          publisher.pushPromiseHandler != null
1✔
120
              ? new SubscriptionPushPromiseHandler(publisher.pushPromiseHandler)
1✔
121
              : null;
1✔
122
    }
1✔
123

124
    @Override
125
    @SuppressWarnings("FutureReturnValueIgnored")
126
    protected long emit(Subscriber<? super HttpResponse<V>> downstream, long emit) {
127
      if (emit > 0 && !isInitialRequestSent) {
1✔
128
        isInitialRequestSent = true;
1✔
129
        try {
130
          incrementOngoing();
1✔
131
          client
1✔
132
              .sendAsync(initialRequest, this::notifyOnBodyCompletion, pushPromiseHandler)
1✔
133
              .whenComplete(this::onResponse);
1✔
134
        } catch (Throwable t) {
1✔
135
          cancelOnError(downstream, t, true);
1✔
136
          return 0;
1✔
137
        }
1✔
138
      }
139
      return super.emit(downstream, emit);
1✔
140
    }
141

142
    private void onResponse(HttpResponse<V> response, Throwable exception) {
143
      if (exception != null) {
1✔
144
        fireOrKeepAliveOnError(exception);
1✔
145
      } else {
146
        boolean isComplete;
147
        lock.lock();
1✔
148
        try {
149
          isComplete = ongoing == 1 && isInitialResponseBodyComplete;
1✔
150
        } finally {
151
          lock.unlock();
1✔
152
        }
153

154
        if (isComplete) {
1✔
155
          submitAndComplete(response);
1✔
156
        } else {
157
          submit(response);
1✔
158

159
          boolean isCompleteAfterSubmit;
160
          lock.lock();
1✔
161
          try {
162
            int ongoingAfterDecrement = --ongoing;
1✔
163
            isCompleteAfterSubmit = ongoingAfterDecrement == 0 && isInitialResponseBodyComplete;
1!
164
          } finally {
165
            lock.unlock();
1✔
166
          }
167

168
          if (isCompleteAfterSubmit) {
1!
UNCOV
169
            complete();
×
170
          }
171
        }
172
      }
173
    }
1✔
174

175
    private void onInitialResponseBodyCompletion() {
176
      boolean isComplete;
177
      lock.lock();
1✔
178
      try {
179
        isInitialResponseBodyComplete = true;
1✔
180
        isComplete = ongoing == 0;
1✔
181
      } finally {
182
        lock.unlock();
1✔
183
      }
184

185
      if (isComplete) {
1✔
186
        complete();
1✔
187
      } else {
188
        fireOrKeepAlive();
1✔
189
      }
190
    }
1✔
191

192
    private BodySubscriber<V> notifyOnBodyCompletion(ResponseInfo info) {
193
      return new NotifyingBodySubscriber<>(
1✔
194
          handler.apply(info), this::onInitialResponseBodyCompletion);
1✔
195
    }
196

197
    private void incrementOngoing() {
198
      lock.lock();
1✔
199
      try {
200
        ongoing++;
1✔
201
      } finally {
202
        lock.unlock();
1✔
203
      }
204
    }
1✔
205

206
    private class SubscriptionPushPromiseHandler implements PushPromiseHandler<V> {
207
      private final Function<HttpRequest, @Nullable BodyHandler<V>> acceptor;
208

209
      SubscriptionPushPromiseHandler(Function<HttpRequest, @Nullable BodyHandler<V>> acceptor) {
1✔
210
        this.acceptor = acceptor;
1✔
211
      }
1✔
212

213
      @Override
214
      public void applyPushPromise(
215
          HttpRequest initiatingRequest,
216
          HttpRequest pushPromiseRequest,
217
          Function<BodyHandler<V>, CompletableFuture<HttpResponse<V>>> acceptor) {
218
        boolean localIsInitialResponseBodyComplete;
219
        lock.lock();
1✔
220
        try {
221
          localIsInitialResponseBodyComplete = isInitialResponseBodyComplete;
1✔
222
        } finally {
223
          lock.unlock();
1✔
224
        }
225

226
        if (localIsInitialResponseBodyComplete) {
1✔
227
          fireOrKeepAliveOnError(
1✔
228
              new IllegalStateException(
229
                  "Receiving push promise after initial response body has been received: "
230
                      + pushPromiseRequest));
231
        } else if (!(isCancelled() || hasPendingErrors())) {
1✔
232
          applyPushPromise(pushPromiseRequest, acceptor);
1✔
233
        }
234
      }
1✔
235

236
      @SuppressWarnings("FutureReturnValueIgnored")
237
      private void applyPushPromise(
238
          HttpRequest pushPromiseRequest,
239
          Function<BodyHandler<V>, CompletableFuture<HttpResponse<V>>> acceptor) {
240
        BodyHandler<V> pushedResponseBodyHandler;
241
        try {
242
          pushedResponseBodyHandler = this.acceptor.apply(pushPromiseRequest);
1✔
243
        } catch (Throwable t) {
1✔
244
          fireOrKeepAliveOnError(t);
1✔
245
          return;
1✔
246
        }
1✔
247

248
        if (pushedResponseBodyHandler != null) {
1✔
249
          incrementOngoing();
1✔
250
          acceptor.apply(pushedResponseBodyHandler).whenComplete(SubscriptionImpl.this::onResponse);
1✔
251
        }
252
      }
1✔
253
    }
254
  }
255

256
  /** A {@link BodySubscriber} that invokes a callback after the body has been fully received. */
257
  private static final class NotifyingBodySubscriber<R> extends ForwardingBodySubscriber<R> {
258
    private final Runnable callback;
259

260
    NotifyingBodySubscriber(BodySubscriber<R> downstream, Runnable onComplete) {
261
      super(downstream);
1✔
262
      this.callback = onComplete;
1✔
263
    }
1✔
264

265
    @Override
266
    public void onComplete() {
267
      super.onComplete();
1✔
268
      callback.run();
1✔
269
    }
1✔
270
  }
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc