• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 11579379955

29 Oct 2024 05:34PM UTC coverage: 84.703% (-0.5%) from 85.23%
11579379955

push

github

web-flow
refactor: [#4684] Replace browserify with tsup (#4774)

* Replace browserify with tsup in adaptive-expressions

* Remove remaining browserify packages

* Fix streaming tests

* Fix yarn.lock

* fix depcheck

8186 of 10820 branches covered (75.66%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 1 file covered. (100.0%)

23 existing lines in 13 files now uncovered.

20514 of 23063 relevant lines covered (88.95%)

7296.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.87
/libraries/botframework-streaming/src/payloadTransport/payloadReceiver.ts
1
/**
8✔
2
 * @module botframework-streaming
3
 */
4
/**
5
 * Copyright (c) Microsoft Corporation. All rights reserved.
6
 * Licensed under the MIT License.
7
 */
8
import { TransportDisconnectedEventHandler } from '.';
9
import { PayloadTypes } from '../payloads/payloadTypes';
2✔
10
import { HeaderSerializer } from '../payloads/headerSerializer';
2✔
11
import { SubscribableStream } from '../subscribableStream';
12
import { PayloadConstants } from '../payloads/payloadConstants';
2✔
13
import { TransportDisconnectedEvent } from './transportDisconnectedEvent';
2✔
14
import { IHeader, INodeBuffer, ITransportReceiver } from '../interfaces';
15

16
/**
 * Payload receiver for streaming.
 *
 * Repeatedly pulls a header of `PayloadConstants.MaxHeaderLength` bytes, followed by the payload
 * the header announces, from an [ITransportReceiver](xref:botframework-streaming.ITransportReceiver)
 * and forwards the data to the callbacks registered through `subscribe`.
 */
export class PayloadReceiver {
    /**
     * Optional handler invoked by `disconnect` with this receiver and the disconnection event.
     */
    disconnected?: TransportDisconnectedEventHandler;

    private _receiver: ITransportReceiver;
    private _receiveHeaderBuffer: INodeBuffer;
    private _receivePayloadBuffer: INodeBuffer;

    private _getStream: (header: IHeader) => SubscribableStream;
    private _receiveAction: (header: IHeader, stream: SubscribableStream, length: number) => void;

    /**
     * Get current connected state
     *
     * @returns true if connected to a transport sender.
     */
    get isConnected(): boolean {
        // `!= null` deliberately matches both `null` (set by `disconnect`) and `undefined`
        // (never connected).
        return this._receiver != null;
    }

    /**
     * Connects to a transport receiver
     *
     * @param receiver The [ITransportReceiver](xref:botframework-streaming.ITransportReceiver) object to pull incoming data from.
     * @returns a promise that resolves when the receiver is complete
     */
    connect(receiver: ITransportReceiver): Promise<void> {
        this._receiver = receiver;
        return this.receivePackets();
    }

    /**
     * Allows subscribing to this receiver in order to be notified when new data comes in.
     *
     * @param getStream Callback when a new stream has been received.
     * @param receiveAction Callback when a new message has been received.
     */
    subscribe(
        getStream: (header: IHeader) => SubscribableStream,
        receiveAction: (header: IHeader, stream: SubscribableStream, count: number) => void
    ): void {
        this._getStream = getStream;
        this._receiveAction = receiveAction;
    }

    /**
     * Force this receiver to disconnect.
     *
     * Does nothing when the receiver is not connected. Otherwise the transport is closed, the
     * `disconnected` handler (if any) is notified, and the transport reference is cleared even if
     * closing or notifying throws.
     *
     * @param event Event arguments to include when broadcasting disconnection event.
     */
    disconnect(event = TransportDisconnectedEvent.Empty): void {
        if (!this.isConnected) {
            return;
        }

        try {
            this._receiver.close();
            this.disconnected?.(this, event);
        } catch (err) {
            // Anything can be thrown (including `null` or a plain string), so never read
            // `err.message` directly: that would throw again from inside this handler.
            this.disconnected?.(this, new TransportDisconnectedEvent(PayloadReceiver.describeError(err)));
        } finally {
            this._receiver = null;
        }
    }

    /**
     * Builds a human readable message from an arbitrary thrown value.
     *
     * @param err The value caught by a `catch` clause.
     * @returns `err.message` when it is a string, otherwise the string form of `err`.
     */
    private static describeError(err: unknown): string {
        const message = (err as { message?: unknown } | null | undefined)?.message;
        return typeof message === 'string' ? message : String(err);
    }

    /**
     * Receive loop: reads one header and its payload per iteration for as long as the receiver
     * stays connected. Any error raised while reading or dispatching disconnects the receiver,
     * which in turn ends the loop.
     *
     * @returns a promise that resolves once the receiver is no longer connected.
     */
    private async receivePackets(): Promise<void> {
        while (this.isConnected) {
            try {
                let readSoFar = 0;
                while (readSoFar < PayloadConstants.MaxHeaderLength) {
                    // NOTE(review): only the most recent chunk is kept, so this presumably relies on
                    // `receive` delivering the whole header in one call - confirm against the
                    // transport implementations.
                    this._receiveHeaderBuffer = await this._receiver.receive(
                        PayloadConstants.MaxHeaderLength - readSoFar
                    );

                    // A falsy result is skipped and the read is simply retried.
                    if (this._receiveHeaderBuffer) {
                        readSoFar += this._receiveHeaderBuffer.length;
                    }
                }

                const header = HeaderSerializer.deserialize(this._receiveHeaderBuffer);
                const isStream = header.payloadType === PayloadTypes.stream;

                if (header.payloadLength > 0) {
                    let bytesActuallyRead = 0;

                    const contentStream = this._getStream(header);

                    while (
                        bytesActuallyRead < header.payloadLength &&
                        bytesActuallyRead < PayloadConstants.MaxPayloadLength
                    ) {
                        const count = Math.min(
                            header.payloadLength - bytesActuallyRead,
                            PayloadConstants.MaxPayloadLength
                        );
                        // NOTE(review): a zero-length chunk would make no progress here - verify that
                        // `receive` never resolves with an empty buffer.
                        this._receivePayloadBuffer = await this._receiver.receive(count);
                        bytesActuallyRead += this._receivePayloadBuffer.byteLength;
                        contentStream.write(this._receivePayloadBuffer);

                        // If this is a stream we want to keep handing it up as it comes in
                        if (isStream) {
                            this._receiveAction(header, contentStream, bytesActuallyRead);
                        }
                    }

                    // Non-stream payloads are handed up once, after the whole payload has been read.
                    if (!isStream) {
                        this._receiveAction(header, contentStream, bytesActuallyRead);
                    }
                }
            } catch (err) {
                // See `describeError`: the thrown value is not guaranteed to be an `Error`.
                this.disconnect(new TransportDisconnectedEvent(PayloadReceiver.describeError(err)));
            }
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc