• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-js / 3698619628

pending completion
3698619628

Pull #4389

github

GitHub
Merge 33a6b02b4 into e5f7e3abe
Pull Request #4389: chore(deps): bump express from 4.17.1 to 4.17.3

9686 of 12680 branches covered (76.39%)

Branch coverage included in aggregate %.

19968 of 22357 relevant lines covered (89.31%)

3197.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.89
/libraries/botbuilder-azure-blobs/src/blobsTranscriptStore.ts
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
import * as z from 'zod';
1✔
5
import getStream from 'get-stream';
1✔
6
import pmap from 'p-map';
1✔
7
import { Activity, PagedResult, TranscriptInfo, TranscriptStore } from 'botbuilder-core';
8
import { maybeCast } from 'botbuilder-stdlib';
1✔
9
import { sanitizeBlobKey } from './sanitizeBlobKey';
1✔
10

11
import {
1✔
12
    ContainerClient,
13
    ContainerListBlobHierarchySegmentResponse,
14
    StoragePipelineOptions,
15
} from '@azure/storage-blob';
16

17
/**
 * Formats a timestamp as hexadecimal .NET ticks, in a way that is consistent with the C# SDK.
 *
 * The tick count (~6.4e17) is far above Number.MAX_SAFE_INTEGER, so the arithmetic is done with
 * BigInt; plain number arithmetic silently rounds the result to a multiple of 128 ticks.
 *
 * @param {Date} timestamp timestamp to format
 * @returns {string} lowercase hexadecimal representation of the tick count
 */
function formatTicks(timestamp: Date): string {
    const epochTicks = 621355968000000000; // the number of .net ticks at the unix epoch (exactly representable)
    const ticksPerMillisecond = 10000; // there are 10000 .net ticks per millisecond
    const milliseconds = timestamp.getTime();

    // An invalid Date yields NaN, which BigInt cannot represent; keep the legacy output for that case.
    if (!Number.isFinite(milliseconds)) {
        return (epochTicks + milliseconds * ticksPerMillisecond).toString(16);
    }

    const ticks = BigInt(epochTicks) + BigInt(milliseconds) * BigInt(ticksPerMillisecond);
    return ticks.toString(16);
}
24

25
/**
 * Builds the blob-name prefix for a channel.
 *
 * @param {string} channelId channelId
 * @returns {string} the sanitized form of `{channelId}/`
 */
function getChannelPrefix(channelId: string): string {
    const rawPrefix = `${channelId}/`;
    return sanitizeBlobKey(rawPrefix);
}
29

30
/**
 * Builds the blob-name prefix for a single conversation within a channel.
 *
 * @param {string} channelId channelId
 * @param {string} conversationId conversationId
 * @returns {string} the sanitized form of `{channelId}/{conversationId}`
 */
function getConversationPrefix(channelId: string, conversationId: string): string {
    const rawPrefix = `${channelId}/${conversationId}`;
    return sanitizeBlobKey(rawPrefix);
}
34

35
/**
 * Builds the blob key under which an activity is stored.
 *
 * @param {Activity} activity activity to derive the key from; its `timestamp` must be a Date
 * @param {BlobsTranscriptStoreOptions} options forwarded unchanged to `sanitizeBlobKey`
 * @returns {string} the sanitized form of `{channelId}/{conversationId}/{ticks}-{activity.id}.json`
 */
function getBlobKey(activity: Activity, options?: BlobsTranscriptStoreOptions): string {
    // Validate up front: parse() throws unless activity.timestamp is a Date instance
    const timestampSchema = z.object({ timestamp: z.instanceof(Date) }).nonstrict();
    const validated = timestampSchema.parse(activity);

    const segments = [
        activity.channelId,
        activity.conversation.id,
        `${formatTicks(validated.timestamp)}-${activity.id}.json`,
    ];

    return sanitizeBlobKey(segments.join('/'), options);
}
47

48
// Max number of results requested per page of an Azure blob listing (passed as `maxPageSize` to `byPage`)
49
const MAX_PAGE_SIZE = 20;
1✔
50

51
/**
52
 * Optional settings for BlobsTranscriptStore
53
 */
54
export interface BlobsTranscriptStoreOptions {
55
    /**
56
     * [StoragePipelineOptions](xref:@azure/storage-blob.StoragePipelineOptions) to pass to the Azure Blob
57
     * Storage client
58
     */
59
    storagePipelineOptions?: StoragePipelineOptions;
60

61
    /**
62
     * Optional setting to use the decoded version of the encoded blob transcript key.
63
     * Defaults to false; set decodeTranscriptKey to true to enable decoding.
64
     */
65
    decodeTranscriptKey?: boolean;
66
}
67

68
/**
69
 * BlobsTranscriptStore is a [TranscriptStore](xref:botbuilder-core.TranscriptStore) that persists
70
 * transcripts in Azure Blob Storage
71
 *
72
 * @summary
73
 * Each activity is stored as JSON blob with a key of
74
 * `container/{channelId}/{conversationId}/{Timestamp.ticks}-{activity.id}.json`.
75
 */
76
export class BlobsTranscriptStore implements TranscriptStore {
    private readonly _containerClient: ContainerClient;
    // Concurrency limit handed to p-map for per-blob operations
    private readonly _concurrency = Infinity;
    // Cached container-creation promise so createIfNotExists is not re-issued on every call
    private _initializePromise?: Promise<unknown>;
    private _isDecodeTranscriptKey?: boolean = false;

    /**
     * Constructs a BlobsTranscriptStore instance.
     *
     * @param {string} connectionString Azure Blob Storage connection string
     * @param {string} containerName Azure Blob Storage container name
     * @param {BlobsTranscriptStoreOptions} options Other options for BlobsTranscriptStore
     */
    constructor(connectionString: string, containerName: string, options?: BlobsTranscriptStoreOptions) {
        // Throws if either argument is not a string
        z.object({ connectionString: z.string(), containerName: z.string() }).parse({
            connectionString,
            containerName,
        });

        this._containerClient = new ContainerClient(connectionString, containerName, options?.storagePipelineOptions);

        // Keep the declared default (false) when the option is omitted
        this._isDecodeTranscriptKey = options?.decodeTranscriptKey ?? false;

        // At most one promise at a time to be friendly to local emulator users
        if (connectionString.trim() === 'UseDevelopmentStorage=true;') {
            this._concurrency = 1;
        }
    }

    // Protects against JSON.stringify cycles
    private toJSON(): unknown {
        return { name: 'BlobsTranscriptStore' };
    }

    /**
     * Ensures the container exists, issuing `createIfNotExists` at most once while it succeeds.
     *
     * @returns {Promise<unknown>} promise that settles with the container creation call
     */
    private _initialize(): Promise<unknown> {
        if (!this._initializePromise) {
            this._initializePromise = this._containerClient.createIfNotExists().catch((err: unknown) => {
                // Do not cache a failure: let the next call retry instead of failing forever
                this._initializePromise = undefined;
                throw err;
            });
        }
        return this._initializePromise;
    }

    /**
     * Get activities for a conversation (aka the transcript).
     *
     * Pages through the blob listing and returns the first page that contains a matching blob.
     * When `startDate` is given, a page matches once it holds a blob created on or after that date;
     * that blob and every blob listed after it in the page are returned.
     *
     * @param {string} channelId channelId
     * @param {string} conversationId conversationId
     * @param {string} continuationToken continuation token to page through results
     * @param {Date} startDate earliest time to include in results
     * @returns {Promise<PagedResult<Activity>>} Promise that resolves to a
     * [PagedResult](xref:botbuilder-core.PagedResult) of [Activity](xref:botbuilder-core.Activity) items
     */
    async getTranscriptActivities(
        channelId: string,
        conversationId: string,
        continuationToken?: string,
        startDate?: Date
    ): Promise<PagedResult<Activity>> {
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });

        await this._initialize();

        const iter = this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getConversationPrefix(channelId, conversationId),
            })
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE });

        let page = await iter.next();
        while (!page.done) {
            // Note: azure library does not properly type iterator result, hence the need to cast
            const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
            const blobItems = response?.segment?.blobItems ?? [];

            // Locate first index of results to slice from. If we have a start date, we want to return
            // activities after that start date. Otherwise we can simply return all activities in this page.
            const fromIdx =
                startDate != null
                    ? blobItems.findIndex(
                          (blobItem) => blobItem?.properties?.createdOn && blobItem?.properties?.createdOn >= startDate
                      )
                    : 0;

            if (fromIdx !== -1) {
                const activities = await pmap(
                    blobItems.slice(fromIdx),
                    async (blobItem) => {
                        const blob = await this._containerClient.getBlobClient(blobItem.name).download();

                        const { readableStreamBody: stream } = blob;
                        if (!stream) {
                            // No body to read; dropped from the result below
                            return null;
                        }

                        const contents = await getStream(stream);

                        // Timestamps are serialized as strings; revive them into Date instances
                        const activity = JSON.parse(contents);
                        return { ...activity, timestamp: new Date(activity.timestamp) } as Activity;
                    },
                    { concurrency: this._concurrency }
                );

                return {
                    continuationToken: response?.continuationToken ?? '',
                    items: activities.reduce<Activity[]>(
                        (acc, activity) => (activity ? acc.concat(activity) : acc),
                        []
                    ),
                };
            }

            page = await iter.next();
        }

        return { continuationToken: '', items: [] };
    }

    /**
     * List conversations in the channelId.
     *
     * @param {string} channelId channelId
     * @param {string} continuationToken continuation token to page through results
     * @returns {Promise<PagedResult<TranscriptInfo>>} Promise that resolves to a
     * [PagedResult](xref:botbuilder-core.PagedResult) of [TranscriptInfo](xref:botbuilder-core.TranscriptInfo) items
     */
    async listTranscripts(channelId: string, continuationToken?: string): Promise<PagedResult<TranscriptInfo>> {
        z.object({ channelId: z.string() }).parse({ channelId });

        await this._initialize();

        // Only a single page is fetched per call; callers page with the returned continuation token
        const page = await this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getChannelPrefix(channelId),
            })
            .byPage({ continuationToken, maxPageSize: MAX_PAGE_SIZE })
            .next();

        // Note: azure library does not properly type iterator result, hence the need to cast
        const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
        const blobItems = response?.segment?.blobItems ?? [];

        return {
            continuationToken: response?.continuationToken ?? '',
            items: blobItems.map((blobItem) => {
                // The conversation id is the second '/'-separated segment of the decoded blob name
                const [, id] = decodeURIComponent(blobItem.name).split('/');

                // NOTE(review): the listing above does not request metadata, so this presumably always
                // falls back to the current time - confirm against the Azure SDK behavior.
                const created = blobItem.metadata?.timestamp ? new Date(blobItem.metadata.timestamp) : new Date();

                return { channelId, created, id };
            }),
        };
    }

    /**
     * Delete a specific conversation and all of its activities.
     *
     * @param {string} channelId channelId
     * @param {string} conversationId conversationId
     * @returns {Promise<void>} A promise representing the async operation.
     */
    async deleteTranscript(channelId: string, conversationId: string): Promise<void> {
        z.object({ channelId: z.string(), conversationId: z.string() }).parse({ channelId, conversationId });

        await this._initialize();

        const iter = this._containerClient
            .listBlobsByHierarchy('/', {
                prefix: getConversationPrefix(channelId, conversationId),
            })
            .byPage({
                maxPageSize: MAX_PAGE_SIZE,
            });

        // Walk every page of the listing, deleting each listed blob
        let page = await iter.next();
        while (!page.done) {
            // Note: azure library does not properly type iterator result, hence the need to cast
            const response = maybeCast<ContainerListBlobHierarchySegmentResponse>(page?.value ?? {});
            const blobItems = response?.segment?.blobItems ?? [];

            await pmap(blobItems, (blobItem) => this._containerClient.deleteBlob(blobItem.name), {
                concurrency: this._concurrency,
            });

            page = await iter.next();
        }
    }

    /**
     * Log an activity to the transcript.
     *
     * @param {Activity} activity activity to log
     * @param {BlobsTranscriptStoreOptions} options Optional settings for BlobsTranscriptStore
     * @returns {Promise<void>} A promise representing the async operation.
     */
    async logActivity(activity: Activity, options?: BlobsTranscriptStoreOptions): Promise<void> {
        z.object({ activity: z.record(z.unknown()) }).parse({ activity });

        await this._initialize();

        const blob = this._containerClient.getBlockBlobClient(getBlobKey(activity, options));
        const serialized = JSON.stringify(activity);

        const metadata: Record<string, string> = {
            FromId: activity.from.id,
            RecipientId: activity.recipient.id,
        };

        if (activity.id) {
            metadata.Id = activity.id;
        }

        if (activity.timestamp) {
            metadata.Timestamp = activity.timestamp.toJSON();
        }

        // The content length must be in bytes; string.length counts UTF-16 code units and is too
        // small whenever the activity contains non-ASCII text.
        await blob.upload(serialized, Buffer.byteLength(serialized), { metadata });
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc