• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 4791637529

pending completion
4791637529

Pull #4461

github

GitHub
Merge 9835ef480 into f9cf5fd25
Pull Request #4461: fix: [#4452][#4456][#4460][botframework-streaming] Should reject pending requests on disconnection

9672 of 12704 branches covered (76.13%)

Branch coverage included in aggregate %.

8 of 8 new or added lines in 4 files covered. (100.0%)

19954 of 22409 relevant lines covered (89.04%)

3353.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.68
/libraries/botframework-streaming/src/namedPipe/namedPipeServer.ts
1
/**
2
 * @module botframework-streaming
3
 */
4
/**
5
 * Copyright (c) Microsoft Corporation. All rights reserved.
6
 * Licensed under the MIT License.
7
 */
8
import { INodeServer, IStreamingTransportServer, IReceiveResponse } from '../interfaces';
9
import { NamedPipeTransport } from './namedPipeTransport';
1✔
10
import { PayloadReceiver, PayloadSender } from '../payloadTransport';
1✔
11
import { ProtocolAdapter } from '../protocolAdapter';
1✔
12
import { RequestHandler } from '../requestHandler';
13
import { RequestManager } from '../payloads';
1✔
14
import { StreamingRequest } from '../streamingRequest';
15
import { createNodeServer } from '../utilities/createNodeServer';
1✔
16

17
/**
 * Streaming transport server implementation that uses named pipes for inter-process communication.
 *
 * `start()` creates two servers: one listening on the incoming pipe, whose first socket is attached
 * to the payload receiver, and one listening on the outgoing pipe, whose first socket is attached to
 * the payload sender.
 */
export class NamedPipeServer implements IStreamingTransportServer {
    private _outgoingServer: INodeServer;
    private _incomingServer: INodeServer;

    // The sender and receiver are created exactly once, here, and shared with the protocol adapter.
    private readonly _sender = new PayloadSender();
    private readonly _receiver = new PayloadReceiver();
    private readonly _protocolAdapter: ProtocolAdapter;

    /**
     * Creates a new instance of the [NamedPipeServer](xref:botframework-streaming.NamedPipeServer) class.
     *
     * @param baseName The named pipe to connect to.
     * @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this client.
     * @param autoReconnect Deprecated: Automatic reconnection is the default behavior.
     * @throws TypeError if `baseName` is missing or empty.
     */
    constructor(private readonly baseName: string, requestHandler?: RequestHandler, autoReconnect?: boolean) {
        if (!baseName) {
            throw new TypeError('NamedPipeServer: Missing baseName parameter');
        }

        // Any explicitly supplied value (true or false) triggers the deprecation warning.
        if (autoReconnect != null) {
            console.warn('NamedPipeServer: The autoReconnect parameter is deprecated');
        }

        this._protocolAdapter = new ProtocolAdapter(requestHandler, new RequestManager(), this._sender, this._receiver);
    }

    /**
     * Get connected status
     *
     * @returns true only when both the receiver and the sender report being connected.
     */
    get isConnected(): boolean {
        return this._receiver.isConnected && this._sender.isConnected;
    }

    /**
     * Used to establish the connection used by this server and begin listening for incoming messages.
     *
     * @param onListen Optional callback that fires once when server is listening on both incoming and outgoing pipe
     * @returns A promise that stays pending while the servers are running. It resolves with `'connected'`
     * once both servers have closed, and rejects if either server fails to start listening or emits an
     * error after it is listening.
     */
    async start(onListen?: () => void): Promise<string> {
        const { PipePath, ServerIncomingPath, ServerOutgoingPath } = NamedPipeTransport;

        // We want to ensure we are listening to the servers in series so that, if two processes start at the
        // same time, only one is able to listen on both the incoming and outgoing sockets.
        const incomingServer = createNodeServer((socket) => {
            // Ignore additional sockets while the receiver already has a connection.
            if (this._receiver.isConnected) {
                return;
            }

            this._receiver.connect(new NamedPipeTransport(socket));
        });

        // Stored before listening so that disconnect() can close the server at any point.
        this._incomingServer = incomingServer;

        const [incoming] = await this._listen(incomingServer, PipePath + this.baseName + ServerIncomingPath);

        // Now that we absolutely have the incoming socket, bind the outgoing socket as well
        const outgoingServer = createNodeServer((socket) => {
            // Ignore additional sockets while the sender already has a connection.
            if (this._sender.isConnected) {
                return;
            }

            // Note: manually disconnect sender if client closes socket. This ensures that
            // reconnections are allowed
            this._sender.connect(new NamedPipeTransport(socket));
            socket.once('close', () => this._sender.disconnect());
        });

        this._outgoingServer = outgoingServer;

        const [outgoing] = await this._listen(outgoingServer, PipePath + this.baseName + ServerOutgoingPath);

        onListen?.();

        await Promise.all([incoming, outgoing]);

        return 'connected';
    }

    /**
     * Allows for manually disconnecting the server.
     *
     * Disconnects the receiver and the sender, closes whichever servers have been created and drops
     * the references to them.
     */
    disconnect(): void {
        this._receiver.disconnect();
        this._incomingServer?.close();
        this._incomingServer = null;

        this._sender.disconnect();
        this._outgoingServer?.close();
        this._outgoingServer = null;
    }

    /**
     * Task used to send data over this client connection.
     *
     * @param request The [StreamingRequest](xref:botframework-streaming.StreamingRequest) to send.
     * @returns A promise for an instance of [IReceiveResponse](xref:botframework-streaming.IReceiveResponse) on completion of the send operation.
     */
    async send(request: StreamingRequest): Promise<IReceiveResponse> {
        return this._protocolAdapter.sendRequest(request);
    }

    /**
     * Starts `server` listening on `path`.
     *
     * The outer promise resolves as soon as the server is listening and rejects if an error is emitted
     * before that. It resolves with a single-element array holding a second promise, which resolves when
     * the server closes and rejects if the server emits an error after it started listening. Wrapping the
     * second promise in an array ensures that awaiting the outer promise only waits for the listening
     * state, not for the server to close.
     *
     * @param server The server to start.
     * @param path The full pipe path to listen on.
     * @returns A promise for a one-element tuple containing the "server closed" promise.
     */
    private _listen(server: INodeServer, path: string): Promise<[Promise<void>]> {
        return new Promise<[Promise<void>]>((resolveListening, rejectListening) => {
            server.once('error', rejectListening);

            const closed = new Promise<void>((resolveClosed, rejectClosed) => {
                // Only register rejection once the server is actually listening
                server.once('listening', () => server.once('error', rejectClosed));
                // Node servers signal shutdown with the 'close' event.
                server.once('close', resolveClosed);
            });

            server.once('listening', () => resolveListening([closed]));

            server.listen(path);
        });
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc