• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 11579379955

29 Oct 2024 05:34PM UTC coverage: 84.703% (-0.5%) from 85.23%
11579379955

push

github

web-flow
refactor: [#4684] Replace browserify with tsup (#4774)

* Replace browserify with tsup in adaptive-expressions

* Remove remaining browserify packages

* Fix streaming tests

* Fix yarn.lock

* fix depcheck

8186 of 10820 branches covered (75.66%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 1 file covered. (100.0%)

23 existing lines in 13 files now uncovered.

20514 of 23063 relevant lines covered (88.95%)

7296.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.87
/libraries/botframework-streaming/src/assemblers/payloadAssembler.ts
1
/**
6!
2
 * @module botframework-streaming
3
 */
4
/**
5
 * Copyright (c) Microsoft Corporation. All rights reserved.
6
 * Licensed under the MIT License.
7
 */
8

9
import { SubscribableStream } from '../subscribableStream';
2✔
10
import { StreamManager, PayloadTypes } from '../payloads';
2✔
11
import { ContentStream } from '../contentStream';
2✔
12
import {
13
    IAssemblerParams,
14
    IHeader,
15
    IResponsePayload,
16
    IReceiveResponse,
17
    IReceiveRequest,
18
    IRequestPayload,
19
} from '../interfaces';
20

21
/**
22
 * Assembles payloads for the streaming library.
23
 *
24
 * @internal
25
 */
26
export class PayloadAssembler {
    /** Identifier of the payload/stream this assembler is responsible for. */
    id: string;

    /** Mirrors the `end` flag of the most recently seen header. */
    end: boolean;

    /** Payload length taken from the header (or assigned later by a parent assembler). */
    contentLength: number;

    /** Payload type taken from the header, or the content type assigned by a parent assembler. */
    payloadType: string | PayloadTypes;

    /** Lazily created payload stream; see `getPayloadStream`. */
    private stream: SubscribableStream;

    /** Callback invoked once a request/response payload has been parsed. */
    private readonly _onCompleted: (id: string, receiveResponse: IReceiveResponse | IReceiveRequest) => Promise<void>;

    /** Code unit of the Unicode byte order mark that may prefix a decoded payload. */
    private readonly _byteOrderMark = 0xfeff;

    /** Encoding used to decode the received bytes into a string. */
    private readonly _utf: BufferEncoding = 'utf8';

    /**
     * Initializes a new instance of the [PayloadAssembler](xref:botframework-streaming.PayloadAssembler) class.
     *
     * @param streamManager The [StreamManager](xref:botframework-streaming.StreamManager) managing the stream being assembled.
     * @param params Parameters for a streaming assembler.
     * @throws {Error} When neither `params.header.id` nor `params.id` yields a truthy ID.
     */
    constructor(private readonly streamManager: StreamManager, params: IAssemblerParams) {
        if (params.header) {
            // A header carries everything needed to describe the payload.
            this.id = params.header.id;
            this.payloadType = params.header.payloadType;
            this.contentLength = params.header.payloadLength;
            this.end = params.header.end;
        } else {
            this.id = params.id;
        }

        if (!this.id) {
            throw new Error('An ID must be supplied when creating an assembler.');
        }

        this._onCompleted = params.onCompleted;
    }

    /**
     * Retrieves the assembler's payload as a stream.
     * The stream is created on first access and the same instance is returned afterwards.
     *
     * @returns A [SubscribableStream](xref:botframework-streaming.SubscribableStream) of the assembler's payload.
     */
    getPayloadStream(): SubscribableStream {
        if (!this.stream) {
            this.stream = this.createPayloadStream();
        }

        return this.stream;
    }

    /**
     * The action the assembler executes when new bytes are received on the incoming stream.
     *
     * @param header The stream's Header.
     * @param stream The incoming stream being assembled.
     * @param _contentLength The length of the stream, if finite.
     */
    onReceive(header: IHeader, stream: SubscribableStream, _contentLength: number): void {
        this.end = header.end;

        if (header.payloadType === PayloadTypes.response || header.payloadType === PayloadTypes.request) {
            // Processing is fire-and-forget because this method is synchronous. A bare `.catch()`
            // registers no handler, so a failure (e.g. malformed JSON) would surface as an unhandled
            // promise rejection. Attach a real handler instead.
            void this.process(stream).catch(() => {
                // Intentionally ignored: there is no caller to report the failure to.
            });
        } else if (header.end) {
            stream.end();
        }
    }

    /**
     * Closes the assembler.
     */
    close(): void {
        this.streamManager.closeStream(this.id);
    }

    /**
     * Creates a new [SubscribableStream](xref:botframework-streaming.SubscribableStream) instance.
     *
     * @returns The new stream ready for consumption.
     */
    private createPayloadStream(): SubscribableStream {
        return new SubscribableStream();
    }

    /**
     * Parses a JSON payload, ignoring a leading byte order mark if present.
     *
     * @param json The serialized payload.
     * @returns The parsed payload, asserted (not validated) to be of type `T`.
     */
    private payloadFromJson<T>(json: string): T {
        return JSON.parse(this.stripBOM(json)) as T;
    }

    /**
     * Removes a single leading byte order mark from the input, if there is one.
     *
     * @param input The string to clean.
     * @returns The input without its leading byte order mark.
     */
    private stripBOM(input: string): string {
        return input.charCodeAt(0) === this._byteOrderMark ? input.slice(1) : input;
    }

    /**
     * Reads the bytes available on the incoming stream and dispatches them according to
     * this assembler's payload type. Does nothing when the read yields no data.
     *
     * @param stream The incoming stream to read from.
     */
    private async process(stream: SubscribableStream): Promise<void> {
        const streamData: Buffer = stream.read(stream.length) as Buffer;
        if (!streamData) {
            return;
        }

        const streamDataAsString = streamData.toString(this._utf);

        if (this.payloadType === PayloadTypes.request) {
            await this.processRequest(streamDataAsString);
        } else if (this.payloadType === PayloadTypes.response) {
            await this.processResponse(streamDataAsString);
        }
    }

    /**
     * Parses a response payload and hands it, with its content streams, to the completion callback.
     *
     * @param streamDataAsString The serialized response payload.
     */
    private async processResponse(streamDataAsString: string): Promise<void> {
        const responsePayload = this.payloadFromJson<IResponsePayload>(this.stripBOM(streamDataAsString));
        const receiveResponse: IReceiveResponse = { streams: [], statusCode: responsePayload.statusCode };

        await this.processStreams(responsePayload, receiveResponse);
    }

    /**
     * Parses a request payload and hands it, with its content streams, to the completion callback.
     *
     * @param streamDataAsString The serialized request payload.
     */
    private async processRequest(streamDataAsString: string): Promise<void> {
        const requestPayload = this.payloadFromJson<IRequestPayload>(streamDataAsString);
        const receiveRequest: IReceiveRequest = { streams: [], path: requestPayload.path, verb: requestPayload.verb };

        await this.processStreams(requestPayload, receiveRequest);
    }

    /**
     * Attaches a content stream for every stream description in the payload, then invokes
     * the completion callback with this assembler's ID and the populated result.
     *
     * @param responsePayload The parsed request or response payload.
     * @param receiveResponse The result object whose `streams` array is filled in.
     */
    private async processStreams(
        responsePayload: IResponsePayload | IRequestPayload,
        receiveResponse: IReceiveResponse | IReceiveRequest
    ): Promise<void> {
        responsePayload.streams?.forEach((responseStream) => {
            // There was a bug in how streams were handled. In .NET, the StreamDescription definition mapped the
            // "type" JSON property to `contentType`, but in Typescript there was no mapping. This is a workaround
            // to resolve this inconsistency.
            //
            // .NET code:
            // https://github.com/microsoft/botbuilder-dotnet/blob/a79036ddf6625ec3fd68a6f7295886eb7831bc1c/libraries/Microsoft.Bot.Streaming/Payloads/Models/StreamDescription.cs#L28-L29
            const contentType =
                ((responseStream as unknown) as Record<string, string>).type ?? responseStream.contentType;

            const contentAssembler = this.streamManager.getPayloadAssembler(responseStream.id);

            contentAssembler.payloadType = contentType;
            contentAssembler.contentLength = responseStream.length;

            receiveResponse.streams.push(new ContentStream(responseStream.id, contentAssembler));
        });

        await this._onCompleted(this.id, receiveResponse);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc