• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 5334935081

pending completion
5334935081

Pull #4494

github

web-flow
Merge 1e624bfad into 5ee34c0ce
Pull Request #4494: fix: [botframework-streaming] Some named pipe tests are not passing after fixing their inconclusiveness

9775 of 12807 branches covered (76.33%)

Branch coverage included in aggregate %.

5 of 5 new or added lines in 1 file covered. (100.0%)

20037 of 22480 relevant lines covered (89.13%)

7059.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.36
/libraries/botframework-streaming/src/namedPipe/namedPipeTransport.ts
1
/**
2
 * @module botframework-streaming
3
 */
4
/**
5
 * Copyright (c) Microsoft Corporation. All rights reserved.
6
 * Licensed under the MIT License.
7
 */
8
import { INodeBuffer, INodeSocket, ITransportSender, ITransportReceiver } from '../interfaces';
9

10
/**
11
 * Named pipes based transport sender and receiver abstraction
12
 */
13
export class NamedPipeTransport implements ITransportSender, ITransportReceiver {
2✔
14
    static readonly PipePath: string = '\\\\.\\pipe\\';
2✔
15
    static readonly ServerIncomingPath: string = '.incoming';
2✔
16
    static readonly ServerOutgoingPath: string = '.outgoing';
2✔
17

18
    private readonly _queue: INodeBuffer[];
19
    private _active: INodeBuffer;
20
    private _activeOffset = 0;
31✔
21
    private _activeReceiveCount = 0;
31✔
22
    private _activeReceiveResolve: (resolve: INodeBuffer) => void;
23
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
24
    private _activeReceiveReject: (reason?: any) => void;
25

26
    /**
27
     * Creates a new instance of the [NamedPipeTransport](xref:botframework-streaming.NamedPipeTransport) class.
28
     *
29
     * @param socket The socket object to build this connection on.
30
     */
31
    constructor(private socket: INodeSocket) {
31✔
32
        this._queue = [];
31✔
33
        if (socket) {
31!
34
            this.socket.on('data', (data): void => {
31✔
35
                this.socketReceive(data);
16✔
36
            });
37
            this.socket.on('close', (): void => {
31✔
38
                this.socketClose();
18✔
39
            });
40
            this.socket.on('error', (err): void => {
31✔
41
                this.socketError(err);
10✔
42
            });
43
        }
44
    }
45

46
    /**
47
     * Writes to the pipe and sends.
48
     *
49
     * @param buffer The buffer full of data to send out across the socket.
50
     * @returns A number indicating the length of the sent data if the data was successfully sent, otherwise 0.
51
     */
52
    send(buffer: INodeBuffer): number {
53
        if (this.socket && !this.socket.connecting && this.socket.writable) {
19✔
54
            this.socket.write(buffer);
13✔
55

56
            return buffer.length;
13✔
57
        }
58

59
        return 0;
6✔
60
    }
61

62
    /**
63
     * Returns `true` if currently connected.
64
     *
65
     * @returns `true` if the the transport is connected and ready to send data, `false` otherwise.
66
     */
67
    get isConnected(): boolean {
68
        return !(!this.socket || this.socket.destroyed || this.socket.connecting);
10✔
69
    }
70

71
    /**
72
     * Closes the transport.
73
     */
74
    close(): void {
75
        if (this.socket) {
27✔
76
            this.socket.end('end');
22✔
77
            this.socket = null;
22✔
78
        }
79
    }
80

81
    /**
82
     * Receive from the transport into the buffer.
83
     *
84
     * @param count The maximum amount of bytes to write to the buffer.
85
     * @returns The buffer containing the data from the transport.
86
     */
87
    receive(count: number): Promise<INodeBuffer> {
88
        if (!this.socket) {
24✔
89
            throw new Error('Cannot receive data over an unavailable/null socket.');
1✔
90
        } else if (this.socket.destroyed) {
23✔
91
            throw new Error('Cannot receive data over a dead/destroyed socket.');
1✔
92
        } else if (this._activeReceiveResolve) {
22!
93
            throw new Error('Cannot call receive more than once before it has returned.');
×
94
        }
95

96
        this._activeReceiveCount = count;
22✔
97

98
        const promise = new Promise<INodeBuffer>((resolve, reject): void => {
22✔
99
            this._activeReceiveResolve = resolve;
22✔
100
            this._activeReceiveReject = reject;
22✔
101
        });
102

103
        this.trySignalData();
22✔
104

105
        return promise;
22✔
106
    }
107

108
    private socketReceive(data: INodeBuffer): void {
109
        if (this._queue && data && data.length > 0) {
18!
110
            this._queue.push(data);
18✔
111
            this.trySignalData();
18✔
112
        }
113
    }
114

115
    private socketClose(): void {
116
        if (this._activeReceiveReject) {
30✔
117
            this._activeReceiveReject(new Error('Socket was closed.'));
5✔
118
        }
119

120
        this._active = null;
30✔
121
        this._activeOffset = 0;
30✔
122
        this._activeReceiveResolve = null;
30✔
123
        this._activeReceiveReject = null;
30✔
124
        this._activeReceiveCount = 0;
30✔
125
        this.socket = null;
30✔
126
    }
127

128
    private socketError(err: Error): void {
129
        if (this._activeReceiveReject) {
11✔
130
            this._activeReceiveReject(err);
5✔
131
        }
132
        this.socketClose();
11✔
133
    }
134

135
    private trySignalData(): void {
136
        if (this._activeReceiveResolve) {
40✔
137
            if (!this._active && this._queue.length > 0) {
35✔
138
                this._active = this._queue.shift();
13✔
139
                this._activeOffset = 0;
13✔
140
            }
141

142
            if (this._active) {
35✔
143
                if (this._activeOffset === 0 && this._active.length === this._activeReceiveCount) {
17✔
144
                    // can send the entire _active buffer
145
                    const buffer = this._active;
7✔
146
                    this._active = null;
7✔
147

148
                    this._activeReceiveResolve(buffer);
7✔
149
                } else {
150
                    // create a new buffer and copy some of the contents into it
151
                    const available = Math.min(this._activeReceiveCount, this._active.length - this._activeOffset);
10✔
152
                    const buffer = Buffer.alloc(available);
10✔
153
                    this._active.copy(buffer, 0, this._activeOffset, this._activeOffset + available);
10✔
154
                    this._activeOffset += available;
10✔
155

156
                    // if we used all of active, set it to undefined
157
                    if (this._activeOffset >= this._active.length) {
10✔
158
                        this._active = null;
6✔
159
                        this._activeOffset = 0;
6✔
160
                    }
161

162
                    this._activeReceiveResolve(buffer);
10✔
163
                }
164

165
                this._activeReceiveCount = 0;
17✔
166
                this._activeReceiveReject = null;
17✔
167
                this._activeReceiveResolve = null;
17✔
168
            }
169
        }
170
    }
171
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc