• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cofacts / rumors-api / 22892408157

10 Mar 2026 07:32AM UTC coverage: 80.99% (-0.4%) from 81.414%
22892408157

Pull #381

github

nonumpa
test: update snapshots for score assertion changes
Pull Request #381: refactor: migrate from Elasticsearch 6.8 to 9.2 and Node 18 to 24

836 of 1105 branches covered (75.66%)

Branch coverage included in aggregate %.

147 of 159 new or added lines in 40 files covered. (92.45%)

8 existing lines in 3 files now uncovered.

1601 of 1904 relevant lines covered (84.09%)

17.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.27
/src/scripts/migrations/createBackendUsers.js
1
/*
2
  A script that loads all documents in the DB and creates a user for every occurrence of a non-web user.
3

4
  The following are the current references to user records in the DB:
5
    - analytics: (~1825k entries as of 2020/10)
6
      - (docUserId, docAppId) via docId (reply/article)
7

8
    - articlecategoryfeedbacks: (fewer than 1k entries as of 2020/10)
9
      id = ${articleId}__${categoryId}__${userId}__${appId},
10
      - (userId, appId)
11

12
    - articlereplyfeedbacks: (~179k entries as of 2020/10)
13
      id = ${articleId}__${replyId}__${userId}__${appId},
14
      - (userId, appId)
15

16
    - articles:  (~ 40k entries as of 2020/10)
17
      - (userId, appId)
18
      - [references]:
19
          - (userId, appId)
20
      - [articleReplies]:
21
          - (userId, appId)
22
      - [articleCategories]:
23
          - (userId, appId)
24

25
    - replies: (~42k entries as of 2020/10)
26
      - (userId, appId)
27

28
    - replyrequests: (~61k entries as of 2020/10)
29
      id = ${articleId}__${userId}__${appId}
30
      - (userId, appId)
31
      - [feedbacks]:
32
        - (userId, appId)
33
*/
34

35
import 'dotenv/config';
36
import client from 'util/client';
37
import getAllDocs from 'util/getAllDocs';
38
import Bulk from 'util/bulk';
39
import rollbar from 'rollbarInstance';
40
import {
41
  generatePseudonym,
42
  generateOpenPeepsAvatar,
43
  AvatarTypes,
44
  convertAppUserIdToUserId,
45
  isBackendApp,
46
} from 'util/user';
47
import { get, set } from 'lodash';
48

49
const AGG_NAME = 'userIdPair';

/*
  Painless script stored under SCRIPT_ID and used as the `terms` source of the
  composite aggregation in fetchUniqueUsers. For each document it returns an
  array of strings:
    - index 0: '{"appId":..., "userId":...}' for the document itself, or an
      '{"error":...}' string when either value is missing/empty;
    - one further entry per element of every field in params.additionalFields,
      built the same way, except that '' is returned for a missing pair when
      "<index>__<field>" is a key of params.emptyUserIdAllowedFields.

  NOTE(review): appId/userId are concatenated into the JSON without escaping;
  a value containing '"' produces a string JSON.parse cannot read.
*/
const AGG_SCRIPT = `
  boolean valueExists(def map, def key) { return map.containsKey(key) && map[key] != ''; }
  String getUserIdPair(def map, def docRef, def emptyAllowed) {
    boolean appIdExists = valueExists(map, 'appId') ;
    boolean userIdExists = valueExists(map, 'userId');
    if (!appIdExists || !userIdExists) {
      if (emptyAllowed) {
        return '';
      } else {
        return '{"error":"appId and/or userId do not exist on '+ docRef + '"}';
      }
    } else {
      return '{"appId":"' + map.appId + '", "userId":"' + map.userId + '"}';
    }
  }

  int totalLength = 1;
  for (field in params.additionalFields) {
    if (params._source.containsKey(field)) {
      totalLength += params._source[field].length;
    }
  }

  String [] userIds = new String[totalLength];
  String docRef = doc._index.value + ' ' + doc._id.value;
  int currentIndex = 1;
  userIds[0] = getUserIdPair(params._source, docRef, false);
  for (field in params.additionalFields) {
    if (params._source.containsKey(field)) {
      boolean emptyAllowed = params.emptyUserIdAllowedFields.containsKey(doc._index.value + '__' + field);
      for (int i = 0; i < params._source[field].length; i++) {
        userIds[currentIndex] = getUserIdPair(params._source[field][i], 'some reference of ' + docRef, emptyAllowed);
        currentIndex += 1;
      }
    }
  }
  return userIds;
`;
89

90
// Default page sizes: docs per scroll page and per bulk batch, buckets per
// composite-aggregation page, and analytics docIds per page.
const BATCH_SIZE = 10000;
const AGG_BATCH_SIZE = 10000;
const ANALYTICS_BATCH_SIZE = 10000;
const SCROLL_TIMEOUT = '30s';

// Id under which AGG_SCRIPT is stored in Elasticsearch.
const SCRIPT_ID = 'appIdUserIdAggScript';

// Selects every document of an index.
const matchAllQuery = { match_all: {} };

// Selects documents whose own appId is not one of the web front-end apps.
const backendUserQuery = {
  bool: {
    must_not: ['WEBSITE', 'DEVELOPMENT_FRONTEND'].map((appId) => ({
      term: { appId },
    })),
  },
};
108

109
// Indexes that reference users. `fields` lists array fields whose entries carry
// their own (userId, appId) pair; `genId`, when present, rebuilds the document
// _id from its fields (these ids embed userId/appId).
const userReferenceInSchema = {
  articlecategoryfeedbacks: {
    fields: [],
    genId: ({ articleId, categoryId, userId, appId }) =>
      `${articleId}__${categoryId}__${userId}__${appId}`,
  },
  articlereplyfeedbacks: {
    fields: [],
    genId: ({ articleId, replyId, userId, appId }) =>
      `${articleId}__${replyId}__${userId}__${appId}`,
  },
  articles: {
    fields: ['references', 'articleReplies', 'articleCategories'],
  },
  replies: {
    fields: [],
  },
  replyrequests: {
    fields: ['feedbacks'],
    genId: ({ articleId, userId, appId }) =>
      `${articleId}__${userId}__${appId}`,
  },
};
129

130
// index -> array fields whose entries may legitimately lack userId/appId.
const emptyUserIdAllowedFields = { articles: ['articleCategories'] };

/**
 * Reports a migration error to stderr and to Rollbar with a common prefix.
 *
 * @param {*} error Error object or message; it is interpolated into a string.
 */
const logError = (error) => {
  const message = `createBackendUserError: ${error}`;
  console.error(message);
  rollbar.error(message);
};
136

137
/**
 * Migration job that creates `users` docs for backend (non-web) app users,
 * rewrites user references across the indexes in userReferenceInSchema to the
 * generated DB user ids, and copies doc owners into `analytics` entries.
 *
 * Usage: `await new CreateBackendUsers().execute()`.
 */
export default class CreateBackendUsers {
  /**
   * @param {Object} [options]
   * @param {number} [options.batchSize] Bulk and scroll batch size; defaults to BATCH_SIZE.
   * @param {number} [options.aggBatchSize] Composite-aggregation page size; defaults to AGG_BATCH_SIZE.
   * @param {number} [options.analyticsBatchSize] Page size when walking analytics docIds; defaults to ANALYTICS_BATCH_SIZE.
   */
  constructor({ batchSize, aggBatchSize, analyticsBatchSize } = {}) {
    this.userIdMap = {}; // {[appId]: {[appUserId]: dbUserId}}
    this.reversedUserIdMap = {}; // {[dbUserId]: [appId, appUserId]}
    this.batchSize = batchSize ?? BATCH_SIZE;
    this.aggBatchSize = aggBatchSize ?? AGG_BATCH_SIZE;
    this.analyticsBatchSize = analyticsBatchSize ?? ANALYTICS_BATCH_SIZE;
    this.bulk = new Bulk(client, this.batchSize);

    // Flatten {index: [field, ...]} into {"index__field": true} so the stored
    // painless script can check a field with a single containsKey() call.
    this.emptyUserIdAllowedFields = {};
    for (const index in emptyUserIdAllowedFields) {
      for (const field of emptyUserIdAllowedFields[index]) {
        this.emptyUserIdAllowedFields[`${index}__${field}`] = true;
      }
    }
  }

  /**
   * Stores AGG_SCRIPT in Elasticsearch under SCRIPT_ID so the composite
   * aggregation in fetchUniqueUsers can reference it by id.
   * Errors are logged, not thrown.
   */
  async storeScriptInDB() {
    try {
      // The Elasticsearch JS client (8.x and later) only exposes camelCase API
      // methods; the legacy snake_case `put_script` alias is not available.
      await client.putScript({
        id: SCRIPT_ID,
        body: {
          script: {
            lang: 'painless',
            source: AGG_SCRIPT,
          },
        },
      });
    } catch (e) {
      logError(e);
    }
  }

  /**
   * Queues a `users` doc (through this.bulk) for every backend user found in
   * one page of aggregation buckets that has not been seen yet, and records the
   * app-user <-> DB-user mapping in this.userIdMap / this.reversedUserIdMap.
   *
   * @param {Array<Object>} buckets Array of {
   *   key: { [AGG_NAME]: '{"appId":..., "userId":...}' | '{"error":...}' | '' },
   *   doc_count: number
   * }
   */
  async processUsers(buckets) {
    const bulkOperations = [];
    const now = new Date().toISOString();
    for (const {
      key: { [AGG_NAME]: userIdPair },
    } of buckets) {
      // '' marks an entry that is allowed to have no user
      // (see emptyUserIdAllowedFields).
      if (userIdPair === '') {
        continue;
      }
      // NOTE(review): AGG_SCRIPT builds this JSON without escaping. An id that
      // contains '"' makes JSON.parse throw; fetchUniqueUsers then logs the
      // error and stops paging the current index.
      const { userId, appId, error } = JSON.parse(userIdPair);
      if (error) {
        logError(error);
      } else if (isBackendApp(appId)) {
        const dbUserId = convertAppUserIdToUserId({ appId, appUserId: userId });
        const appUserId = get(this.reversedUserIdMap, [dbUserId, 1]);
        // if the hashed id already exists, check for collision
        if (appUserId !== undefined) {
          if (appUserId !== userId) {
            logError(
              `collision found! ${userId} and ${appUserId} both hash to ${dbUserId}`
            );
          }
        } else {
          set(this.userIdMap, [appId, userId], dbUserId);
          set(this.reversedUserIdMap, dbUserId, [appId, userId]);
          bulkOperations.push(
            {
              index: {
                _index: 'users',
                _id: dbUserId,
              },
            },
            {
              name: generatePseudonym(),
              avatarType: AvatarTypes.OpenPeeps,
              avatarData: JSON.stringify(generateOpenPeepsAvatar()),
              appId,
              appUserId: userId,
              createdAt: now,
              updatedAt: now,
            }
          );
        }
      }
    }
    // Every queued user contributes two lines: the action and the source.
    await this.bulk.push(bulkOperations, bulkOperations.length / 2);
  }

  /**
   * Fetches one page of unique user-id pairs from `indexName` using a
   * composite aggregation over the stored script, and hands the buckets to
   * processUsers.
   *
   * The response is of the form {
   *   aggregations: { [AGG_NAME]: {
   *     after_key: {[AGG_NAME]: [lastKey]},
   *     buckets: [{...}]
   * }}}
   *
   * @param {string} indexName A key of userReferenceInSchema.
   * @param {Object} [pageIndex] The key returned by the previous call, if any.
   * @returns {Promise<Object|undefined>} The key to pass as `pageIndex` next
   *   time, or undefined when there are no buckets left or an error occurred.
   */
  async fetchUniqueUsers(indexName, pageIndex = undefined) {
    try {
      const {
        aggregations: {
          [AGG_NAME]: { buckets },
        },
      } = await client.search({
        index: indexName,
        size: 0,
        body: {
          aggs: {
            [AGG_NAME]: {
              composite: {
                size: this.aggBatchSize,
                after: pageIndex,
                sources: [
                  {
                    [AGG_NAME]: {
                      terms: {
                        script: {
                          id: SCRIPT_ID,
                          params: {
                            emptyUserIdAllowedFields:
                              this.emptyUserIdAllowedFields,
                            additionalFields:
                              userReferenceInSchema[indexName].fields,
                          },
                        },
                      },
                    },
                  },
                ],
              },
            },
          },
        },
      });
      if (buckets.length > 0) {
        const lastKey = { ...buckets[buckets.length - 1].key };
        await this.processUsers(buckets);
        return lastKey;
      }
    } catch (e) {
      logError(
        `error while fetching users for indexName:${indexName} with pageIndex ${pageIndex}`
      );
      logError(e);
    }
  }

  /**
   * Pages through every index in userReferenceInSchema, collecting users
   * until fetchUniqueUsers reports no further page.
   *
   * @returns {Promise<CreateBackendUsers>} this, for chaining.
   */
  async fetchUsersFromAllDocs() {
    for (const indexName in userReferenceInSchema) {
      let pageIndex;
      do {
        pageIndex = await this.fetchUniqueUsers(indexName, pageIndex);
      } while (pageIndex !== undefined);
    }
    return this;
  }

  /**
   * A generator that fetches all docs in the specified index.
   *
   * @param {String} indexName The name of the index to fetch
   * @param {boolean} [hasAdditionalUserFields=false] When true every doc is
   *   fetched (array fields may hold backend users even if the doc's own
   *   appId is a web app); otherwise only docs matching backendUserQuery.
   * @yields {Object} the document
   */
  async *getAllDocs(indexName, hasAdditionalUserFields = false) {
    for await (const doc of getAllDocs(
      indexName,
      hasAdditionalUserFields ? matchAllQuery : backendUserQuery,
      {
        scroll: SCROLL_TIMEOUT,
        size: this.batchSize,
      }
    )) {
      yield doc;
    }
  }

  /**
   * Maps an (appId, userId) reference to the fields to store on the doc.
   *
   * @returns {{userId: *, appUserId?: *}} For backend apps, `userId` is the DB
   *   user id from this.userIdMap (falling back to the original userId) and
   *   `appUserId` is the original userId; otherwise just `{ userId }`.
   */
  getUserIds(appId, userId) {
    return isBackendApp(appId)
      ? {
          userId: get(this.userIdMap, [appId, userId], userId),
          appUserId: userId,
        }
      : { userId };
  }

  /**
   * Rewrites the user references of every doc in the indexes listed in
   * userReferenceInSchema, both top-level and inside each entry of `fields`.
   * Indexes with `genId` embed the user in the doc _id, so those docs are
   * deleted and re-indexed under the regenerated id; other docs get a partial
   * update.
   *
   * @returns {Promise<CreateBackendUsers>} this, for chaining.
   */
  async updateAllDocs() {
    for (const indexName in userReferenceInSchema) {
      const { genId, fields } = userReferenceInSchema[indexName];
      for await (const doc of this.getAllDocs(
        indexName,
        fields && fields.length > 0
      )) {
        const { appId, userId } = doc._source;
        // getUserIds returns a fresh object, so adding fields to it is safe.
        const newFields = this.getUserIds(appId, userId);
        for (const field of fields) {
          if (doc._source[field]) {
            newFields[field] = doc._source[field].map((entry) => ({
              ...entry,
              ...this.getUserIds(entry.appId, entry.userId),
            }));
          }
        }
        if (genId !== undefined) {
          await this.bulk.push(
            [
              {
                delete: {
                  _index: doc._index,
                  _id: doc._id,
                },
              },
              {
                index: {
                  _index: doc._index,
                  _id: genId({ ...doc._source, ...newFields }),
                },
              },
              {
                ...doc._source,
                ...newFields,
              },
            ],
            2
          );
        } else {
          await this.bulk.push([
            {
              update: {
                _index: doc._index,
                _id: doc._id,
              },
            },
            {
              doc: newFields,
            },
          ]);
        }
      }
    }
    return this;
  }

  /**
   * Fetches one page of unique docIds of `docType` in analytics, loads the
   * owners (userId, appId) of those docs from `docIndex`, and updates the
   * matching analytics entries.
   *
   * @param {string} docType analytics `type`, e.g. 'article' or 'reply'.
   * @param {string} docIndex Index holding the docs, e.g. 'articles'.
   * @param {Object} [pageIndex] The key returned by the previous call, if any.
   * @returns {Promise<Object|undefined>} The key for the next page, or
   *   undefined when there are no buckets left or an error occurred.
   */
  async fetchUniqueDocs(docType, docIndex, pageIndex = undefined) {
    try {
      const {
        aggregations: {
          docIds: { buckets },
        },
      } = await client.search({
        index: 'analytics',
        size: 0,
        body: {
          query: { term: { type: docType } },
          aggs: {
            docIds: {
              composite: {
                size: this.analyticsBatchSize,
                after: pageIndex,
                sources: [{ docId: { terms: { field: 'docId' } } }],
              },
            },
          },
        },
      });
      if (buckets.length > 0) {
        const {
          hits: { hits: docs },
        } = await client.search({
          index: docIndex,
          size: this.analyticsBatchSize,
          body: {
            query: {
              ids: { values: buckets.map((bucket) => bucket.key.docId) },
            },
            _source: {
              includes: ['userId', 'appId'],
            },
          },
        });
        const lastKey = { ...buckets[buckets.length - 1].key };
        await this.updateAnalyticsForDocs(docType, docs);
        return lastKey;
      }
    } catch (e) {
      logError(
        `error while updating analytics type ${docType} with pageIndex ${
          pageIndex ? JSON.stringify(pageIndex) : null
        }`
      );
      logError(e);
    }
  }

  /**
   * Sets docAppId/docUserId on every analytics entry of `docType` whose docId
   * is one of `docs`, then logs how many entries were updated. Failures for a
   * single doc are logged and do not stop the remaining docs.
   *
   * @param {string} docType analytics `type` of the entries to update.
   * @param {Array<Object>} docs Search hits with `_id` and `_source.{appId,userId}`.
   * @returns {Promise<CreateBackendUsers>} this, for chaining.
   */
  async updateAnalyticsForDocs(docType, docs) {
    let totalUpdated = 0;
    for (const doc of docs) {
      try {
        const requestBody = {
          script: {
            source: `ctx._source.docAppId = params.docAppId; ctx._source.docUserId = params.docUserId;`,
            lang: 'painless',
            params: {
              docAppId: doc._source.appId,
              docUserId: doc._source.userId,
            },
          },
          query: {
            bool: {
              must: [
                { term: { type: docType } },
                { term: { docId: doc._id } },
              ],
            },
          },
        };
        const { updated } = await client.updateByQuery({
          index: 'analytics',
          body: requestBody,
          refresh: 'true',
        });
        totalUpdated += updated;
      } catch (e) {
        logError(
          `error trying to update analytics user for doc ${docType} ${doc._id}`
        );
        logError(e);
      }
    }
    // Previously this summary was a bare expression statement and was never
    // emitted.
    console.log(
      `${totalUpdated} updated for ${docs.length} ${docType} analytics docs`
    );
    return this;
  }

  /**
   * Pages through all article and reply analytics entries and fills in the
   * owner of each referenced doc.
   *
   * @returns {Promise<CreateBackendUsers>} this, for chaining.
   */
  async updateAnalytics() {
    for (const [docType, docIndex] of [
      ['article', 'articles'],
      ['reply', 'replies'],
    ]) {
      let pageIndex;
      do {
        pageIndex = await this.fetchUniqueDocs(docType, docIndex, pageIndex);
      } while (pageIndex !== undefined);
    }
    return this;
  }

  /**
   * Runs the whole migration. The steps, in order, are:
   * 1. Store the script.
   * 2. Create the users.
   * 3. Rewrite the references.
   * 4. Refresh the articles and replies indexes so the analytics step reads
   *    the rewritten docs.
   * 5. Update analytics.
   */
  async execute() {
    await this.storeScriptInDB();
    await this.fetchUsersFromAllDocs();
    await this.bulk.flush();
    await this.updateAllDocs();
    await this.bulk.flush();

    await client.indices.refresh({ index: 'replies' });
    await client.indices.refresh({ index: 'articles' });
    await this.updateAnalytics();
  }
}
494

495
/**
 * CLI entry point: runs the full migration. An uncaught failure is logged
 * (stderr + Rollbar) and reported through a non-zero process exit code.
 */
async function main() {
  try {
    await new CreateBackendUsers().execute();
  } catch (e) {
    logError(e);
    // Without this the script exits with status 0, so a failed migration is
    // indistinguishable from a successful one for the shell/CI running it.
    process.exitCode = 1;
  }
}

if (require.main === module) {
  main();
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc