• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cofacts / rumors-api / 28294969040

27 Jun 2026 04:26PM UTC coverage: 74.971% (-7.2%) from 82.22%
28294969040

Pull #388

github

yutin1987
fix: embedding update race and test mocks
Pull Request #388: [draft] Vector search

904 of 1304 branches covered (69.33%)

Branch coverage included in aggregate %.

142 of 177 new or added lines in 10 files covered. (80.23%)

146 existing lines in 6 files now uncovered.

1726 of 2204 relevant lines covered (78.31%)

16.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.34
/src/scripts/fetchStatsFromGA.js
1
/* eslint-disable no-console */
/*
  A script that fetches user activity stats between `startDate` and `endDate` from the
  GA4 dataset and the LINE bot event dataset in BigQuery.

  - `startDate` and `endDate` default to the current date in `TIMEZONE`; they can be
    set with the `--startDate` (`-s`) / `--endDate` (`-e`) command line arguments.
    Dates must be in YYYYMMDD format.

  - Stats are written to the `analytics` Elasticsearch index with bulk `index`
    operations, one document per (type, docId, date).

  - Make sure `*_DATASET_ID`, `GA_*_STREAM_ID`, and `TIMEZONE=Asia/Taipei` are set with correct settings in .env.
*/
16

17
import 'dotenv/config';
18
import assert from 'assert';
19
import { Transform } from 'stream';
20
import { pipeline } from 'stream/promises';
21
import DataLoader from 'dataloader';
22
import client from 'util/client';
23
import rollbar from '../rollbarInstance';
24
import { BigQuery } from '@google-cloud/bigquery';
25
import yargs from 'yargs';
26

27
/** Number of BigQuery result rows sent to Elasticsearch per bulk request. */
const BATCH_SIZE = 1000;

/** BigQuery client, constructed with the library's default options. */
const bigquery = new BigQuery();
30

31
/**
 * Convert a `YYYYMMDD` date string into `YYYY-MM-DD` form.
 * Uses `slice` rather than the deprecated `substr`.
 *
 * @param {string} date - date in `YYYYMMDD` format, e.g. `'20240102'`
 * @returns {string} the same date as `YYYY-MM-DD`, e.g. `'2024-01-02'`
 */
const formatDate = (date) =>
  `${date.slice(0, 4)}-${date.slice(4, 6)}-${date.slice(6, 8)}`;
33

34
/**
 * Build the Elasticsearch document id for one analytics record.
 *
 * @param {{dateStr: string, type: string, docId: string}} record -
 *   `dateStr` is `YYYYMMDD`
 * @returns {string} `<type>_<docId>_<YYYY-MM-DD>`
 */
export function getId({ dateStr, type, docId }) {
  const day = formatDate(dateStr);
  return `${type}_${docId}_${day}`;
}
37

38
/**
 * Get the calendar date of an instant in the given time zone, as `YYYYMMDD`.
 *
 * @param {string} [timeZone] - IANA time zone name, e.g. `'Asia/Taipei'`;
 *   when omitted, the runtime's default time zone is used.
 * @param {Date} [now] - the instant to convert; defaults to the current time.
 * @returns {string} the date in `YYYYMMDD` format
 * @throws {RangeError} when `timeZone` is not a recognised time zone
 */
export function getTodayYYYYMMDD(timeZone, now = new Date()) {
  // Read the calendar fields directly instead of round-tripping through
  // `toLocaleString()` + `new Date(string)`. The locale-formatted string
  // depends on the runtime's default locale and is not guaranteed to be
  // parseable by `Date`.
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone,
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
  }).formatToParts(now);
  const field = (type) => parts.find((part) => part.type === type).value;

  return `${field('year')}${field('month')}${field('day')}`;
}
47

48
/**
 * Build an object-mode Transform stream that groups incoming objects into arrays.
 *
 * Whenever `batchSize` objects have accumulated, they are pushed downstream as
 * one array. Leftover objects are pushed as a final, shorter array when the
 * input ends.
 *
 * @param {number} batchSize - number of objects per emitted array
 * @returns {Transform}
 */
export function createBatchTransform(batchSize) {
  let pending = [];

  // Hand the accumulated objects to `stream` and start a fresh batch.
  const emitPending = (stream) => {
    stream.push(pending);
    pending = [];
  };

  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, done) {
      pending.push(chunk);
      if (pending.length >= batchSize) emitPending(this);
      done();
    },
    flush(done) {
      if (pending.length !== 0) emitPending(this);
      done();
    },
  });
}
76

77
/** Maps an analytics record `type` to the Elasticsearch index holding that document. */
const TYPE_TO_ESIDX = {
  article: 'articles',
  reply: 'replies',
};
78

79
/**
 * Batch-loads author information for analytics rows.
 *
 * Given `{ type, docId }` pairs from the BigQuery result, resolves each to the
 * `{ docUserId, docAppId }` (the `userId` / `appId` of the Elasticsearch
 * document), or to `undefined` when the type has no index mapping in
 * `TYPE_TO_ESIDX` or the document is not found.
 * Results are memoized per `type/docId` key via `cacheKeyFn`.
 */
const docUserAppLoader = new DataLoader(
  /** @param {ReadonlyArray<{type: 'article' | 'reply', docId: string}>} typeDocIds */
  async (typeDocIds) => {
    const docs = typeDocIds.flatMap(({ type, docId }) => {
      const index = TYPE_TO_ESIDX[type];
      return index ? [{ _index: index, _id: docId }] : [];
    });

    /** @type {Map<string, {docUserId: string, docAppId: string}>} */
    const docMap = new Map();

    // Elasticsearch's multi-get API rejects a request with no documents, so
    // skip the round trip when no row in this batch maps to a known index.
    if (docs.length > 0) {
      const { docs: results } = await client.mget({ docs });

      for (const { _source, _index, _id, found } of results) {
        if (!found) continue;

        const [index] = _index.split('_v'); // take the part before versions
        docMap.set(`${index}/${_id}`, {
          docUserId: _source.userId,
          docAppId: _source.appId,
        });
      }
    }

    return typeDocIds.map(({ type, docId }) =>
      docMap.get(`${TYPE_TO_ESIDX[type]}/${docId}`)
    );
  },
  {
    cacheKeyFn: ({ type, docId }) => `${type}/${docId}`,
  }
);
121

122
/**
 * Whether a `YYYYMMDD` string names a real calendar day (e.g. rejects `20240230`).
 * @param {string} yyyymmdd
 * @returns {boolean}
 */
function isCalendarDate(yyyymmdd) {
  const iso = formatDate(yyyymmdd);
  const parsed = new Date(`${iso}T00:00:00Z`);
  // Some engines roll overflowing days into the next month instead of
  // returning Invalid Date, so also compare the round-tripped value.
  return (
    !Number.isNaN(parsed.getTime()) && parsed.toISOString().startsWith(iso)
  );
}

/**
 * Return the day after a `YYYYMMDD` date, formatted as `YYYY-MM-DD`.
 * @param {string} yyyymmdd
 * @returns {string}
 */
function getNextDayISO(yyyymmdd) {
  const next = new Date(`${formatDate(yyyymmdd)}T00:00:00Z`);
  next.setUTCDate(next.getUTCDate() + 1);
  return next.toISOString().slice(0, 10);
}

/**
 * Validate an optional `YYYYMMDD` date parameter.
 *
 * @param {string | undefined | null} value - the user-supplied date
 * @param {string} name - parameter name used in the assertion message
 * @param {string} fallback - returned when `value` is `null` / `undefined`
 * @returns {string}
 * @throws {assert.AssertionError} when `value` is given but is not a valid
 *   `YYYYMMDD` date (an empty string is rejected rather than passed through)
 */
function resolveDateParam(value, name, fallback) {
  if (value == null) return fallback;

  assert(
    typeof value === 'string' && /^\d{8}$/.test(value) && isCalendarDate(value),
    `${name} must be in YYYYMMDD format`
  );
  return value;
}

/**
 * Build the BigQuery SQL that aggregates LINE bot, web and LIFF visit stats
 * per (event_date, item_category, item_id) between `startDate` and `endDate`.
 * Values are interpolated directly; they come from env vars and validated CLI
 * dates.
 *
 * @param {object} options
 * @param {string} options.startDate - `YYYYMMDD`, inclusive
 * @param {string} options.endDate - `YYYYMMDD`, inclusive
 * @param {string} options.timezone
 * @param {string} options.lineBotEventDataset
 * @param {string} options.ga4Dataset
 * @param {string} options.webStreamId
 * @param {string} options.liffStreamId
 * @returns {string}
 */
function buildStatsQuery({
  startDate,
  endDate,
  timezone,
  lineBotEventDataset,
  ga4Dataset,
  webStreamId,
  liffStreamId,
}) {
  // The upper bound is the exclusive start of the day after `endDate`.
  // BigQuery timestamps have microsecond precision, so an inclusive
  // `23:59:59.999` bound would drop events in the day's last millisecond.
  const lineRangeStart = `${formatDate(startDate)} 00:00:00.000`;
  const lineRangeEnd = `${getNextDayISO(endDate)} 00:00:00.000`;

  return `
    WITH
      lineStats AS (
        SELECT
          FORMAT_DATE("%Y%m%d", DATE(createdAt, "${timezone}")) AS event_date,
          evt.category AS item_category,
          evt.label AS item_id,
          COUNT(*) AS lineVisit,
          COUNT(DISTINCT userId) AS lineUser
        FROM \`${lineBotEventDataset}.events\`, UNNEST (events) AS evt
        WHERE evt.action = 'Selected'
          -- Use createdAt so that BQ can select correct partition
          AND createdAt >= TIMESTAMP("${lineRangeStart}", "${timezone}")
          AND createdAt < TIMESTAMP("${lineRangeEnd}", "${timezone}")
        GROUP BY event_date, item_category, item_id
      ),

      webStats AS (
        SELECT
          event_date,
          item_category,
          item_id,
          COUNT(*) AS webVisit,
          COUNT(DISTINCT user_pseudo_id) AS webUser
        FROM \`${ga4Dataset}.events_*\`, UNNEST (items)
        WHERE event_name = 'view_item'
          AND stream_id = '${webStreamId}' -- web stream
          AND (
            _table_suffix between '${startDate}' and '${endDate}' OR
            _table_suffix between 'intraday_${startDate}' and 'intraday_${endDate}'
          )
        GROUP BY event_date, item_category, item_id
      ),

      liffStats AS (
        WITH t AS (
          SELECT
            event_date, item_category, item_id,
            struct(
              collected_traffic_source.manual_source as source,
              count(*) as visit, COUNT(DISTINCT user_pseudo_id) as user
            ) as liffObj
            FROM \`${ga4Dataset}.events_*\`, UNNEST (items)
          WHERE event_name = 'view_item'
            AND stream_id = '${liffStreamId}' -- LIFF stream
            AND (
              _table_suffix between '${startDate}' and '${endDate}' OR
              _table_suffix between 'intraday_${startDate}' and 'intraday_${endDate}'
            )
          GROUP BY event_date, item_category, item_id, collected_traffic_source.manual_source
        )
        SELECT event_date, item_category, item_id, ARRAY_AGG(liffObj) AS liff FROM t
        GROUP BY event_date, item_category, item_id
      )
    SELECT
      event_date AS dateStr,
      CAST(TIMESTAMP(PARSE_DATE("%Y%m%d", event_date), "${timezone}") AS STRING FORMAT 'YYYY-MM-DD"T"HH24:MI:SS".000"TZH:TZM' AT TIME ZONE '${timezone}') AS date,
      LOWER(item_category) AS type,
      item_id AS docId,
      STRUCT(lineUser, lineVisit, webUser, webVisit, liff) AS stats
    FROM lineStats
    FULL JOIN webStats USING (event_date, item_category, item_id)
    FULL JOIN liffStats USING (event_date, item_category, item_id)
  `;
}

/**
 * Fetch GA stats for given time period and store in db.
 *
 * Runs the stats query in BigQuery. Results are streamed, enriched with
 * author info from `docUserAppLoader`, and bulk-indexed into the `analytics`
 * index in batches of `BATCH_SIZE`.
 *
 * @param {object} params
 * @param {string} [params.startDate] - `YYYYMMDD`; defaults to today in `timezone`
 * @param {string} [params.endDate] - `YYYYMMDD`; defaults to today in `timezone`
 * @param {string} params.timezone - IANA time zone, e.g. `Asia/Taipei`
 * @param {string} params.lineBotEventDataset
 * @param {string} params.ga4Dataset
 * @param {string} params.webStreamId
 * @param {string} params.liffStreamId
 */
export async function fetchStatsFromGA(params) {
  const todayYYYYMMDD = getTodayYYYYMMDD(params.timezone);
  const startDate = resolveDateParam(
    params.startDate,
    'startDate',
    todayYYYYMMDD
  );
  const endDate = resolveDateParam(params.endDate, 'endDate', todayYYYYMMDD);

  assert(
    startDate <= endDate,
    `endDate (${endDate}) should not be earlier than startDate (${startDate})`
  );

  const query = buildStatsQuery({ ...params, startDate, endDate });

  const [job] = await bigquery.createQueryJob({ query });
  console.log(`[fetchStatsFromGA] BQ job ${job.id} started.`);

  await pipeline(
    job.getQueryResultsStream(),
    createBatchTransform(BATCH_SIZE),
    async function (source) {
      let processedCount = 0;

      for await (const docs of source) {
        const esBatch = (
          await Promise.all(
            docs.map(async (doc) => {
              const { dateStr: dontcare, ...analyticsFields } = doc; // eslint-disable-line no-unused-vars

              return [
                {
                  index: { _index: 'analytics', _id: getId(doc) },
                },
                {
                  ...analyticsFields,
                  ...(await docUserAppLoader.load(doc)),
                  fetchedAt: new Date(),
                },
              ];
            })
          )
        ).flat();

        processedCount += docs.length;

        const response = await client.bulk({ operations: esBatch });

        /* istanbul ignore next */
        if (response.errors) {
          // `response.errors` is only a boolean flag; the per-document failure
          // reasons live in `response.items[].<action>.error`.
          const failedItems = response.items.filter((item) =>
            Object.values(item).some((result) => result?.error)
          );
          console.error(
            'Elasticsearch Bulk Insert Error:',
            JSON.stringify(failedItems, null, 2)
          );
          rollbar.error(
            '[fetchStatsFromGA] Elasticsearch Bulk Insert Error',
            response
          );
          process.exit(1);
        }
        console.log(
          `[fetchStatsFromGA] ${response.items.length} item(s) indexed`
        );
      }

      console.log(
        `[fetchStatsFromGA] Finished processing ${processedCount} analytics records for ${startDate} ~ ${endDate} (${params.timezone}).`
      );
    }
  );
}
278

279
/**
 * CLI entry point.
 *
 * Reads the date range from command line arguments and the time zone, dataset
 * and stream settings from environment variables, then runs `fetchStatsFromGA`.
 * On failure the error is logged and the process exit code is set to 1, so a
 * scheduler running this script can detect failed runs. `exitCode` is used
 * instead of `process.exit()` so pending output is still flushed.
 */
/* istanbul ignore next */
async function main() {
  try {
    const argv = yargs
      .options({
        startDate: {
          alias: 's',
          description:
            'YYYYMMDD. Omitting this means fetching stats from today.',
          type: 'string',
        },
        endDate: {
          alias: 'e',
          description:
            'YYYYMMDD. Omitting this means fetching stats until today.',
          type: 'string',
        },
      })
      .help('help').argv;

    const params = {
      startDate: argv.startDate,
      endDate: argv.endDate,

      timezone: process.env.TIMEZONE,
      lineBotEventDataset: process.env.LINE_BOT_EVENT_DATASET_ID,
      ga4Dataset: process.env.GA4_DATASET_ID,
      webStreamId: process.env.GA_WEB_STREAM_ID,
      liffStreamId: process.env.GA_LIFF_STREAM_ID,
    };

    await fetchStatsFromGA(params);
  } catch (e) {
    console.error(e);
    process.exitCode = 1;
  }
}

/* istanbul ignore next */
if (require.main === module) {
  // `main` handles its own errors, so the returned promise never rejects.
  main();
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc