• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 6395137128

03 Oct 2023 03:29PM CUT coverage: 84.903% (+0.03%) from 84.869%
6395137128

push

github

web-flow
port: [#4527][#6655] Implementation of Teams batch APIs (#4535)

* add teams batch operations

* update teamsInfo logic

* add unit tests for batch operations

* fix retry logic

* apply code feedback

* Fix lint issue

---------

Co-authored-by: JhontSouth <jhonatan.sandoval@southworks.com>

9955 of 12972 branches covered (0.0%)

Branch coverage included in aggregate %.

84 of 84 new or added lines in 7 files covered. (100.0%)

20256 of 22611 relevant lines covered (89.58%)

7158.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.05
/libraries/botbuilder-azure-blobs/src/blobsTranscriptStore.ts
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
import * as z from 'zod';
2✔
5
import getStream from 'get-stream';
2✔
6
import pmap from 'p-map';
2✔
7
import { Activity, PagedResult, TranscriptInfo, TranscriptStore } from 'botbuilder-core';
8
import { maybeCast } from 'botbuilder-stdlib';
2✔
9
import { sanitizeBlobKey } from './sanitizeBlobKey';
2✔
10

11
import {
2✔
12
    ContainerClient,
13
    ContainerListBlobHierarchySegmentResponse,
14
    StoragePipelineOptions,
15
} from '@azure/storage-blob';
16

17
// Formats a timestamp in a way that is consistent with the C# SDK
18
function formatTicks(timestamp: Date): string {
19
    const epochTicks = 621355968000000000; // the number of .net ticks at the unix epoch
×
20
    const ticksPerMillisecond = 10000; // there are 10000 .net ticks per millisecond
×
21
    const ticks = epochTicks + timestamp.getTime() * ticksPerMillisecond;
×
22
    return ticks.toString(16);
×
23
}
24

25
// Formats a channelId as a blob prefix
26
function getChannelPrefix(channelId: string): string {
27
    return sanitizeBlobKey(`${channelId}/`);
×
28
}
29

30
// Formats a channelId and conversationId as a blob prefix
31
function getConversationPrefix(channelId: string, conversationId: string): string {
32
    return sanitizeBlobKey(`${channelId}/${conversationId}`);
×
33
}
34

35
// Formats an activity as a blob key
36
function getBlobKey(activity: Activity, options?: BlobsTranscriptStoreOptions): string {
37
    const { timestamp } = z
×
38
        .object({ timestamp: z.instanceof(Date) })
39
        .nonstrict()
40
        .parse(activity);
41

42
    return sanitizeBlobKey(
×
43
        [activity.channelId, activity.conversation.id, `${formatTicks(timestamp)}-${activity.id}.json`].join('/'),
44
        options
45
    );
46
}
47

48
// Max number of results returned in a single Azure API call
49
const MAX_PAGE_SIZE = 20;
2✔
50

51
/**
52
 * Optional settings for BlobsTranscriptStore
53
 */
54
export interface BlobsTranscriptStoreOptions {
55
    /**
56
     * [StoragePipelineOptions](xref:@azure/storage-blob.StoragePipelineOptions) to pass to azure blob
57
     * storage client
58
     */
59
    storagePipelineOptions?: StoragePipelineOptions;
60

61
    /**
62
     * Optional setting to return a new string representing the decoded version of the given encoded blob transcript key.
63
     * This remains the default behavior to false, but can be overridden by setting decodeTranscriptKey to true.
64
     */
65
    decodeTranscriptKey?: boolean;
66
}
67

68
/**
69
 * BlobsTranscriptStore is a [TranscriptStore](xref:botbuilder-core.TranscriptStore) that persists
70
 * transcripts in Azure Blob Storage
71
 *
72
 * @summary
73
 * Each activity is stored as JSON blob with a key of
74
 * `container/{channelId]/{conversationId}/{Timestamp.ticks}-{activity.id}.json`.
75
 */
76
export class BlobsTranscriptStore implements TranscriptStore {
2✔
77
    private readonly _containerClient: ContainerClient;
78
    private readonly _concurrency = Infinity;
6✔
79
    private _initializePromise?: Promise<unknown>;
80
    private _isDecodeTranscriptKey?: boolean = false;
6✔
81

82
    /**
83
     * Constructs a BlobsTranscriptStore instance.
84
     *
85
     * @param {string} connectionString Azure Blob Storage connection string
86
     * @param {string} containerName Azure Blob Storage container name
87
     * @param {BlobsTranscriptStoreOptions} options Other options for BlobsTranscriptStore
88
     */
89
    constructor(connectionString: string, containerName: string, options?: BlobsTranscriptStoreOptions) {
90
        z.object({ connectionString: z.string(), containerName: z.string() }).parse({
6✔
91
            connectionString,
92
            containerName,
93
        });
94

95
        this._containerClient = new ContainerClient(connectionString, containerName, options?.storagePipelineOptions);
2!
96

97
        this._isDecodeTranscriptKey = options?.decodeTranscriptKey;
2!
98

99
        // At most one promise at a time to be friendly to local emulator users
100
        if (connectionString.trim() === 'UseDevelopmentStorage=true;') {
2!
101
            this._concurrency = 1;
2✔
102
        }
103
    }
104

105
    // Protects against JSON.stringify cycles
106
    private toJSON(): unknown {
107
        return { name: 'BlobsTranscriptStore' };
×
108
    }
109

110
    private _initialize(): Promise<unknown> {
111
        if (!this._initializePromise) {
×
112
            this._initializePromise = this._containerClient.createIfNotExists();
×
113
        }
114
        return this._initializePromise;
×
115
    }
116

117
    /**
118
     * Get activities for a conversation (aka the transcript).
119
     *
120
     * @param {string} channelId channelId
121
     * @param {string} conversationId conversationId
122
     * @param {string} continuationToken continuation token to page through results
123
     * @param {Date} startDate earliest time to include in results
124
     * @returns {Promise<PagedResult<Activity>>} Promise that resolves to a
125
     * [PagedResult](xref:botbuilder-core.PagedResult) of [Activity](xref:botbuilder-core.Activity) items
126
     */
127
    async getTranscriptActivities(
128
        channelId: string,
129
        conversationId: string,
130
        continuationToken?: string,
131
        startDate?: Date
132
    ): Promise<PagedResult<Activity>> {
133
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });
×
134

135
        await this._initialize();
×
136

137
        const iter = this._containerClient
×
138
            .listBlobsByHierarchy('/', {
139
                prefix: getConversationPrefix(channelId, conversationId),
140
            })
141
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE });
142

143
        let page = await iter.next();
×
144
        const result: Activity[] = [];
×
145
        let response: ContainerListBlobHierarchySegmentResponse | undefined;
146
        while (!page.done) {
×
147
            // Note: azure library does not properly type iterator result, hence the need to cast
148
            response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
×
149
            const blobItems = response?.segment?.blobItems ?? [];
×
150

151
            // Locate first index of results to slice from. If we have a start date, we want to return
152
            // activities after that start date. Otherwise we can simply return all activities in this page.
153
            const fromIdx =
154
                startDate != null
×
155
                    ? blobItems.findIndex(
×
156
                          (blobItem) => blobItem?.properties?.createdOn && blobItem?.properties?.createdOn >= startDate
×
157
                      )
158
                    : 0;
159

160
            if (fromIdx !== -1) {
×
161
                const activities = await pmap(
×
162
                    blobItems.slice(fromIdx),
163
                    async (blobItem) => {
×
164
                        const blob = await this._containerClient.getBlobClient(blobItem.name).download();
×
165

166
                        const { readableStreamBody: stream } = blob;
×
167
                        if (!stream) {
×
168
                            return null;
×
169
                        }
170

171
                        const contents = await getStream(stream);
×
172

173
                        const activity = JSON.parse(contents);
×
174
                        return { ...activity, timestamp: new Date(activity.timestamp) } as Activity;
×
175
                    },
176
                    { concurrency: this._concurrency }
177
                );
178

179
                activities.forEach((activity) => {
×
180
                    if (activity) result.push(activity);
×
181
                });
182
            }
183

184
            page = await iter.next();
×
185
        }
186

187
        return {
×
188
            continuationToken: response?.continuationToken ?? '',
×
189
            items: result.reduce<Activity[]>((acc, activity) => (activity ? acc.concat(activity) : acc), []),
×
190
        };
191
    }
192

193
    /**
194
     * List conversations in the channelId.
195
     *
196
     * @param {string} channelId channelId
197
     * @param {string} continuationToken continuation token to page through results
198
     * @returns {Promise<PagedResult<TranscriptInfo>>} Promise that resolves to a
199
     * [PagedResult](xref:botbuilder-core.PagedResult) of [Activity](xref:botbuilder-core.Activity) items
200
     */
201
    async listTranscripts(channelId: string, continuationToken?: string): Promise<PagedResult<TranscriptInfo>> {
202
        z.object({ channelId: z.string() }).parse({ channelId });
×
203

204
        await this._initialize();
×
205

206
        const iter = this._containerClient
×
207
            .listBlobsByHierarchy('/', {
208
                prefix: getChannelPrefix(channelId),
209
            })
210
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE });
211

212
        let page = await iter.next();
×
213
        const result: any[] = [];
×
214
        let response: ContainerListBlobHierarchySegmentResponse | undefined;
215

216
        while (!page.done) {
×
217
            // Note: azure library does not properly type iterator result, hence the need to cast
218
            const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
×
219
            const blobItems = response?.segment?.blobItems ?? [];
×
220

221
            const items = blobItems.map((blobItem) => {
×
222
                const [, id] = decodeURIComponent(blobItem.name).split('/');
×
223

224
                const created = blobItem.metadata?.timestamp ? new Date(blobItem.metadata.timestamp) : new Date();
×
225

226
                return { channelId, created, id };
×
227
            });
228

229
            items.forEach((transcript) => {
×
230
                if (transcript) result.push(transcript);
×
231
            });
232

233
            page = await iter.next();
×
234
        }
235

236
        return {
×
237
            continuationToken: response?.continuationToken ?? '',
×
238
            items: result ?? [],
×
239
        };
240
    }
241

242
    /**
243
     * Delete a specific conversation and all of its activities.
244
     *
245
     * @param {string} channelId channelId
246
     * @param {string} conversationId conversationId
247
     * @returns {Promise<void>} A promise representing the async operation.
248
     */
249
    async deleteTranscript(channelId: string, conversationId: string): Promise<void> {
250
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });
×
251

252
        await this._initialize();
×
253

254
        const iter = this._containerClient
×
255
            .listBlobsByHierarchy('/', {
256
                prefix: getConversationPrefix(channelId, conversationId),
257
            })
258
            .byPage({
259
                maxPageSize: MAX_PAGE_SIZE,
260
            });
261

262
        let page = await iter.next();
×
263
        while (!page.done) {
×
264
            // Note: azure library does not properly type iterator result, hence the need to cast
265
            const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
×
266
            const blobItems = response?.segment?.blobItems ?? [];
×
267

268
            await pmap(blobItems, (blobItem) => this._containerClient.deleteBlob(blobItem.name), {
×
269
                concurrency: this._concurrency,
270
            });
271

272
            page = await iter.next();
×
273
        }
274
    }
275

276
    /**
277
     * Log an activity to the transcript.
278
     *
279
     * @param {Activity} activity activity to log
280
     * @param {BlobsTranscriptStoreOptions} options Optional settings for BlobsTranscriptStore
281
     * @returns {Promise<void>} A promise representing the async operation.
282
     */
283
    async logActivity(activity: Activity, options?: BlobsTranscriptStoreOptions): Promise<void> {
284
        z.object({ activity: z.record(z.unknown()) }).parse({ activity });
×
285

286
        await this._initialize();
×
287

288
        const blob = this._containerClient.getBlockBlobClient(getBlobKey(activity, options));
×
289
        const serialized = JSON.stringify(activity);
×
290

291
        const metadata: Record<string, string> = {
×
292
            FromId: activity.from.id,
293
            RecipientId: activity.recipient.id,
294
        };
295

296
        if (activity.id) {
×
297
            metadata.Id = activity.id;
×
298
        }
299

300
        if (activity.timestamp) {
×
301
            metadata.Timestamp = activity.timestamp.toJSON();
×
302
        }
303

304
        await blob.upload(serialized, serialized.length, { metadata });
×
305
    }
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc