• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 14082303413

26 Mar 2025 11:36AM UTC coverage: 84.47% (-0.05%) from 84.524%
14082303413

push

github

web-flow
Update elliptic, esbuild, and serialize-javascript (#4862)

8260 of 10940 branches covered (75.5%)

Branch coverage included in aggregate %.

20572 of 23193 relevant lines covered (88.7%)

4032.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.82
/libraries/botbuilder/src/cloudAdapter.ts
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
import * as z from 'zod';
1✔
5
import type { BotFrameworkHttpAdapter } from './botFrameworkHttpAdapter';
6
import { Activity, CloudAdapterBase, InvokeResponse, StatusCodes, TurnContext } from 'botbuilder-core';
1✔
7
import { GET, POST, VERSION_PATH } from './streaming';
1✔
8
import {
1✔
9
    HttpClient,
10
    HttpHeaders,
11
    HttpOperationResponse,
12
    WebResourceLike as WebResource,
13
} from 'botbuilder-stdlib/lib/azureCoreHttpCompat';
14
import { INodeBufferT, INodeSocketT, LogicT } from './zod';
1✔
15
import { Request, Response, ResponseT } from './interfaces';
1✔
16
import { USER_AGENT } from './botFrameworkAdapter';
1✔
17
import { retry } from 'botbuilder-stdlib';
1✔
18
import { validateAndFixActivity } from './activityValidator';
1✔
19

20
import {
1✔
21
    AuthenticateRequestResult,
22
    AuthenticationError,
23
    BotFrameworkAuthentication,
24
    BotFrameworkAuthenticationFactory,
25
    ClaimsIdentity,
26
    ConnectorClient,
27
    ConnectorFactory,
28
    MicrosoftAppCredentials,
29
} from 'botframework-connector';
30

31
import {
1✔
32
    INodeBuffer,
33
    INodeSocket,
34
    INodeDuplex,
35
    IReceiveRequest,
36
    IReceiveResponse,
37
    IStreamingTransportServer,
38
    NamedPipeServer,
39
    NodeWebSocketFactory,
40
    RequestHandler,
41
    StreamingRequest,
42
    StreamingResponse,
43
    WebSocketServer,
44
} from 'botframework-streaming';
45

46
// Note: this is _okay_ because we pass the result through `validateAndFixActivity`. Should not be used otherwise.
47
// Loose schema: accepts any value that zod can parse as a record of unknown values and types the
// result as `Activity`. It performs no field-level validation of its own.
const ActivityT = z.custom<Activity>((val) => z.record(z.unknown()).safeParse(val).success, { message: 'Activity' });
2✔
48

49
/**
 * An adapter that implements the Bot Framework Protocol and can be hosted in different cloud environments, both public and private.
 */
export class CloudAdapter extends CloudAdapterBase implements BotFrameworkHttpAdapter {
    /**
     * Initializes a new instance of the [CloudAdapter](xref:botbuilder:CloudAdapter) class.
     *
     * @param botFrameworkAuthentication Optional [BotFrameworkAuthentication](xref:botframework-connector.BotFrameworkAuthentication) instance
     */
    constructor(botFrameworkAuthentication: BotFrameworkAuthentication = BotFrameworkAuthenticationFactory.create()) {
        super(botFrameworkAuthentication);
    }

    /**
     * Process a web request by applying a logic function.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param res The corresponding HTTP [Response](xref:botbuilder.Response)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(req: Request, res: Response, logic: (context: TurnContext) => Promise<void>): Promise<void>;

    /**
     * Handle a web socket connection by applying a logic function to
     * each streaming request.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param socket The corresponding [INodeSocket](xref:botframework-streaming.INodeSocket)
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(
        req: Request,
        socket: INodeSocket,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void>;

    /**
     * Handle a web socket connection by applying a logic function to
     * each streaming request.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param socket The corresponding [INodeDuplex](xref:botframework-streaming.INodeDuplex)
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    async process(
        req: Request,
        socket: INodeDuplex,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void>;

    /**
     * Shared implementation behind the public overloads. A fourth argument selects the
     * web socket path; otherwise the call is handled as a plain HTTP request.
     *
     * @internal
     */
    async process(
        req: Request,
        resOrSocket: Response | INodeSocket | INodeDuplex,
        logicOrHead: ((context: TurnContext) => Promise<void>) | INodeBuffer,
        maybeLogic?: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        // Early return with web socket handler if function invocation matches that signature
        if (maybeLogic) {
            const socket = INodeSocketT.parse(resOrSocket);
            const head = INodeBufferT.parse(logicOrHead);
            const logic = LogicT.parse(maybeLogic);

            return this.connect(req, socket, head, logic);
        }

        const res = ResponseT.parse(resOrSocket);

        const logic = LogicT.parse(logicOrHead);

        // Sets the status, sends the body only when it is truthy, and always ends the response.
        const end = (status: StatusCodes, body?: unknown) => {
            res.status(status);
            if (body) {
                res.send(body);
            }
            res.end();
        };

        // Only POST requests from here on out
        if (req.method !== 'POST') {
            return end(StatusCodes.METHOD_NOT_ALLOWED);
        }

        // Ensure we have a parsed request body already. We rely on express/restify middleware to parse
        // request body and azure functions, which does it for us before invoking our code. Warn the user
        // to update their code and return an error.
        if (!z.record(z.unknown()).safeParse(req.body).success) {
            return end(
                StatusCodes.BAD_REQUEST,
                '`req.body` not an object, make sure you are using middleware to parse incoming requests.',
            );
        }

        const activity = validateAndFixActivity(ActivityT.parse(req.body));

        if (!activity.type) {
            console.warn('BadRequest: Missing activity or activity type.');
            return end(StatusCodes.BAD_REQUEST);
        }

        // Header lookup tries both casings; a missing header becomes the empty string.
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');
        try {
            const invokeResponse = await this.processActivity(authHeader, activity, logic);
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
        } catch (err) {
            console.error(err);

            // Anything can be thrown, including `null`/`undefined`. Read `message` null-safely so the
            // catch block itself cannot throw and leave the response un-ended.
            const detail = (err as { message?: unknown } | null | undefined)?.message ?? err;

            return end(
                err instanceof AuthenticationError ? StatusCodes.UNAUTHORIZED : StatusCodes.INTERNAL_SERVER_ERROR,
                detail,
            );
        }
    }

    /**
     * Asynchronously process an activity running the provided logic function.
     *
     * @param authorization The authorization header in the format: "Bearer [longString]" or the AuthenticateRequestResult for this turn.
     * @param activity The activity to process.
     * @param logic The logic function to apply.
     * @returns a promise representing the asynchronous operation.
     * @throws {Error} wrapping the stack (or, when there is no stack, the thrown value itself) of any failure raised while processing.
     */
    async processActivityDirect(
        authorization: string | AuthenticateRequestResult,
        activity: Activity,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        try {
            // NOTE(review): the cast presumably works around `processActivity` not accepting the
            // union type directly - confirm against CloudAdapterBase before tightening it.
            await this.processActivity(authorization as any, activity, logic);
        } catch (err) {
            // Fall back to the thrown value when it carries no stack (non-Error throws), and never
            // dereference `null`/`undefined`.
            const detail = (err as { stack?: unknown } | null | undefined)?.stack ?? err;
            throw new Error(`CloudAdapter.processActivityDirect(): ERROR\n ${detail}`);
        }
    }

    /**
     * Used to connect the adapter to a named pipe.
     *
     * @param pipeName Pipe name to connect to (note: yields two named pipe servers by appending ".incoming" and ".outgoing" to this name)
     * @param logic The logic function to call for resulting bot turns.
     * @param appId The Bot application ID
     * @param audience The audience to use for outbound communication. This will vary by cloud environment.
     * @param callerId Optional, the caller ID
     * @param retryCount Optional, the number of times to retry a failed connection (defaults to 7)
     * @returns a promise representing the asynchronous operation.
     */
    async connectNamedPipe(
        pipeName: string,
        logic: (context: TurnContext) => Promise<void>,
        appId: string,
        audience: string,
        callerId?: string,
        retryCount = 7,
    ): Promise<void> {
        // Runtime validation of the arguments; throws if any of them has the wrong shape.
        z.object({
            pipeName: z.string(),
            logic: LogicT,
            appId: z.string(),
            audience: z.string(),
            callerId: z.string().optional(),
        }).parse({ pipeName, logic, appId, audience, callerId });

        // The named pipe is local and so there is no network authentication to perform: so we can create the result here.
        const authenticateRequestResult: AuthenticateRequestResult = {
            audience,
            callerId,
            claimsIdentity: appId ? this.createClaimsIdentity(appId) : new ClaimsIdentity([]),
        };

        // Create request handler
        const requestHandler = new StreamingRequestHandler(
            authenticateRequestResult,
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic),
        );

        // Create server
        const server = new NamedPipeServer(pipeName, requestHandler);

        // Attach server to request handler for outbound requests
        requestHandler.server = server;

        // Spin it up
        await retry(() => server.start(), retryCount);
    }

    /**
     * Authenticates an inbound streaming request, then creates and starts a web socket server
     * whose requests are run through `logic`.
     *
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
     * @param socket The corresponding [INodeSocket](xref:botframework-streaming.INodeSocket)
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
     * @param logic The logic function to apply
     * @returns a promise representing the asynchronous operation.
     */
    private async connect(
        req: Request,
        socket: INodeSocket,
        head: INodeBuffer,
        logic: (context: TurnContext) => Promise<void>,
    ): Promise<void> {
        // Grab the auth header from the inbound http request
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');

        // Grab the channelId which should be in the http headers
        const channelIdHeader = z.string().optional().parse(req.headers.channelid);

        // Authenticate inbound request
        const authenticateRequestResult = await this.botFrameworkAuthentication.authenticateStreamingRequest(
            authHeader,
            channelIdHeader,
        );

        // Create request handler
        const requestHandler = new StreamingRequestHandler(
            authenticateRequestResult,
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic),
        );

        // Create server
        const server = new WebSocketServer(
            await new NodeWebSocketFactory().createWebSocket(req, socket, head),
            requestHandler,
        );

        // Attach server to request handler
        requestHandler.server = server;

        // Spin it up
        await server.start();
    }
}
277

278
/**
 * Request handler that turns inbound streaming requests into bot turns by delegating to the
 * `processActivity` callback it was constructed with.
 *
 * @internal
 */
class StreamingRequestHandler extends RequestHandler {
    // Transport server used for outbound requests; assigned by the code that creates the handler.
    server?: IStreamingTransportServer;

    // Note: `processActivity` lambda is to work around the fact that CloudAdapterBase#processActivity
    // is protected, and we can't get around that by defining classes inside of other classes
    constructor(
        private readonly authenticateRequestResult: AuthenticateRequestResult,
        private readonly processActivity: (
            authenticateRequestResult: AuthenticateRequestResult,
            activity: Activity,
        ) => Promise<InvokeResponse | undefined>,
    ) {
        super();

        // Attach streaming connector factory to authenticateRequestResult so it's used for outbound calls
        this.authenticateRequestResult.connectorFactory = new StreamingConnectorFactory(this);
    }

    /**
     * Validates an inbound streaming request, reads the activity (first stream) and attachments
     * (remaining streams), and runs the activity through `processActivity`.
     *
     * @param request The inbound streaming request.
     * @returns a response carrying the status code (and body, when one is truthy) for the request:
     * 400 for a missing/malformed request, 405 for unsupported verbs, 200 with the user agent for a
     * GET on the version path, 500 when processing fails, otherwise the invoke response status or 200.
     */
    async processRequest(request: IReceiveRequest): Promise<StreamingResponse> {
        const response = new StreamingResponse();

        // Sets the status code, sets the body only when it is truthy, and hands back the response.
        const end = (statusCode: StatusCodes, body?: unknown): StreamingResponse => {
            response.statusCode = statusCode;
            if (body) {
                response.setBody(body);
            }
            return response;
        };

        if (!request) {
            return end(StatusCodes.BAD_REQUEST, 'No request provided.');
        }

        if (!request.verb || !request.path) {
            return end(
                StatusCodes.BAD_REQUEST,
                `Request missing verb and/or path. Verb: ${request.verb}, Path: ${request.path}`,
            );
        }

        // Normalize once; messages below still report the verb exactly as it was received.
        const verb = request.verb.toUpperCase();

        if (verb !== POST && verb !== GET) {
            return end(
                StatusCodes.METHOD_NOT_ALLOWED,
                `Invalid verb received. Only GET and POST are accepted. Verb: ${request.verb}`,
            );
        }

        if (request.path.toLowerCase() === VERSION_PATH) {
            if (verb === GET) {
                return end(StatusCodes.OK, { UserAgent: USER_AGENT });
            } else {
                return end(
                    StatusCodes.METHOD_NOT_ALLOWED,
                    `Invalid verb received for path: ${request.path}. Only GET is accepted. Verb: ${request.verb}`,
                );
            }
        }

        const [activityStream, ...attachmentStreams] = request.streams;

        let activity: Activity;
        try {
            activity = validateAndFixActivity(ActivityT.parse(await activityStream.readAsJson()));

            // Attachments are replaced with the content of the remaining streams: JSON streams are
            // parsed, everything else is read as a string.
            activity.attachments = await Promise.all(
                attachmentStreams.map(async (attachmentStream) => {
                    const contentType = attachmentStream.contentType;

                    const content =
                        contentType === 'application/json'
                            ? await attachmentStream.readAsJson()
                            : await attachmentStream.readAsString();

                    return { contentType, content };
                }),
            );
        } catch (err) {
            return end(StatusCodes.BAD_REQUEST, `Request body missing or malformed: ${err}`);
        }

        try {
            const invokeResponse = await this.processActivity(this.authenticateRequestResult, activity);
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
        } catch (err) {
            // Anything can be thrown, including `null`/`undefined`. Read `message` null-safely so a
            // failure is always reported as a 500 instead of rejecting from inside this handler.
            const detail = (err as { message?: unknown } | null | undefined)?.message ?? err;
            return end(StatusCodes.INTERNAL_SERVER_ERROR, detail);
        }
    }
}
369

370
/**
 * Connector factory for streaming scenarios: every client it creates sends its requests
 * through the streaming request handler supplied at construction.
 *
 * @internal
 */
class StreamingConnectorFactory implements ConnectorFactory {
    // Service url of the first `create` call; later calls must use the same one.
    private serviceUrl?: string;

    constructor(private readonly requestHandler: StreamingRequestHandler) {}

    /**
     * Creates a connector client backed by the streaming transport.
     *
     * @param serviceUrl The service url; must match the url used on the first call.
     * @param _audience Unused.
     * @returns a promise resolving to the connector client.
     * @throws {Error} when `serviceUrl` differs from the url this factory is already bound to.
     */
    async create(serviceUrl: string, _audience: string): Promise<ConnectorClient> {
        // Bind to the first url we are given.
        if (this.serviceUrl == null) {
            this.serviceUrl = serviceUrl;
        }

        const sameUrl = serviceUrl === this.serviceUrl;
        if (!sameUrl) {
            throw new Error(
                'This is a streaming scenario, all connectors from this factory must all be for the same url.',
            );
        }

        const streamingClient = new StreamingHttpClient(this.requestHandler);

        return new ConnectorClient(MicrosoftAppCredentials.Empty, { httpClient: streamingClient });
    }
}
392

393
/**
 * HTTP client that sends each request over the streaming server attached to the request
 * handler instead of over the network.
 *
 * @internal
 */
class StreamingHttpClient implements HttpClient {
    constructor(private readonly requestHandler: StreamingRequestHandler) {}

    /**
     * Sends an HTTP-style request over the streaming transport.
     *
     * @param httpRequest The request to send.
     * @returns a promise resolving to the HTTP-style response built from the streaming response.
     * @throws {Error} when no streaming server is attached to the request handler.
     */
    async sendRequest(httpRequest: WebResource): Promise<HttpOperationResponse> {
        // Fail fast with a descriptive error: without a server there is nothing to send on, and
        // continuing would only surface later as an opaque TypeError on the missing response.
        const server = this.requestHandler.server;
        if (!server) {
            throw new Error(
                'StreamingHttpClient.sendRequest(): no streaming server is attached to the request handler.',
            );
        }

        const streamingRequest = this.createStreamingRequest(httpRequest);
        const receiveResponse = await server.send(streamingRequest);
        return this.createHttpResponse(receiveResponse, httpRequest);
    }

    /**
     * Builds the streaming request: same verb and body, with the path taken from the first
     * occurrence of '/v3' in the url onwards.
     *
     * @param httpRequest The request to convert.
     * @returns the equivalent streaming request.
     */
    private createStreamingRequest(httpRequest: WebResource): StreamingRequest {
        const verb = httpRequest.method.toString();
        // NOTE(review): if the url has no '/v3' segment, indexOf returns -1 and the path becomes
        // the url's last character - confirm every caller passes a '/v3' url.
        const path = httpRequest.url.slice(httpRequest.url.indexOf('/v3'));

        const request = StreamingRequest.create(verb, path);
        request.setBody(httpRequest.body);

        return request;
    }

    /**
     * Builds the HTTP-style response: every stream is read as a string and the first one
     * becomes the body text.
     *
     * @param receiveResponse The response received over the streaming transport.
     * @param httpRequest The request the response belongs to.
     * @returns a promise resolving to the HTTP-style response.
     */
    private async createHttpResponse(
        receiveResponse: IReceiveResponse,
        httpRequest: WebResource,
    ): Promise<HttpOperationResponse> {
        // Promise.all always resolves to an array, so only the missing-streams case needs a default.
        const [bodyAsText] = await Promise.all(receiveResponse.streams?.map((stream) => stream.readAsString()) ?? []);

        return {
            bodyAsText,
            headers: new HttpHeaders(),
            request: httpRequest,
            status: receiveResponse.statusCode,
        };
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc