• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / edge-runtime / 17229631411

26 Aug 2025 06:13AM UTC coverage: 51.84% (-2.1%) from 53.937%
17229631411

push

github

web-flow
fix: remove another bottleneck that causes boot time spike (#596)

* fix: remove another bottleneck that causes boot time spike

* chore: add integration test

28 of 33 new or added lines in 1 file covered. (84.85%)

4922 existing lines in 74 files now uncovered.

18444 of 35579 relevant lines covered (51.84%)

5545.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.89
/ext/node/ops/http.rs
1
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2

3
use std::borrow::Cow;
4
use std::cell::RefCell;
5
use std::pin::Pin;
6
use std::rc::Rc;
7
use std::task::Context;
8
use std::task::Poll;
9

10
use bytes::Bytes;
11
use deno_core::futures::stream::Peekable;
12
use deno_core::futures::Future;
13
use deno_core::futures::FutureExt;
14
use deno_core::futures::Stream;
15
use deno_core::futures::StreamExt;
16
use deno_core::futures::TryFutureExt;
17
use deno_core::op2;
18
use deno_core::serde::Serialize;
19
use deno_core::unsync::spawn;
20
use deno_core::url::Url;
21
use deno_core::AsyncRefCell;
22
use deno_core::AsyncResult;
23
use deno_core::BufView;
24
use deno_core::ByteString;
25
use deno_core::CancelFuture;
26
use deno_core::CancelHandle;
27
use deno_core::CancelTryFuture;
28
use deno_core::OpState;
29
use deno_core::RcRef;
30
use deno_core::Resource;
31
use deno_core::ResourceId;
32
use deno_fetch::get_or_create_client_from_state;
33
use deno_fetch::FetchCancelHandle;
34
use deno_fetch::FetchError;
35
use deno_fetch::FetchRequestResource;
36
use deno_fetch::FetchReturn;
37
use deno_fetch::HttpClientResource;
38
use deno_fetch::ResBody;
39
use http::header::HeaderMap;
40
use http::header::HeaderName;
41
use http::header::HeaderValue;
42
use http::header::AUTHORIZATION;
43
use http::header::CONTENT_LENGTH;
44
use http::Method;
45
use http_body_util::BodyExt;
46
use hyper::body::Frame;
47
use hyper_util::rt::TokioIo;
48
use std::cmp::min;
49
use tokio::io::AsyncReadExt;
50
use tokio::io::AsyncWriteExt;
51

52
#[op2(stack_trace)]
1,158✔
53
#[serde]
54
pub fn op_node_http_request<P>(
×
55
  state: &mut OpState,
×
56
  #[serde] method: ByteString,
×
UNCOV
57
  #[string] url: String,
×
58
  #[serde] headers: Vec<(ByteString, ByteString)>,
×
UNCOV
59
  #[smi] client_rid: Option<u32>,
×
UNCOV
60
  #[smi] body: Option<ResourceId>,
×
61
) -> Result<FetchReturn, FetchError>
×
UNCOV
62
where
×
63
  P: crate::NodePermissions + 'static,
×
64
{
×
65
  let client = if let Some(rid) = client_rid {
×
UNCOV
66
    let r = state
×
UNCOV
67
      .resource_table
×
UNCOV
68
      .get::<HttpClientResource>(rid)
×
UNCOV
69
      .map_err(FetchError::Resource)?;
×
70
    r.client.clone()
×
71
  } else {
72
    get_or_create_client_from_state(state)?
×
73
  };
74

75
  let method = Method::from_bytes(&method)?;
×
76
  let mut url = Url::parse(&url)?;
×
77
  let maybe_authority = deno_fetch::extract_authority(&mut url);
×
78

×
79
  {
×
80
    let permissions = state.borrow_mut::<P>();
×
81
    permissions.check_net_url(&url, "ClientRequest")?;
×
82
  }
83

84
  let mut header_map = HeaderMap::new();
×
UNCOV
85
  for (key, value) in headers {
×
86
    let name = HeaderName::from_bytes(&key)?;
×
87
    let v = HeaderValue::from_bytes(&value)?;
×
88

89
    header_map.append(name, v);
×
90
  }
91

92
  let (body, con_len) = if let Some(body) = body {
×
93
    (
94
      BodyExt::boxed(NodeHttpResourceToBodyAdapter::new(
95
        state
×
96
          .resource_table
×
UNCOV
97
          .take_any(body)
×
UNCOV
98
          .map_err(FetchError::Resource)?,
×
99
      )),
UNCOV
100
      None,
×
101
    )
102
  } else {
103
    // POST and PUT requests should always have a 0 length content-length,
104
    // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
UNCOV
105
    let len = if matches!(method, Method::POST | Method::PUT) {
×
UNCOV
106
      Some(0)
×
107
    } else {
UNCOV
108
      None
×
109
    };
UNCOV
110
    (
×
UNCOV
111
      http_body_util::Empty::new()
×
UNCOV
112
        .map_err(|never| match never {})
×
UNCOV
113
        .boxed(),
×
UNCOV
114
      len,
×
UNCOV
115
    )
×
116
  };
117

UNCOV
118
  let mut request = http::Request::new(body);
×
UNCOV
119
  *request.method_mut() = method.clone();
×
UNCOV
120
  *request.uri_mut() = url
×
UNCOV
121
    .as_str()
×
UNCOV
122
    .parse()
×
UNCOV
123
    .map_err(|_| FetchError::InvalidUrl(url.clone()))?;
×
UNCOV
124
  *request.headers_mut() = header_map;
×
125

UNCOV
126
  if let Some((username, password)) = maybe_authority {
×
UNCOV
127
    request.headers_mut().insert(
×
UNCOV
128
      AUTHORIZATION,
×
UNCOV
129
      deno_fetch::basic_auth(&username, password.as_deref()),
×
UNCOV
130
    );
×
UNCOV
131
  }
×
UNCOV
132
  if let Some(len) = con_len {
×
UNCOV
133
    request.headers_mut().insert(CONTENT_LENGTH, len.into());
×
UNCOV
134
  }
×
135

UNCOV
136
  let cancel_handle = CancelHandle::new_rc();
×
UNCOV
137
  let cancel_handle_ = cancel_handle.clone();
×
UNCOV
138

×
UNCOV
139
  let fut = async move {
×
UNCOV
140
    client
×
UNCOV
141
      .send(request)
×
UNCOV
142
      .map_err(Into::into)
×
UNCOV
143
      .or_cancel(cancel_handle_)
×
UNCOV
144
      .await
×
UNCOV
145
  };
×
146

UNCOV
147
  let request_rid = state.resource_table.add(FetchRequestResource {
×
UNCOV
148
    future: Box::pin(fut),
×
UNCOV
149
    url,
×
UNCOV
150
  });
×
UNCOV
151

×
UNCOV
152
  let cancel_handle_rid =
×
UNCOV
153
    state.resource_table.add(FetchCancelHandle(cancel_handle));
×
UNCOV
154

×
UNCOV
155
  Ok(FetchReturn {
×
UNCOV
156
    request_rid,
×
UNCOV
157
    cancel_handle_rid: Some(cancel_handle_rid),
×
UNCOV
158
  })
×
UNCOV
159
}
×
160

161
#[derive(Default, Serialize)]
162
#[serde(rename_all = "camelCase")]
163
pub struct NodeHttpFetchResponse {
164
  pub status: u16,
165
  pub status_text: String,
166
  pub headers: Vec<(ByteString, ByteString)>,
167
  pub url: String,
168
  pub response_rid: ResourceId,
169
  pub content_length: Option<u64>,
170
  pub remote_addr_ip: Option<String>,
171
  pub remote_addr_port: Option<u16>,
172
  pub error: Option<String>,
173
}
174

175
#[op2(async)]
1,159✔
176
#[serde]
UNCOV
177
pub async fn op_node_http_fetch_send(
×
UNCOV
178
  state: Rc<RefCell<OpState>>,
×
UNCOV
179
  #[smi] rid: ResourceId,
×
UNCOV
180
) -> Result<NodeHttpFetchResponse, FetchError> {
×
UNCOV
181
  let request = state
×
UNCOV
182
    .borrow_mut()
×
UNCOV
183
    .resource_table
×
UNCOV
184
    .take::<FetchRequestResource>(rid)
×
UNCOV
185
    .map_err(FetchError::Resource)?;
×
186

UNCOV
187
  let request = Rc::try_unwrap(request)
×
UNCOV
188
    .ok()
×
UNCOV
189
    .expect("multiple op_node_http_fetch_send ongoing");
×
190

UNCOV
191
  let res = match request.future.await {
×
UNCOV
192
    Ok(Ok(res)) => res,
×
UNCOV
193
    Ok(Err(err)) => {
×
194
      // We're going to try and rescue the error cause from a stream and return it from this fetch.
195
      // If any error in the chain is a hyper body error, return that as a special result we can use to
196
      // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
197
      // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
198

UNCOV
199
      if let FetchError::ClientSend(err_src) = &err {
×
UNCOV
200
        if let Some(client_err) = std::error::Error::source(&err_src.source) {
×
UNCOV
201
          if let Some(err_src) = client_err.downcast_ref::<hyper::Error>() {
×
UNCOV
202
            if let Some(err_src) = std::error::Error::source(err_src) {
×
UNCOV
203
              return Ok(NodeHttpFetchResponse {
×
UNCOV
204
                error: Some(err_src.to_string()),
×
UNCOV
205
                ..Default::default()
×
UNCOV
206
              });
×
UNCOV
207
            }
×
UNCOV
208
          }
×
UNCOV
209
        }
×
UNCOV
210
      }
×
211

UNCOV
212
      return Err(err);
×
213
    }
UNCOV
214
    Err(_) => return Err(FetchError::RequestCanceled),
×
215
  };
216

UNCOV
217
  let status = res.status();
×
UNCOV
218
  let url = request.url.into();
×
UNCOV
219
  let mut res_headers = Vec::new();
×
UNCOV
220
  for (key, val) in res.headers().iter() {
×
UNCOV
221
    res_headers.push((key.as_str().into(), val.as_bytes().into()));
×
UNCOV
222
  }
×
223

UNCOV
224
  let content_length = hyper::body::Body::size_hint(res.body()).exact();
×
UNCOV
225
  let remote_addr = res
×
UNCOV
226
    .extensions()
×
UNCOV
227
    .get::<hyper_util::client::legacy::connect::HttpInfo>()
×
UNCOV
228
    .map(|info| info.remote_addr());
×
UNCOV
229
  let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
×
UNCOV
230
    (Some(addr.ip().to_string()), Some(addr.port()))
×
231
  } else {
UNCOV
232
    (None, None)
×
233
  };
234

UNCOV
235
  let response_rid = state
×
UNCOV
236
    .borrow_mut()
×
UNCOV
237
    .resource_table
×
UNCOV
238
    .add(NodeHttpFetchResponseResource::new(res, content_length));
×
UNCOV
239

×
UNCOV
240
  Ok(NodeHttpFetchResponse {
×
UNCOV
241
    status: status.as_u16(),
×
UNCOV
242
    status_text: status.canonical_reason().unwrap_or("").to_string(),
×
UNCOV
243
    headers: res_headers,
×
UNCOV
244
    url,
×
UNCOV
245
    response_rid,
×
UNCOV
246
    content_length,
×
UNCOV
247
    remote_addr_ip,
×
UNCOV
248
    remote_addr_port,
×
UNCOV
249
    error: None,
×
UNCOV
250
  })
×
UNCOV
251
}
×
252

253
#[op2(async)]
1,159✔
254
#[smi]
UNCOV
255
pub async fn op_node_http_fetch_response_upgrade(
×
UNCOV
256
  state: Rc<RefCell<OpState>>,
×
UNCOV
257
  #[smi] rid: ResourceId,
×
UNCOV
258
) -> Result<ResourceId, FetchError> {
×
UNCOV
259
  let raw_response = state
×
UNCOV
260
    .borrow_mut()
×
UNCOV
261
    .resource_table
×
UNCOV
262
    .take::<NodeHttpFetchResponseResource>(rid)
×
UNCOV
263
    .map_err(FetchError::Resource)?;
×
UNCOV
264
  let raw_response = Rc::try_unwrap(raw_response)
×
UNCOV
265
    .expect("Someone is holding onto NodeHttpFetchResponseResource");
×
UNCOV
266

×
UNCOV
267
  let (read, write) = tokio::io::duplex(1024);
×
UNCOV
268
  let (read_rx, write_tx) = tokio::io::split(read);
×
UNCOV
269
  let (mut write_rx, mut read_tx) = tokio::io::split(write);
×
UNCOV
270
  let upgraded = raw_response.upgrade().await?;
×
UNCOV
271
  {
×
UNCOV
272
    // Stage 3: Pump the data
×
UNCOV
273
    let (mut upgraded_rx, mut upgraded_tx) =
×
UNCOV
274
      tokio::io::split(TokioIo::new(upgraded));
×
UNCOV
275

×
UNCOV
276
    spawn(async move {
×
UNCOV
277
      let mut buf = [0; 1024];
×
278
      loop {
UNCOV
279
        let read = upgraded_rx.read(&mut buf).await?;
×
UNCOV
280
        if read == 0 {
×
UNCOV
281
          read_tx.shutdown().await?;
×
UNCOV
282
          break;
×
UNCOV
283
        }
×
UNCOV
284
        read_tx.write_all(&buf[..read]).await?;
×
285
      }
UNCOV
286
      Ok::<_, FetchError>(())
×
UNCOV
287
    });
×
UNCOV
288
    spawn(async move {
×
UNCOV
289
      let mut buf = [0; 1024];
×
290
      loop {
UNCOV
291
        let read = write_rx.read(&mut buf).await?;
×
UNCOV
292
        if read == 0 {
×
UNCOV
293
          break;
×
UNCOV
294
        }
×
UNCOV
295
        upgraded_tx.write_all(&buf[..read]).await?;
×
296
      }
UNCOV
297
      Ok::<_, FetchError>(())
×
UNCOV
298
    });
×
UNCOV
299
  }
×
UNCOV
300

×
UNCOV
301
  Ok(
×
UNCOV
302
    state
×
UNCOV
303
      .borrow_mut()
×
UNCOV
304
      .resource_table
×
UNCOV
305
      .add(UpgradeStream::new(read_rx, write_tx)),
×
UNCOV
306
  )
×
UNCOV
307
}
×
308

309
struct UpgradeStream {
310
  read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
311
  write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
312
  cancel_handle: CancelHandle,
313
}
314

315
impl UpgradeStream {
UNCOV
316
  pub fn new(
×
UNCOV
317
    read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
×
UNCOV
318
    write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
×
UNCOV
319
  ) -> Self {
×
UNCOV
320
    Self {
×
UNCOV
321
      read: AsyncRefCell::new(read),
×
UNCOV
322
      write: AsyncRefCell::new(write),
×
UNCOV
323
      cancel_handle: CancelHandle::new(),
×
UNCOV
324
    }
×
UNCOV
325
  }
×
326

UNCOV
327
  async fn read(
×
UNCOV
328
    self: Rc<Self>,
×
UNCOV
329
    buf: &mut [u8],
×
UNCOV
330
  ) -> Result<usize, std::io::Error> {
×
UNCOV
331
    let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
×
UNCOV
332
    async {
×
UNCOV
333
      let read = RcRef::map(self, |this| &this.read);
×
UNCOV
334
      let mut read = read.borrow_mut().await;
×
UNCOV
335
      Pin::new(&mut *read).read(buf).await
×
UNCOV
336
    }
×
UNCOV
337
    .try_or_cancel(cancel_handle)
×
UNCOV
338
    .await
×
UNCOV
339
  }
×
340

UNCOV
341
  async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
×
UNCOV
342
    let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
×
UNCOV
343
    async {
×
UNCOV
344
      let write = RcRef::map(self, |this| &this.write);
×
UNCOV
345
      let mut write = write.borrow_mut().await;
×
UNCOV
346
      Pin::new(&mut *write).write(buf).await
×
UNCOV
347
    }
×
UNCOV
348
    .try_or_cancel(cancel_handle)
×
UNCOV
349
    .await
×
UNCOV
350
  }
×
351
}
352

353
impl Resource for UpgradeStream {
UNCOV
354
  fn name(&self) -> Cow<str> {
×
UNCOV
355
    "fetchUpgradedStream".into()
×
UNCOV
356
  }
×
357

358
  deno_core::impl_readable_byob!();
359
  deno_core::impl_writable!();
360

UNCOV
361
  fn close(self: Rc<Self>) {
×
UNCOV
362
    self.cancel_handle.cancel();
×
UNCOV
363
  }
×
364
}
365

366
type BytesStream =
367
  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
368

369
pub enum NodeHttpFetchResponseReader {
370
  Start(http::Response<ResBody>),
371
  BodyReader(Peekable<BytesStream>),
372
}
373

374
impl Default for NodeHttpFetchResponseReader {
UNCOV
375
  fn default() -> Self {
×
UNCOV
376
    let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
×
UNCOV
377
    Self::BodyReader(stream.peekable())
×
UNCOV
378
  }
×
379
}
380

381
#[derive(Debug)]
382
pub struct NodeHttpFetchResponseResource {
383
  pub response_reader: AsyncRefCell<NodeHttpFetchResponseReader>,
384
  pub cancel: CancelHandle,
385
  pub size: Option<u64>,
386
}
387

388
impl NodeHttpFetchResponseResource {
UNCOV
389
  pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
×
UNCOV
390
    Self {
×
UNCOV
391
      response_reader: AsyncRefCell::new(NodeHttpFetchResponseReader::Start(
×
UNCOV
392
        response,
×
UNCOV
393
      )),
×
UNCOV
394
      cancel: CancelHandle::default(),
×
UNCOV
395
      size,
×
UNCOV
396
    }
×
UNCOV
397
  }
×
398

UNCOV
399
  pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
×
UNCOV
400
    let reader = self.response_reader.into_inner();
×
UNCOV
401
    match reader {
×
UNCOV
402
      NodeHttpFetchResponseReader::Start(resp) => {
×
UNCOV
403
        Ok(hyper::upgrade::on(resp).await?)
×
404
      }
UNCOV
405
      _ => unreachable!(),
×
406
    }
UNCOV
407
  }
×
408
}
409

410
impl Resource for NodeHttpFetchResponseResource {
UNCOV
411
  fn name(&self) -> Cow<str> {
×
UNCOV
412
    "fetchResponse".into()
×
UNCOV
413
  }
×
414

UNCOV
415
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
×
UNCOV
416
    Box::pin(async move {
×
UNCOV
417
      let mut reader =
×
UNCOV
418
        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
×
419

UNCOV
420
      let body = loop {
×
UNCOV
421
        match &mut *reader {
×
UNCOV
422
          NodeHttpFetchResponseReader::BodyReader(reader) => break reader,
×
UNCOV
423
          NodeHttpFetchResponseReader::Start(_) => {}
×
UNCOV
424
        }
×
UNCOV
425

×
UNCOV
426
        match std::mem::take(&mut *reader) {
×
UNCOV
427
          NodeHttpFetchResponseReader::Start(resp) => {
×
UNCOV
428
            let stream: BytesStream =
×
UNCOV
429
              Box::pin(resp.into_body().into_data_stream().map(|r| {
×
UNCOV
430
                r.map_err(|err| {
×
UNCOV
431
                  std::io::Error::new(std::io::ErrorKind::Other, err)
×
UNCOV
432
                })
×
UNCOV
433
              }));
×
UNCOV
434
            *reader =
×
UNCOV
435
              NodeHttpFetchResponseReader::BodyReader(stream.peekable());
×
UNCOV
436
          }
×
UNCOV
437
          NodeHttpFetchResponseReader::BodyReader(_) => unreachable!(),
×
438
        }
439
      };
UNCOV
440
      let fut = async move {
×
UNCOV
441
        let mut reader = Pin::new(body);
×
442
        loop {
UNCOV
443
          match reader.as_mut().peek_mut().await {
×
UNCOV
444
            Some(Ok(chunk)) if !chunk.is_empty() => {
×
UNCOV
445
              let len = min(limit, chunk.len());
×
UNCOV
446
              let chunk = chunk.split_to(len);
×
UNCOV
447
              break Ok(chunk.into());
×
448
            }
449
            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
450
            // currently has a peeked value that can be synchronously returned
451
            // from `next()`.
452
            //
453
            // The future returned from `next()` is always ready, so we can
454
            // safely call `await` on it without creating a race condition.
UNCOV
455
            Some(_) => match reader.as_mut().next().await.unwrap() {
×
UNCOV
456
              Ok(chunk) => assert!(chunk.is_empty()),
×
UNCOV
457
              Err(err) => {
×
UNCOV
458
                break Err(deno_core::error::type_error(err.to_string()))
×
459
              }
460
            },
UNCOV
461
            None => break Ok(BufView::empty()),
×
462
          }
463
        }
UNCOV
464
      };
×
465

UNCOV
466
      let cancel_handle = RcRef::map(self, |r| &r.cancel);
×
UNCOV
467
      fut.try_or_cancel(cancel_handle).await.map_err(Into::into)
×
UNCOV
468
    })
×
UNCOV
469
  }
×
470

UNCOV
471
  fn size_hint(&self) -> (u64, Option<u64>) {
×
UNCOV
472
    (self.size.unwrap_or(0), self.size)
×
UNCOV
473
  }
×
474

UNCOV
475
  fn close(self: Rc<Self>) {
×
UNCOV
476
    self.cancel.cancel()
×
UNCOV
477
  }
×
478
}
479

480
#[allow(clippy::type_complexity)]
481
pub struct NodeHttpResourceToBodyAdapter(
482
  Rc<dyn Resource>,
483
  Option<
484
    Pin<Box<dyn Future<Output = Result<BufView, deno_core::anyhow::Error>>>>,
485
  >,
486
);
487

488
impl NodeHttpResourceToBodyAdapter {
UNCOV
489
  pub fn new(resource: Rc<dyn Resource>) -> Self {
×
UNCOV
490
    let future = resource.clone().read(64 * 1024);
×
UNCOV
491
    Self(resource, Some(future))
×
UNCOV
492
  }
×
493
}
494

495
// SAFETY: we only use this on a single-threaded executor
496
unsafe impl Send for NodeHttpResourceToBodyAdapter {}
497
// SAFETY: we only use this on a single-threaded executor
498
unsafe impl Sync for NodeHttpResourceToBodyAdapter {}
499

500
impl Stream for NodeHttpResourceToBodyAdapter {
501
  type Item = Result<Bytes, deno_core::anyhow::Error>;
502

UNCOV
503
  fn poll_next(
×
UNCOV
504
    self: Pin<&mut Self>,
×
UNCOV
505
    cx: &mut Context<'_>,
×
UNCOV
506
  ) -> Poll<Option<Self::Item>> {
×
UNCOV
507
    let this = self.get_mut();
×
UNCOV
508
    if let Some(mut fut) = this.1.take() {
×
UNCOV
509
      match fut.poll_unpin(cx) {
×
510
        Poll::Pending => {
UNCOV
511
          this.1 = Some(fut);
×
UNCOV
512
          Poll::Pending
×
513
        }
UNCOV
514
        Poll::Ready(res) => match res {
×
UNCOV
515
          Ok(buf) if buf.is_empty() => Poll::Ready(None),
×
UNCOV
516
          Ok(buf) => {
×
UNCOV
517
            this.1 = Some(this.0.clone().read(64 * 1024));
×
UNCOV
518
            Poll::Ready(Some(Ok(buf.to_vec().into())))
×
519
          }
UNCOV
520
          Err(err) => Poll::Ready(Some(Err(err))),
×
521
        },
522
      }
523
    } else {
UNCOV
524
      Poll::Ready(None)
×
525
    }
UNCOV
526
  }
×
527
}
528

529
impl hyper::body::Body for NodeHttpResourceToBodyAdapter {
530
  type Data = Bytes;
531
  type Error = deno_core::anyhow::Error;
532

UNCOV
533
  fn poll_frame(
×
UNCOV
534
    self: Pin<&mut Self>,
×
UNCOV
535
    cx: &mut Context<'_>,
×
UNCOV
536
  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
×
UNCOV
537
    match self.poll_next(cx) {
×
UNCOV
538
      Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
×
UNCOV
539
      Poll::Ready(None) => Poll::Ready(None),
×
UNCOV
540
      Poll::Pending => Poll::Pending,
×
541
    }
UNCOV
542
  }
×
543
}
544

545
impl Drop for NodeHttpResourceToBodyAdapter {
UNCOV
546
  fn drop(&mut self) {
×
UNCOV
547
    self.0.clone().close()
×
UNCOV
548
  }
×
549
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc