• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 12436188280

20 Dec 2024 06:01PM CUT coverage: 84.625%. Remained the same
12436188280

Pull #4838

github

web-flow
Merge 18324efcf into 3b8fcab21
Pull Request #4838: fix: ESLint issues in botbuilder-repo-utils

8185 of 10821 branches covered (75.64%)

Branch coverage included in aggregate %.

20513 of 23091 relevant lines covered (88.84%)

7402.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.6
/libraries/botbuilder-azure-blobs/src/blobsTranscriptStore.ts
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
import * as z from 'zod';
2✔
5
import StreamConsumers from 'stream/consumers';
2✔
6
import pmap from 'p-map';
2✔
7
import { Activity, PagedResult, TranscriptInfo, TranscriptStore } from 'botbuilder-core';
8
import { maybeCast } from 'botbuilder-stdlib';
2✔
9
import { sanitizeBlobKey } from './sanitizeBlobKey';
2✔
10

11
import {
2✔
12
    AnonymousCredential,
13
    ContainerClient,
14
    ContainerListBlobHierarchySegmentResponse,
15
    StoragePipelineOptions,
16
    StorageSharedKeyCredential,
17
} from '@azure/storage-blob';
18
import { isTokenCredential, TokenCredential } from '@azure/core-http';
2✔
19

20
/**
 * Formats a timestamp as hexadecimal .NET ticks, in a way that is consistent with the C# SDK.
 *
 * The tick count (roughly 6e17) is well above Number.MAX_SAFE_INTEGER, so the arithmetic is done
 * with BigInt. Plain number math rounds the result to a multiple of 128, which corrupts the
 * low-order hex digits.
 *
 * @param {Date} timestamp The date to convert.
 * @returns {string} The tick count as a lowercase hexadecimal string, or `'NaN'` for an invalid Date.
 */
function formatTicks(timestamp: Date): string {
    const unixMillis = timestamp.getTime();
    if (Number.isNaN(unixMillis)) {
        // Invalid Date: BigInt(NaN) would throw, so keep the 'NaN' text that number math used to produce.
        return 'NaN';
    }

    const epochTicks = BigInt('621355968000000000'); // the number of .net ticks at the unix epoch
    const ticksPerMillisecond = BigInt(10000); // there are 10000 .net ticks per millisecond
    return (epochTicks + BigInt(unixMillis) * ticksPerMillisecond).toString(16);
}
27

28
/**
 * Builds the blob-name prefix for a channel.
 *
 * @param {string} channelId The channel id.
 * @returns {string} `channelId` followed by `/`, passed through `sanitizeBlobKey`.
 */
function getChannelPrefix(channelId: string): string {
    const rawPrefix = `${channelId}/`;
    return sanitizeBlobKey(rawPrefix);
}
32

33
/**
 * Builds the blob-name prefix for a conversation within a channel.
 *
 * @param {string} channelId The channel id.
 * @param {string} conversationId The conversation id.
 * @returns {string} `channelId/conversationId`, passed through `sanitizeBlobKey`.
 */
function getConversationPrefix(channelId: string, conversationId: string): string {
    const rawPrefix = `${channelId}/${conversationId}`;
    return sanitizeBlobKey(rawPrefix);
}
37

38
/**
 * Builds the blob key under which an activity is stored.
 *
 * @param {Activity} activity The activity; its `timestamp` must be a Date instance, otherwise the
 * zod parse throws.
 * @param {BlobsTranscriptStoreOptions} options Optional settings forwarded to `sanitizeBlobKey`.
 * @returns {string} The sanitized form of `{channelId}/{conversation.id}/{ticks}-{activity.id}.json`.
 */
function getBlobKey(activity: Activity, options?: BlobsTranscriptStoreOptions): string {
    // Only the timestamp is validated here; all other activity properties are allowed through.
    const timestampSchema = z.object({ timestamp: z.instanceof(Date) }).nonstrict();
    const parsed = timestampSchema.parse(activity);

    const segments = [
        activity.channelId,
        activity.conversation.id,
        `${formatTicks(parsed.timestamp)}-${activity.id}.json`,
    ];
    const rawKey = segments.join('/');

    return sanitizeBlobKey(rawKey, options);
}
50

51
/**
 * Checks whether a value is one of the credential kinds accepted by the store.
 *
 * The type predicate lists all three accepted kinds so that narrowing matches what the runtime
 * check actually allows.
 *
 * @param {unknown} value The value to test.
 * @returns {boolean} True if `value` satisfies `isTokenCredential`, or is an instance of
 * `StorageSharedKeyCredential` or `AnonymousCredential`.
 */
function isCredentialType(
    value: unknown,
): value is StorageSharedKeyCredential | AnonymousCredential | TokenCredential {
    return (
        isTokenCredential(value) || value instanceof StorageSharedKeyCredential || value instanceof AnonymousCredential
    );
}
56

57
// Max number of results returned in a single Azure API call
58
const MAX_PAGE_SIZE = 20; // passed as `maxPageSize` to each `byPage()` listing call in this file
2✔
59

60
/**
 * Settings accepted by BlobsTranscriptStore; every field is optional.
 */
export interface BlobsTranscriptStoreOptions {
    /**
     * [StoragePipelineOptions](xref:@azure/storage-blob.StoragePipelineOptions) forwarded to the
     * Azure Blob Storage client.
     */
    storagePipelineOptions?: StoragePipelineOptions;

    /**
     * When true, the blob transcript key is returned in its decoded form rather than its encoded form.
     * Defaults to false.
     */
    decodeTranscriptKey?: boolean;
}
76

77
/**
 * BlobsTranscriptStore is a [TranscriptStore](xref:botbuilder-core.TranscriptStore) that persists
 * transcripts in Azure Blob Storage
 *
 * @summary
 * Each activity is stored as JSON blob with a key of
 * `container/{channelId}/{conversationId}/{Timestamp.ticks}-{activity.id}.json`.
 */
export class BlobsTranscriptStore implements TranscriptStore {
    private readonly _containerClient: ContainerClient;
    // Concurrency limit handed to p-map; lowered to 1 when the development-storage string is used.
    private readonly _concurrency = Infinity;
    // Memoized result of createIfNotExists() so the container is only created once per instance.
    private _initializePromise?: Promise<unknown>;
    // NOTE(review): assigned in the constructor but not read by any method of this class.
    private _isDecodeTranscriptKey?: boolean = false;

    /**
     * Constructs a BlobsTranscriptStore instance.
     *
     * When both `blobServiceUri` and `tokenCredential` are supplied, the container client is built from
     * them; otherwise it is built from `connectionString` and `containerName`.
     *
     * @param {string} connectionString Azure Blob Storage connection string
     * @param {string} containerName Azure Blob Storage container name
     * @param {BlobsTranscriptStoreOptions} options Other options for BlobsTranscriptStore
     * @param {string} blobServiceUri A Uri referencing the blob container that includes the name of the account and the name of the container
     * @param {StorageSharedKeyCredential | AnonymousCredential | TokenCredential} tokenCredential The token credential to authenticate to the Azure storage
     * @throws {ReferenceError} If `tokenCredential` is supplied with a `blobServiceUri` but is not an accepted credential type.
     */
    constructor(
        connectionString: string,
        containerName: string,
        options?: BlobsTranscriptStoreOptions,
        blobServiceUri = '',
        tokenCredential?: StorageSharedKeyCredential | AnonymousCredential | TokenCredential,
    ) {
        if (blobServiceUri != '' && tokenCredential != null) {
            z.object({ blobServiceUri: z.string() }).parse({
                blobServiceUri,
            });

            if (typeof tokenCredential != 'object' || !isCredentialType(tokenCredential)) {
                throw new ReferenceError('Invalid credential type.');
            }

            this._containerClient = new ContainerClient(
                blobServiceUri,
                tokenCredential,
                options?.storagePipelineOptions,
            );

            // At most one promise at a time to be friendly to local emulator users
            if (blobServiceUri.trim() === 'UseDevelopmentStorage=true;') {
                this._concurrency = 1;
            }
        } else {
            z.object({ connectionString: z.string(), containerName: z.string() }).parse({
                connectionString,
                containerName,
            });

            this._containerClient = new ContainerClient(
                connectionString,
                containerName,
                options?.storagePipelineOptions,
            );

            // At most one promise at a time to be friendly to local emulator users
            if (connectionString.trim() === 'UseDevelopmentStorage=true;') {
                this._concurrency = 1;
            }
        }

        this._isDecodeTranscriptKey = options?.decodeTranscriptKey;
    }

    // Protects against JSON.stringify cycles
    private toJSON(): unknown {
        return { name: 'BlobsTranscriptStore' };
    }

    // Creates the container on first use and reuses the same promise for every later call.
    private _initialize(): Promise<unknown> {
        if (!this._initializePromise) {
            this._initializePromise = this._containerClient.createIfNotExists();
        }
        return this._initializePromise;
    }

    /**
     * Get activities for a conversation (aka the transcript).
     *
     * @param {string} channelId channelId
     * @param {string} conversationId conversationId
     * @param {string} continuationToken continuation token to page through results
     * @param {Date} startDate earliest time to include in results
     * @returns {Promise<PagedResult<Activity>>} Promise that resolves to a
     * [PagedResult](xref:botbuilder-core.PagedResult) of [Activity](xref:botbuilder-core.Activity) items
     */
    async getTranscriptActivities(
        channelId: string,
        conversationId: string,
        continuationToken?: string,
        startDate?: Date,
    ): Promise<PagedResult<Activity>> {
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });

        await this._initialize();

        const iter = this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getConversationPrefix(channelId, conversationId),
            })
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE });

        let page = await iter.next();
        const result: Activity[] = [];
        let response: ContainerListBlobHierarchySegmentResponse | undefined;
        while (!page.done) {
            // Note: azure library does not properly type iterator result, hence the need to cast
            response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
            const blobItems = response?.segment?.blobItems ?? [];

            // Locate first index of results to slice from. If we have a start date, we want to return
            // activities after that start date. Otherwise we can simply return all activities in this page.
            const fromIdx =
                startDate != null
                    ? blobItems.findIndex(
                          (blobItem) => blobItem?.properties?.createdOn && blobItem?.properties?.createdOn >= startDate,
                      )
                    : 0;

            if (fromIdx !== -1) {
                const activities = await pmap(
                    blobItems.slice(fromIdx),
                    async (blobItem) => {
                        const blob = await this._containerClient.getBlobClient(blobItem.name).download();

                        const { readableStreamBody } = blob;
                        if (!readableStreamBody) {
                            return null;
                        }

                        const activity = (await StreamConsumers.json(readableStreamBody)) as any;
                        // The stored JSON carries the timestamp as a string; turn it back into a Date.
                        return { ...activity, timestamp: new Date(activity.timestamp) } as Activity;
                    },
                    { concurrency: this._concurrency },
                );

                // Blobs without a readable body resolved to null above and are dropped here.
                for (const activity of activities) {
                    if (activity) result.push(activity);
                }
            }

            page = await iter.next();
        }

        return {
            continuationToken: response?.continuationToken ?? '',
            // Already null-filtered while collecting, so no further pass over the list is needed.
            items: result,
        };
    }

    /**
     * List conversations in the channelId.
     *
     * @param {string} channelId channelId
     * @param {string} continuationToken continuation token to page through results
     * @returns {Promise<PagedResult<TranscriptInfo>>} Promise that resolves to a
     * [PagedResult](xref:botbuilder-core.PagedResult) of [TranscriptInfo](xref:botbuilder-core.TranscriptInfo) items
     */
    async listTranscripts(channelId: string, continuationToken?: string): Promise<PagedResult<TranscriptInfo>> {
        z.object({ channelId: z.string() }).parse({ channelId });

        await this._initialize();

        const iter = this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getChannelPrefix(channelId),
            })
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE });

        let page = await iter.next();
        const result: TranscriptInfo[] = [];
        let response: ContainerListBlobHierarchySegmentResponse | undefined;

        while (!page.done) {
            // Note: azure library does not properly type iterator result, hence the need to cast.
            // Assign to the outer variable (no shadowing) so the continuation token of the last page
            // is available when building the return value.
            response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
            const blobItems = response?.segment?.blobItems ?? [];

            for (const blobItem of blobItems) {
                // The second path segment of the decoded blob name is used as the transcript id.
                const [, id] = decodeURIComponent(blobItem.name).split('/');

                // NOTE(review): logActivity stores this value under the `Timestamp` metadata key and the
                // listing call above does not request metadata; confirm `metadata.timestamp` is ever
                // populated here, otherwise `created` always falls back to the current time.
                const created = blobItem.metadata?.timestamp ? new Date(blobItem.metadata.timestamp) : new Date();

                result.push({ channelId, created, id });
            }

            page = await iter.next();
        }

        return {
            continuationToken: response?.continuationToken ?? '',
            items: result,
        };
    }

    /**
     * Delete a specific conversation and all of its activities.
     *
     * @param {string} channelId channelId
     * @param {string} conversationId conversationId
     * @returns {Promise<void>} A promise representing the async operation.
     */
    async deleteTranscript(channelId: string, conversationId: string): Promise<void> {
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });

        await this._initialize();

        const iter = this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getConversationPrefix(channelId, conversationId),
            })
            .byPage({
                maxPageSize: MAX_PAGE_SIZE,
            });

        let page = await iter.next();
        while (!page.done) {
            // Note: azure library does not properly type iterator result, hence the need to cast
            const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
            const blobItems = response?.segment?.blobItems ?? [];

            await pmap(blobItems, (blobItem) => this._containerClient.deleteBlob(blobItem.name), {
                concurrency: this._concurrency,
            });

            page = await iter.next();
        }
    }

    /**
     * Log an activity to the transcript.
     *
     * @param {Activity} activity activity to log
     * @param {BlobsTranscriptStoreOptions} options Optional settings for BlobsTranscriptStore
     * @returns {Promise<void>} A promise representing the async operation.
     */
    async logActivity(activity: Activity, options?: BlobsTranscriptStoreOptions): Promise<void> {
        z.object({ activity: z.record(z.unknown()) }).parse({ activity });

        await this._initialize();

        const blob = this._containerClient.getBlockBlobClient(getBlobKey(activity, options));
        const serialized = JSON.stringify(activity);

        const metadata: Record<string, string> = {
            FromId: activity.from.id,
            RecipientId: activity.recipient.id,
        };

        if (activity.id) {
            metadata.Id = activity.id;
        }

        if (activity.timestamp) {
            metadata.Timestamp = activity.timestamp.toJSON();
        }

        // The content length must be the body's size in bytes. `serialized.length` counts UTF-16 code
        // units, which is smaller than the UTF-8 byte count whenever the activity has non-ASCII text.
        await blob.upload(serialized, Buffer.byteLength(serialized), { metadata });
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc