• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / e79cdc3c-762e-40cd-acb3-21e17deb1cd9

18 Aug 2024 03:11PM UTC coverage: 92.124% (-0.09%) from 92.213%
e79cdc3c-762e-40cd-acb3-21e17deb1cd9

Pull #13485

circleci

DylanVeldra
fix(core): merge req context with tenant payload in the request instance
Pull Request #13485: fix(core): merge request context with tenant context payload in the request singleton

2559 of 3078 branches covered (83.14%)

1 of 2 new or added lines in 2 files covered. (50.0%)

74 existing lines in 13 files now uncovered.

6737 of 7313 relevant lines covered (92.12%)

17.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/packages/core/router/sse-stream.ts
1
import { MessageEvent } from '@nestjs/common/interfaces';
2
import { isObject } from '@nestjs/common/utils/shared.utils';
1✔
3
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
4
import { Transform } from 'stream';
1✔
5

6
function toDataString(data: string | object): string {
7
  if (isObject(data)) {
13✔
8
    return toDataString(JSON.stringify(data));
1✔
9
  }
10

11
  return data
12✔
12
    .split(/\r\n|\r|\n/)
13
    .map(line => `data: ${line}\n`)
14✔
14
    .join('');
15
}
16

17
export type AdditionalHeaders = Record<
18
  string,
19
  string[] | string | number | undefined
20
>;
21

22
interface ReadHeaders {
23
  getHeaders?(): AdditionalHeaders;
24
}
25

26
interface WriteHeaders {
27
  writableEnded?: boolean;
28
  writeHead?(
29
    statusCode: number,
30
    reasonPhrase?: string,
31
    headers?: OutgoingHttpHeaders,
32
  ): void;
33
  writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
34
  flushHeaders?(): void;
35
}
36

37
export type WritableHeaderStream = NodeJS.WritableStream & WriteHeaders;
38
export type HeaderStream = WritableHeaderStream & ReadHeaders;
39

40
/**
41
 * Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
42
 * Transforms "messages" to W3C event stream content.
43
 * See https://html.spec.whatwg.org/multipage/server-sent-events.html
44
 * A message is an object with one or more of the following properties:
45
 * - data (String or object, which gets turned into JSON)
46
 * - type
47
 * - id
48
 * - retry
49
 *
50
 * If constructed with a HTTP Request, it will optimise the socket for streaming.
51
 * If this stream is piped to an HTTP Response, it will set appropriate headers.
52
 */
53
export class SseStream extends Transform {
1✔
54
  private lastEventId: number = null;
13✔
55

56
  constructor(req?: IncomingMessage) {
57
    super({ objectMode: true });
13✔
58
    if (req && req.socket) {
13✔
59
      req.socket.setKeepAlive(true);
1✔
60
      req.socket.setNoDelay(true);
1✔
61
      req.socket.setTimeout(0);
1✔
62
    }
63
  }
64

65
  pipe<T extends WritableHeaderStream>(
66
    destination: T,
67
    options?: {
68
      additionalHeaders?: AdditionalHeaders;
69
      end?: boolean;
70
    },
71
  ): T {
72
    if (destination.writeHead) {
13✔
73
      destination.writeHead(200, {
4✔
74
        ...options?.additionalHeaders,
75
        // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
76
        'Content-Type': 'text/event-stream',
77
        Connection: 'keep-alive',
78
        // Disable cache, even for old browsers and proxies
79
        'Cache-Control':
80
          'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
81
        Pragma: 'no-cache',
82
        Expire: '0',
83
        // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
84
        'X-Accel-Buffering': 'no',
85
      });
86
      destination.flushHeaders();
4✔
87
    }
88

89
    destination.write('\n');
11✔
90
    return super.pipe(destination, options);
11✔
91
  }
92

93
  _transform(
94
    message: MessageEvent,
95
    encoding: string,
96
    callback: (error?: Error | null, data?: any) => void,
97
  ) {
98
    let data = message.type ? `event: ${message.type}\n` : '';
12✔
99
    data += message.id ? `id: ${message.id}\n` : '';
12!
100
    data += message.retry ? `retry: ${message.retry}\n` : '';
12✔
101
    data += message.data ? toDataString(message.data) : '';
12!
102
    data += '\n';
12✔
103
    this.push(data);
12✔
104
    callback();
12✔
105
  }
106

107
  /**
108
   * Calls `.write` but handles the drain if needed
109
   */
110
  writeMessage(
111
    message: MessageEvent,
112
    cb: (error: Error | null | undefined) => void,
113
  ) {
114
    if (!message.id) {
12✔
115
      this.lastEventId++;
11✔
116
      message.id = this.lastEventId.toString();
11✔
117
    }
118

119
    if (!this.write(message, 'utf-8', cb)) {
12!
UNCOV
120
      this.once('drain', cb);
×
121
    } else {
122
      process.nextTick(cb);
12✔
123
    }
124
  }
125
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc