• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cofacts / rumors-api / 11633393708

01 Nov 2024 05:14PM UTC coverage: 83.365% (-5.1%) from 88.512%
11633393708

push

github

web-flow
Merge pull request #342 from cofacts/strict-schema

Disable dynamic ES mapping and fix test fixtures

750 of 945 branches covered (79.37%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

102 existing lines in 2 files now uncovered.

1465 of 1712 relevant lines covered (85.57%)

18.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.27
/src/scripts/migrations/createBackendUsers.js
1
/*
2
  A script that loads all documents in the db and creates a user for every occurrence of a non-web user.
3

4
  The following are the current references to user records in the db:
5
    - analytics: (~1825k entries as of 2020/10)
6
      - (docUserId, docAppId) via docId (reply/article)
7

8
    - articlecategoryfeedbacks: (less than 1k  entries as of 2020/10)
9
      id = ${articleId}__${categoryId}__${userId}__${appId},
10
      - (userId, appId)
11

12
    - articlereplyfeedbacks: (~179k entries as of 2020/10)
13
      id = ${articleId}__${replyId}__${userId}__${appId},
14
      - (userId, appId)
15

16
    - articles:  (~ 40k entries as of 2020/10)
17
      - (userId, appId)
18
      - [references]:
19
          - (userId, appId)
20
      - [articleReplies]:
21
          - (userId, appId)
22
      - [articleCategories]:
23
          - (userId, appId)
24

25
    - replies: (~42k entries as of 2020/10)
26
      - (userId, appId)
27

28
    - replyrequests: (~61k entries as of 2020/10)
29
      id = ${articleId}__${userId}__${appId}
30
      - (userId, appId)
31
      - [feedbacks]:
32
        - (userId, appId)
33
*/
34

35
import 'dotenv/config';
36
import client from 'util/client';
37
import getAllDocs from 'util/getAllDocs';
38
import Bulk from 'util/bulk';
39
import rollbar from 'rollbarInstance';
40
import {
41
  generatePseudonym,
42
  generateOpenPeepsAvatar,
43
  AvatarTypes,
44
  convertAppUserIdToUserId,
45
  isBackendApp,
46
} from 'util/user';
47
import { get, set } from 'lodash';
48

49
// Name used both for the composite aggregation and for its single source, so
// each bucket key has the shape `{ [AGG_NAME]: <string emitted by AGG_SCRIPT> }`.
const AGG_NAME = 'userIdPair';

// Painless script used as the terms source of the composite aggregation.
//
// For each document it returns an array of strings:
//   - index 0 describes the document's own (appId, userId);
//   - one more entry per element of every array field listed in
//     `params.additionalFields` that exists on the document's _source.
//
// Each string is one of:
//   - '{"appId":"...", "userId":"..."}' when both values are present and non-empty;
//   - ''                                when a value is missing and the field is
//                                       registered in `params.emptyUserIdAllowedFields`
//                                       under the key `${index}__${field}`;
//   - '{"error":"..."}'                 otherwise.
//
// The `emptyUserIdAllowedFields` lookup reads the index name via
// `doc._index.value`, the same accessor used to build `docRef`, so the key
// actually has the `${index}__${field}` form the params map is keyed by.
const AGG_SCRIPT = `
  boolean valueExists(def map, def key) { map.containsKey(key) && map[key] != ''}
  String getUserIdPair(def map, def docRef, def emptyAllowed) {
    boolean appIdExists = valueExists(map, 'appId') ;
    boolean userIdExists = valueExists(map, 'userId');
    if (!appIdExists || !userIdExists) {
      if (emptyAllowed) {
        return '';
      } else {
        return '{"error":"appId and/or userId do not exist on '+ docRef + '"}';
      }
    } else {
      return '{"appId":"' + map.appId + '", "userId":"' + map.userId + '"}';
    }
  }

  int totalLength = 1;
  for (field in params.additionalFields) {
    if (params._source.containsKey(field)) {
      totalLength += params._source[field].length;
    }
  }

  String [] userIds = new String[totalLength];
  String docRef = doc._index.value + ' ' + doc._id.value;
  int currentIndex = 1;
  userIds[0] = getUserIdPair(params._source, docRef, false);
  for (field in params.additionalFields) {
    if (params._source.containsKey(field)) {
      boolean emptyAllowed = params.emptyUserIdAllowedFields.containsKey(doc._index.value + '__' + field);
      for (int i = 0; i < params._source[field].length; i++) {
        userIds[currentIndex] = getUserIdPair(params._source[field][i], 'some reference of ' + docRef, emptyAllowed);
        currentIndex += 1;
      }
    }
  }
  return userIds;
`;
89

90
// Default page sizes; each can be overridden through the CreateBackendUsers
// constructor options.
const BATCH_SIZE = 10000; // bulk writer batch size and scroll page size
const AGG_BATCH_SIZE = 10000; // composite-aggregation page size for user lookup
const ANALYTICS_BATCH_SIZE = 10000; // page size when walking analytics docIds

// Scroll timeout passed to the document scroll helper.
const SCROLL_TIMEOUT = '30s';

// Id under which the aggregation script is stored in, and later referenced from, the db.
const SCRIPT_ID = 'appIdUserIdAggScript';
1✔
95

96
// Query that selects every document of an index.
const matchAllQuery = { match_all: {} };

// Query that selects only documents whose top-level appId is neither of the
// two web front-end app ids.
const backendUserQuery = {
  bool: {
    must_not: ['WEBSITE', 'DEVELOPMENT_FRONTEND'].map((appId) => ({
      term: { appId },
    })),
  },
};
108

109
// Per-index description of where user references live.
//   fields: array-valued fields whose entries carry their own (userId, appId).
//   genId:  present only for indices whose document id embeds the user id;
//           rebuilds the id from the (already rewritten) document source.
// Key order matters: the migration walks the indices in this order.
const userReferenceInSchema = {
  articlecategoryfeedbacks: {
    fields: [],
    genId: ({ articleId, categoryId, userId, appId }) =>
      `${articleId}__${categoryId}__${userId}__${appId}`,
  },
  articlereplyfeedbacks: {
    fields: [],
    genId: ({ articleId, replyId, userId, appId }) =>
      `${articleId}__${replyId}__${userId}__${appId}`,
  },
  articles: {
    fields: ['references', 'articleReplies', 'articleCategories'],
  },
  replies: { fields: [] },
  replyrequests: {
    fields: ['feedbacks'],
    genId: ({ articleId, userId, appId }) =>
      `${articleId}__${userId}__${appId}`,
  },
};

// index -> nested fields whose entries may legitimately lack a userId/appId.
const emptyUserIdAllowedFields = { articles: ['articleCategories'] };
1✔
131

132
/**
 * Reports a migration error to both stderr and rollbar, prefixed with
 * `createBackendUserError: `.
 *
 * @param {*} error The error (or message) to report; it is string-interpolated
 */
const logError = (error) => {
  const message = `createBackendUserError: ${error}`;
  console.error(message);
  rollbar.error(message);
};
136

137
/**
 * Migration that creates a `users` document for every backend (non-web) user
 * referenced in the db, rewrites those references to point at the new user
 * ids, and finally copies each article/reply author onto its analytics entries.
 *
 * Typical use: `await new CreateBackendUsers().execute()`.
 */
export default class CreateBackendUsers {
  /**
   * @param {Object} [options]
   * @param {Number} [options.batchSize] bulk batch size and scroll page size (default BATCH_SIZE)
   * @param {Number} [options.aggBatchSize] page size of the user aggregation (default AGG_BATCH_SIZE)
   * @param {Number} [options.analyticsBatchSize] page size when walking analytics docIds (default ANALYTICS_BATCH_SIZE)
   */
  constructor({ batchSize, aggBatchSize, analyticsBatchSize } = {}) {
    this.userIdMap = {}; // {[appId]: {[appUserId]: dbUserId}}
    this.reversedUserIdMap = {}; // {[dbUserId]: [appId, appUserId]}
    this.batchSize = batchSize ?? BATCH_SIZE;
    this.aggBatchSize = aggBatchSize ?? AGG_BATCH_SIZE;
    this.analyticsBatchSize = analyticsBatchSize ?? ANALYTICS_BATCH_SIZE;
    this.bulk = new Bulk(client, this.batchSize);

    // Flatten {index: [field]} into {`${index}__${field}`: true}; this flat map
    // is what gets passed to the aggregation script as a param.
    this.emptyUserIdAllowedFields = {};
    for (const [index, fields] of Object.entries(emptyUserIdAllowedFields)) {
      for (const field of fields) {
        this.emptyUserIdAllowedFields[`${index}__${field}`] = true;
      }
    }
  }

  /**
   * Stores AGG_SCRIPT in the db under SCRIPT_ID. Failures are logged, not thrown.
   */
  async storeScriptInDB() {
    try {
      await client.put_script({
        id: SCRIPT_ID,
        body: {
          script: {
            lang: 'painless',
            source: AGG_SCRIPT,
          },
        },
      });
    } catch (e) {
      logError(e);
    }
  }

  /**
   * Registers every not-yet-seen backend user found in `buckets` and queues a
   * `users` document for it on the bulk writer.
   *
   * buckets is an array of {
   *  key: {
   *    [AGG_NAME]: "{appId:[appId], userId:[userId]}" || "{error:[errorMessage]}"
   *  },
   *  doc_count: [doc_count]
   * }
   * An empty-string key is skipped; an error key is logged.
   *
   * @param {Object[]} buckets composite aggregation buckets
   */
  async processUsers(buckets) {
    const bulkOperations = [];
    const now = new Date().toISOString();
    for (const {
      key: { [AGG_NAME]: userIdPair },
    } of buckets) {
      if (userIdPair === '') {
        continue;
      }
      const { userId, appId, error } = JSON.parse(userIdPair);
      if (error) {
        logError(error);
      } else if (isBackendApp(appId)) {
        const dbUserId = convertAppUserIdToUserId({ appId, appUserId: userId });
        const appUserId = get(this.reversedUserIdMap, [dbUserId, 1]);
        // if the hashed id already exists, check for collision
        if (appUserId !== undefined) {
          if (appUserId !== userId) {
            logError(
              `collision found! ${userId} and ${appUserId} both hash to ${dbUserId}`
            );
          }
        } else {
          set(this.userIdMap, [appId, userId], dbUserId);
          // Array path: dbUserId must be treated as ONE literal key. A string
          // path would be parsed by lodash (`.` / `[]` split it into nested
          // keys) and the array-path lookup above would then never find it.
          set(this.reversedUserIdMap, [dbUserId], [appId, userId]);
          bulkOperations.push(
            {
              index: {
                _index: 'users',
                _type: 'doc',
                _id: dbUserId,
              },
            },
            {
              name: generatePseudonym(),
              avatarType: AvatarTypes.OpenPeeps,
              avatarData: JSON.stringify(generateOpenPeepsAvatar()),
              appId,
              appUserId: userId,
              createdAt: now,
              updatedAt: now,
            }
          );
        }
      }
    }
    // Each user contributes two entries (action + document), hence length / 2.
    await this.bulk.push(bulkOperations, bulkOperations.length / 2);
  }

  /**
   * Fetches one page of unique user id pairs for an index and processes it.
   *
   * response is of the form {
   *   aggregations: { [AGG_NAME]: {
   *     after_key: {[AGG_NAME]: [lastKey]},
   *     buckets: [{...}]
   * }}}
   *
   * @param {String} indexName a key of userReferenceInSchema
   * @param {Object} [pageIndex] key of the last bucket of the previous page
   * @returns {Object|undefined} key to pass as the next `pageIndex`; undefined
   *   when the page was empty or an error occurred (errors are logged)
   */
  async fetchUniqueUsers(indexName, pageIndex = undefined) {
    try {
      const {
        body: {
          aggregations: {
            [AGG_NAME]: { buckets },
          },
        },
      } = await client.search({
        index: indexName,
        size: 0,
        body: {
          aggs: {
            [AGG_NAME]: {
              composite: {
                size: this.aggBatchSize,
                after: pageIndex,
                sources: [
                  {
                    [AGG_NAME]: {
                      terms: {
                        script: {
                          id: SCRIPT_ID,
                          params: {
                            emptyUserIdAllowedFields:
                              this.emptyUserIdAllowedFields,
                            additionalFields:
                              userReferenceInSchema[indexName].fields,
                          },
                        },
                      },
                    },
                  },
                ],
              },
            },
          },
        },
      });
      if (buckets.length > 0) {
        const lastKey = { ...buckets[buckets.length - 1].key };
        await this.processUsers(buckets);
        return lastKey;
      }
    } catch (e) {
      logError(
        `error while fetching users for indexName:${indexName} with pageIndex ${pageIndex}`
      );
      logError(e);
    }
  }

  /**
   * Pages through every index in userReferenceInSchema, collecting users.
   *
   * @returns {CreateBackendUsers} this
   */
  async fetchUsersFromAllDocs() {
    for (const indexName in userReferenceInSchema) {
      let pageIndex;
      do {
        pageIndex = await this.fetchUniqueUsers(indexName, pageIndex);
      } while (pageIndex !== undefined);
    }
    return this;
  }

  /**
   * A generator that fetches all docs in the specified index.
   *
   * @param {String} indexName The name of the index to fetch
   * @param {Boolean} [hasAdditionalUserFields] when true every document is
   *   fetched; otherwise only documents matching backendUserQuery
   * @yields {Object} the document
   */
  async *getAllDocs(indexName, hasAdditionalUserFields = false) {
    yield* getAllDocs(
      indexName,
      hasAdditionalUserFields ? matchAllQuery : backendUserQuery,
      {
        scroll: SCROLL_TIMEOUT,
        size: this.batchSize,
      }
    );
  }

  /**
   * Maps an (appId, userId) reference to the fields that should replace it.
   *
   * @param {String} appId
   * @param {String} userId the id as currently stored on the document
   * @returns {Object} `{ userId, appUserId }` for backend apps, where `userId`
   *   is the mapped db user id (or the original id when no mapping is known);
   *   `{ userId }` unchanged otherwise
   */
  getUserIds(appId, userId) {
    return isBackendApp(appId)
      ? {
          userId: get(this.userIdMap, [appId, userId], userId),
          appUserId: userId,
        }
      : { userId };
  }

  /**
   * Rewrites the user references of every fetched document. Indices with a
   * `genId` get the old document deleted and re-indexed under the regenerated
   * id; the others receive a partial update.
   *
   * @returns {CreateBackendUsers} this
   */
  async updateAllDocs() {
    for (const indexName in userReferenceInSchema) {
      const { genId, fields } = userReferenceInSchema[indexName];
      for await (const doc of this.getAllDocs(
        indexName,
        fields && fields.length > 0
      )) {
        const { appId, userId } = doc._source;
        const newFields = this.getUserIds(appId, userId);
        for (const field of fields) {
          if (doc._source[field]) {
            newFields[field] = doc._source[field].map((entry) => ({
              ...entry,
              ...this.getUserIds(entry.appId, entry.userId),
            }));
          }
        }
        if (genId !== undefined) {
          // Two operations: a delete, plus an index action with its document.
          await this.bulk.push(
            [
              {
                delete: {
                  _index: doc._index,
                  _type: 'doc',
                  _id: doc._id,
                },
              },
              {
                index: {
                  _index: doc._index,
                  _type: 'doc',
                  _id: genId({ ...doc._source, ...newFields }),
                },
              },
              {
                ...doc._source,
                ...newFields,
              },
            ],
            2
          );
        } else {
          await this.bulk.push([
            {
              update: {
                _index: doc._index,
                _type: 'doc',
                _id: doc._id,
              },
            },
            {
              doc: newFields,
            },
          ]);
        }
      }
    }
    return this;
  }

  /**
   * Fetches one page of unique doc ids in analytics, and updates analytics
   * entries for each of those docs.
   *
   * @param {String} docType value of the analytics `type` field
   * @param {String} docIndex index holding the referenced documents
   * @param {Object} [pageIndex] key of the last bucket of the previous page
   * @returns {Object|undefined} key to pass as the next `pageIndex`; undefined
   *   when the page was empty or an error occurred (errors are logged)
   */
  async fetchUniqueDocs(docType, docIndex, pageIndex = undefined) {
    try {
      const {
        body: {
          aggregations: {
            docIds: { buckets },
          },
        },
      } = await client.search({
        index: 'analytics',
        size: 0,
        body: {
          query: { term: { type: docType } },
          aggs: {
            docIds: {
              composite: {
                size: this.analyticsBatchSize,
                after: pageIndex,
                sources: [{ docId: { terms: { field: 'docId' } } }],
              },
            },
          },
        },
      });
      if (buckets.length > 0) {
        const {
          body: {
            hits: { hits: docs },
          },
        } = await client.search({
          index: docIndex,
          size: this.analyticsBatchSize,
          body: {
            query: {
              ids: { values: buckets.map((bucket) => bucket.key.docId) },
            },
            _source: {
              includes: ['userId', 'appId'],
            },
          },
        });
        const lastKey = { ...buckets[buckets.length - 1].key };
        await this.updateAnalyticsForDocs(docType, docs);
        return lastKey;
      }
    } catch (e) {
      logError(
        `error while updating analytics type ${docType} with pageIndex ${
          pageIndex ? JSON.stringify(pageIndex) : null
        }`
      );
      logError(e);
    }
  }

  /**
   * Writes each doc's author (`appId`, `userId`) onto the analytics entries
   * that reference it, as `docAppId` / `docUserId`. A failure for one doc is
   * logged and does not stop the remaining docs.
   *
   * @param {String} docType value of the analytics `type` field
   * @param {Object[]} docs search hits carrying `_id`, `_source.appId` and `_source.userId`
   * @returns {CreateBackendUsers} this
   */
  async updateAnalyticsForDocs(docType, docs) {
    let totalUpdated = 0;
    for (const doc of docs) {
      try {
        const requestBody = {
          script: {
            source: `ctx._source.docAppId = params.docAppId; ctx._source.docUserId = params.docUserId;`,
            lang: 'painless',
            params: {
              docAppId: doc._source.appId,
              docUserId: doc._source.userId,
            },
          },
          query: {
            bool: {
              must: [
                { term: { type: docType } },
                { term: { docId: doc._id } },
              ],
            },
          },
        };
        const {
          body: { updated },
        } = await client.updateByQuery({
          index: 'analytics',
          body: requestBody,
          refresh: 'true',
        });
        totalUpdated += updated;
      } catch (e) {
        logError(
          `error trying to update analytics user for doc ${docType} ${doc._id}`
        );
        logError(e);
      }
    }
    // Previously this summary was built as a bare expression and discarded.
    console.log(
      `${totalUpdated} updated for ${docs.length} ${docType} analytics docs`
    );
    return this;
  }

  /**
   * Updates the analytics entries of all articles, then of all replies.
   *
   * @returns {CreateBackendUsers} this
   */
  async updateAnalytics() {
    for (const [docType, docIndex] of [
      ['article', 'articles'],
      ['reply', 'replies'],
    ]) {
      let pageIndex;
      do {
        pageIndex = await this.fetchUniqueDocs(docType, docIndex, pageIndex);
      } while (pageIndex !== undefined);
    }
    return this;
  }

  /**
   * Runs the whole migration: store the script, create users, rewrite user
   * references, then update analytics. The bulk writer is flushed after each
   * of the two write phases, and `replies` / `articles` are refreshed before
   * analytics reads them.
   */
  async execute() {
    await this.storeScriptInDB();
    await this.fetchUsersFromAllDocs();
    await this.bulk.flush();
    await this.updateAllDocs();
    await this.bulk.flush();

    await client.indices.refresh({ index: 'replies' });
    await client.indices.refresh({ index: 'articles' });
    await this.updateAnalytics();
  }
}
506

507
/**
 * Script entry point: builds the migration with default options and runs it.
 * Any error thrown while constructing or executing it is passed to logError
 * instead of propagating.
 */
async function main() {
  try {
    const migration = new CreateBackendUsers();
    await migration.execute();
  } catch (error) {
    logError(error);
  }
}
514

515
// Run the migration only when this module is the process entry point.
// main() handles its own errors, so the returned promise is not awaited.
if (module === require.main) {
  main();
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc