• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 5247709948

pending completion
5247709948

Pull #4485

github

web-flow
Merge 5eccee48e into 5ee34c0ce
Pull Request #4485: feat: Add support for config auth type (fetch & submit)

9780 of 12809 branches covered (76.35%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 1 file covered. (100.0%)

20042 of 22482 relevant lines covered (89.15%)

7035.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.36
/libraries/botbuilder/src/cloudAdapter.ts
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
import * as z from 'zod';
2✔
5
import type { BotFrameworkHttpAdapter } from './botFrameworkHttpAdapter';
6
import { Activity, CloudAdapterBase, InvokeResponse, StatusCodes, TurnContext } from 'botbuilder-core';
2✔
7
import { GET, POST, VERSION_PATH } from './streaming';
2✔
8
import { HttpClient, HttpHeaders, HttpOperationResponse, WebResource } from '@azure/ms-rest-js';
2✔
9
import { INodeBufferT, INodeSocketT, LogicT } from './zod';
2✔
10
import { Request, Response, ResponseT } from './interfaces';
2✔
11
import { USER_AGENT } from './botFrameworkAdapter';
2✔
12
import { retry } from 'botbuilder-stdlib';
2✔
13
import { validateAndFixActivity } from './activityValidator';
2✔
14

15
import {
2✔
16
    AuthenticateRequestResult,
17
    AuthenticationError,
18
    BotFrameworkAuthentication,
19
    BotFrameworkAuthenticationFactory,
20
    ClaimsIdentity,
21
    ConnectorClient,
22
    ConnectorFactory,
23
    MicrosoftAppCredentials,
24
} from 'botframework-connector';
25

26
import {
2✔
27
    INodeBuffer,
28
    INodeSocket,
29
    IReceiveRequest,
30
    IReceiveResponse,
31
    IStreamingTransportServer,
32
    NamedPipeServer,
33
    NodeWebSocketFactory,
34
    RequestHandler,
35
    StreamingRequest,
36
    StreamingResponse,
37
    WebSocketServer,
38
} from 'botframework-streaming';
39

40
// Note: this is _okay_ because we pass the result through `validateAndFixActivity`. Should not be used otherwise.
41
const ActivityT = z.custom<Activity>((val) => z.record(z.unknown()).check(val), { message: 'Activity' });
4✔
42

43
/**
44
 * An adapter that implements the Bot Framework Protocol and can be hosted in different cloud environments, both public and private.
45
 */
46
export class CloudAdapter extends CloudAdapterBase implements BotFrameworkHttpAdapter {
2✔
47
    /**
48
     * Initializes a new instance of the [CloudAdapter](xref:botbuilder:CloudAdapter) class.
49
     *
50
     * @param botFrameworkAuthentication Optional [BotFrameworkAuthentication](xref:botframework-connector.BotFrameworkAuthentication) instance
51
     */
52
    constructor(botFrameworkAuthentication: BotFrameworkAuthentication = BotFrameworkAuthenticationFactory.create()) {
8✔
53
        super(botFrameworkAuthentication);
14✔
54
    }
55

56
    /**
57
     * Process a web request by applying a logic function.
58
     *
59
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
60
     * @param res The corresponding HTTP [Response](xref:botbuilder.Response)
61
     * @param logic The logic function to apply
62
     * @returns a promise representing the asynchronous operation.
63
     */
64
    async process(req: Request, res: Response, logic: (context: TurnContext) => Promise<void>): Promise<void>;
65

66
    /**
67
     * Handle a web socket connection by applying a logic function to
68
     * each streaming request.
69
     *
70
     * @param req An incoming HTTP [Request](xref:botbuilder.Request)
71
     * @param socket The corresponding [INodeSocket](xref:botframework-streaming.INodeSocket)
72
     * @param head The corresponding [INodeBuffer](xref:botframework-streaming.INodeBuffer)
73
     * @param logic The logic function to apply
74
     * @returns a promise representing the asynchronous operation.
75
     */
76
    async process(
77
        req: Request,
78
        socket: INodeSocket,
79
        head: INodeBuffer,
80
        logic: (context: TurnContext) => Promise<void>
81
    ): Promise<void>;
82

83
    /**
84
     * @internal
85
     */
86
    async process(
87
        req: Request,
88
        resOrSocket: Response | INodeSocket,
89
        logicOrHead: ((context: TurnContext) => Promise<void>) | INodeBuffer,
90
        maybeLogic?: (context: TurnContext) => Promise<void>
91
    ): Promise<void> {
92
        // Early return with web socket handler if function invocation matches that signature
93
        if (maybeLogic) {
6✔
94
            const socket = INodeSocketT.parse(resOrSocket);
2✔
95
            const head = INodeBufferT.parse(logicOrHead);
2✔
96
            const logic = LogicT.parse(maybeLogic);
2✔
97

98
            return this.connect(req, socket, head, logic);
2✔
99
        }
100

101
        const res = ResponseT.parse(resOrSocket);
4✔
102

103
        const logic = LogicT.parse(logicOrHead);
4✔
104

105
        const end = (status: StatusCodes, body?: unknown) => {
4✔
106
            res.status(status);
4✔
107
            if (body) {
4✔
108
                res.send(body);
2✔
109
            }
110
            res.end();
4✔
111
        };
112

113
        // Only POST requests from here on out
114
        if (req.method !== 'POST') {
4!
115
            return end(StatusCodes.METHOD_NOT_ALLOWED);
×
116
        }
117

118
        // Ensure we have a parsed request body already. We rely on express/restify middleware to parse
119
        // request body, or on Azure Functions, which does it for us before invoking our code. Warn the user
120
        // to update their code and return an error.
121
        if (!z.record(z.unknown()).check(req.body)) {
4!
122
            return end(
×
123
                StatusCodes.BAD_REQUEST,
124
                '`req.body` not an object, make sure you are using middleware to parse incoming requests.'
125
            );
126
        }
127

128
        const activity = validateAndFixActivity(ActivityT.parse(req.body));
4✔
129

130
        if (!activity.type) {
4!
131
            console.warn('BadRequest: Missing activity or activity type.');
×
132
            return end(StatusCodes.BAD_REQUEST);
×
133
        }
134

135
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');
4!
136

137
        try {
4✔
138
            const invokeResponse = await this.processActivity(authHeader, activity, logic);
4✔
139
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
2!
140
        } catch (err) {
141
            console.error(err);
2✔
142
            return end(
2✔
143
                err instanceof AuthenticationError ? StatusCodes.UNAUTHORIZED : StatusCodes.INTERNAL_SERVER_ERROR,
2!
144
                err.message ?? err
6!
145
            );
146
        }
147
    }
148

149
    /**
150
     * Used to connect the adapter to a named pipe.
151
     *
152
     * @param pipeName Pipe name to connect to (note: yields two named pipe servers by appending ".incoming" and ".outgoing" to this name)
153
     * @param logic The logic function to call for resulting bot turns.
154
     * @param appId The Bot application ID
155
     * @param audience The audience to use for outbound communication. This will vary by cloud environment.
156
     * @param callerId Optional, the caller ID
157
     * @param retryCount Optional, the number of times to retry a failed connection (defaults to 7)
158
     */
159
    async connectNamedPipe(
160
        pipeName: string,
161
        logic: (context: TurnContext) => Promise<void>,
162
        appId: string,
163
        audience: string,
164
        callerId?: string,
165
        retryCount = 7
12✔
166
    ): Promise<void> {
167
        z.object({
12✔
168
            pipeName: z.string(),
169
            logic: LogicT,
170
            appId: z.string(),
171
            audience: z.string(),
172
            callerId: z.string().optional(),
173
        }).parse({ pipeName, logic, appId, audience, callerId });
174

175
        // The named pipe is local, so there is no network authentication to perform and we can create the result here.
176
        const authenticateRequestResult: AuthenticateRequestResult = {
2✔
177
            audience,
178
            callerId,
179
            claimsIdentity: appId ? this.createClaimsIdentity(appId) : new ClaimsIdentity([]),
2!
180
        };
181

182
        // Create request handler
183
        const requestHandler = new StreamingRequestHandler(
2✔
184
            authenticateRequestResult,
185
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic)
×
186
        );
187

188
        // Create server
189
        const server = new NamedPipeServer(pipeName, requestHandler);
2✔
190

191
        // Attach server to request handler for outbound requests
192
        requestHandler.server = server;
2✔
193

194
        // Spin it up
195
        await retry(() => server.start(), retryCount);
2✔
196
    }
197

198
    private async connect(
199
        req: Request,
200
        socket: INodeSocket,
201
        head: INodeBuffer,
202
        logic: (context: TurnContext) => Promise<void>
203
    ): Promise<void> {
204
        // Grab the auth header from the inbound http request
205
        const authHeader = z.string().parse(req.headers.Authorization ?? req.headers.authorization ?? '');
×
206

207
        // Grab the channelId which should be in the http headers
208
        const channelIdHeader = z.string().optional().parse(req.headers.channelid);
×
209

210
        // Authenticate inbound request
211
        const authenticateRequestResult = await this.botFrameworkAuthentication.authenticateStreamingRequest(
×
212
            authHeader,
213
            channelIdHeader
214
        );
215

216
        // Create request handler
217
        const requestHandler = new StreamingRequestHandler(
×
218
            authenticateRequestResult,
219
            (authenticateRequestResult, activity) => this.processActivity(authenticateRequestResult, activity, logic)
×
220
        );
221

222
        // Create server
223
        const server = new WebSocketServer(
×
224
            await new NodeWebSocketFactory().createWebSocket(req, socket, head),
225
            requestHandler
226
        );
227

228
        // Attach server to request handler
229
        requestHandler.server = server;
×
230

231
        // Spin it up
232
        await server.start();
×
233
    }
234
}
235

236
/**
237
 * @internal
238
 */
239
class StreamingRequestHandler extends RequestHandler {
240
    server?: IStreamingTransportServer;
241

242
    // Note: `processActivity` lambda is to work around the fact that CloudAdapterBase#processActivity
243
    // is protected, and we can't get around that by defining classes inside of other classes
244
    constructor(
245
        private readonly authenticateRequestResult: AuthenticateRequestResult,
2✔
246
        private readonly processActivity: (
2✔
247
            authenticateRequestResult: AuthenticateRequestResult,
248
            activity: Activity
249
        ) => Promise<InvokeResponse | undefined>
250
    ) {
251
        super();
2✔
252

253
        // Attach streaming connector factory to authenticateRequestResult so it's used for outbound calls
254
        this.authenticateRequestResult.connectorFactory = new StreamingConnectorFactory(this);
2✔
255
    }
256

257
    async processRequest(request: IReceiveRequest): Promise<StreamingResponse> {
258
        const response = new StreamingResponse();
×
259

260
        const end = (statusCode: StatusCodes, body?: unknown): StreamingResponse => {
×
261
            response.statusCode = statusCode;
×
262
            if (body) {
×
263
                response.setBody(body);
×
264
            }
265
            return response;
×
266
        };
267

268
        if (!request) {
×
269
            return end(StatusCodes.BAD_REQUEST, 'No request provided.');
×
270
        }
271

272
        if (!request.verb || !request.path) {
×
273
            return end(
×
274
                StatusCodes.BAD_REQUEST,
275
                `Request missing verb and/or path. Verb: ${request.verb}, Path: ${request.path}`
276
            );
277
        }
278

279
        if (request.verb.toUpperCase() !== POST && request.verb.toUpperCase() !== GET) {
×
280
            return end(
×
281
                StatusCodes.METHOD_NOT_ALLOWED,
282
                `Invalid verb received. Only GET and POST are accepted. Verb: ${request.verb}`
283
            );
284
        }
285

286
        if (request.path.toLowerCase() === VERSION_PATH) {
×
287
            if (request.verb.toUpperCase() === GET) {
×
288
                return end(StatusCodes.OK, { UserAgent: USER_AGENT });
×
289
            } else {
290
                return end(
×
291
                    StatusCodes.METHOD_NOT_ALLOWED,
292
                    `Invalid verb received for path: ${request.path}. Only GET is accepted. Verb: ${request.verb}`
293
                );
294
            }
295
        }
296

297
        const [activityStream, ...attachmentStreams] = request.streams;
×
298

299
        let activity: Activity;
300
        try {
×
301
            activity = validateAndFixActivity(ActivityT.parse(await activityStream.readAsJson()));
×
302

303
            activity.attachments = await Promise.all(
×
304
                attachmentStreams.map(async (attachmentStream) => {
×
305
                    const contentType = attachmentStream.contentType;
×
306

307
                    const content =
308
                        contentType === 'application/json'
×
309
                            ? await attachmentStream.readAsJson()
×
310
                            : await attachmentStream.readAsString();
311

312
                    return { contentType, content };
×
313
                })
314
            );
315
        } catch (err) {
316
            return end(StatusCodes.BAD_REQUEST, `Request body missing or malformed: ${err}`);
×
317
        }
318

319
        try {
×
320
            const invokeResponse = await this.processActivity(this.authenticateRequestResult, activity);
×
321
            return end(invokeResponse?.status ?? StatusCodes.OK, invokeResponse?.body);
×
322
        } catch (err) {
323
            return end(StatusCodes.INTERNAL_SERVER_ERROR, err.message ?? err);
×
324
        }
325
    }
326
}
327

328
/**
329
 * @internal
330
 */
331
class StreamingConnectorFactory implements ConnectorFactory {
332
    private serviceUrl?: string;
333

334
    constructor(private readonly requestHandler: StreamingRequestHandler) {}
2✔
335

336
    async create(serviceUrl: string, _audience: string): Promise<ConnectorClient> {
337
        this.serviceUrl ??= serviceUrl;
×
338

339
        if (serviceUrl !== this.serviceUrl) {
×
340
            throw new Error(
×
341
                'This is a streaming scenario, all connectors from this factory must all be for the same url.'
342
            );
343
        }
344

345
        const httpClient = new StreamingHttpClient(this.requestHandler);
×
346

347
        return new ConnectorClient(MicrosoftAppCredentials.Empty, { httpClient });
×
348
    }
349
}
350

351
/**
352
 * @internal
353
 */
354
class StreamingHttpClient implements HttpClient {
355
    constructor(private readonly requestHandler: StreamingRequestHandler) {}
×
356

357
    async sendRequest(httpRequest: WebResource): Promise<HttpOperationResponse> {
358
        const streamingRequest = this.createStreamingRequest(httpRequest);
×
359
        const receiveResponse = await this.requestHandler.server?.send(streamingRequest);
×
360
        return this.createHttpResponse(receiveResponse, httpRequest);
×
361
    }
362

363
    private createStreamingRequest(httpRequest: WebResource): StreamingRequest {
364
        const verb = httpRequest.method.toString();
×
365
        const path = httpRequest.url.slice(httpRequest.url.indexOf('/v3'));
×
366

367
        const request = StreamingRequest.create(verb, path);
×
368
        request.setBody(httpRequest.body);
×
369

370
        return request;
×
371
    }
372

373
    private async createHttpResponse(
374
        receiveResponse: IReceiveResponse,
375
        httpRequest: WebResource
376
    ): Promise<HttpOperationResponse> {
377
        const [bodyAsText] =
378
            (await Promise.all(receiveResponse.streams?.map((stream) => stream.readAsString()) ?? [])) ?? [];
×
379

380
        return {
×
381
            bodyAsText,
382
            headers: new HttpHeaders(),
383
            request: httpRequest,
384
            status: receiveResponse.statusCode,
385
        };
386
    }
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc