• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 25529283840

08 May 2026 12:17AM UTC coverage: 64.439% (-0.5%) from 64.937%
25529283840

Pull #615

github

web-flow
Merge fb553e964 into 6c8e29a01
Pull Request #615: test: add unit tests for remaining app workers (#489)

1801 of 3166 branches covered (56.89%)

Branch coverage included in aggregate %.

4148 of 6066 relevant lines covered (68.38%)

8.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.53
/src/scripts/export-events.ts
1
import 'pg-query-stream'
2✔
2

3
import fs from 'fs'
2✔
4
import path, { extname } from 'path'
2✔
5
import { pipeline } from 'stream/promises'
2✔
6
import { Transform } from 'stream'
2✔
7

8
import {
2✔
9
  CompressionFormat,
10
  createCompressionStream,
11
  getCompressionFormatFromExtension,
12
  parseCompressionFormat,
13
} from '../utils/compression'
14
import { getMasterDbClient } from '../database/client'
2✔
15

16
/**
 * Result of parsing the export script's command-line arguments
 * (produced by `parseCliArgs`).
 */
type ExportCliOptions = {
  /** True when `--compress` or `-z` was passed. */
  compress: boolean
  /** Compression format; left undefined when compression was not requested. */
  format?: CompressionFormat
  /** Output file path as given on the command line, or the default path. */
  outputFilePath: string
  /** True when `--help` or `-h` was passed; the other fields then hold defaults. */
  showHelp: boolean
}
22

23
/**
 * Programmatic options accepted by `runExportEvents`.
 */
type ExportOptions = {
  /** When true, result summaries are printed as JSON instead of plain text. */
  json?: boolean
  /**
   * Structured output format. When set, `runExportEvents` skips CLI argument
   * parsing and writes either JSON Lines or a single JSON array.
   */
  format?: 'jsonl' | 'json'
}
27

28
// Output path used when no positional output file argument is supplied.
const DEFAULT_OUTPUT_FILE_PATH = 'events.jsonl'
// Lower bound (in seconds) applied to elapsed time in getRatePerSecond so the
// divisor is never smaller than one millisecond.
const MIN_ELAPSED_SECONDS = 0.001
2✔
30

31
/**
 * Renders a byte count as a short human-readable string using 1024-based
 * units (B, KiB, MiB, GiB, TiB), rounded to at most two decimals.
 *
 * @param bytes - Number of bytes to format.
 * @returns The formatted size, or `'0 B'` for non-finite, zero or negative input.
 */
export const formatBytes = (bytes: number): string => {
  if (bytes <= 0 || !Number.isFinite(bytes)) {
    return '0 B'
  }

  const suffixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB']
  const lastSuffix = suffixes.length - 1

  // Scale down by 1024 until the value fits the unit or the largest unit is reached.
  let scaled = bytes
  let step = 0
  for (; step < lastSuffix && scaled >= 1024; step += 1) {
    scaled = scaled / 1024
  }

  const twoDecimals = Math.round(scaled * 100) / 100

  return `${twoDecimals} ${suffixes[step]}`
}
51

52
/**
 * Describes how much smaller or larger the written output is compared with
 * the raw (uncompressed) payload, as a percentage rounded to two decimals.
 *
 * @param rawBytes - Size of the uncompressed data in bytes.
 * @param outputBytes - Size of the data actually written in bytes.
 * @returns A string such as `'42.5% smaller'` or `'3% larger'`, or `undefined`
 *   when no meaningful ratio exists (raw size not positive, or either input
 *   is NaN/Infinity).
 */
export const formatCompressionDelta = (rawBytes: number, outputBytes: number): string | undefined => {
  // Non-finite inputs would otherwise pass the `<= 0` check and yield "NaN% larger".
  if (!Number.isFinite(rawBytes) || !Number.isFinite(outputBytes) || rawBytes <= 0) {
    return undefined
  }

  const deltaPercent = ((rawBytes - outputBytes) / rawBytes) * 100
  const rounded = Math.round(Math.abs(deltaPercent) * 100) / 100

  // An unchanged size (delta of exactly 0) is reported as "0% smaller".
  return `${rounded}% ${deltaPercent >= 0 ? 'smaller' : 'larger'}`
}
67

68
/**
 * Converts a total amount and an elapsed duration into a per-second rate.
 *
 * @param value - Total amount processed (events, bytes, ...).
 * @param elapsedMs - Elapsed time in milliseconds.
 * @returns `value` divided by the elapsed seconds, where the elapsed time is
 *   clamped to at least `MIN_ELAPSED_SECONDS`; `0` when `value` is
 *   non-finite, zero or negative.
 */
export const getRatePerSecond = (value: number, elapsedMs: number): number => {
  const hasPositiveValue = Number.isFinite(value) && value > 0
  if (!hasPositiveValue) {
    return 0
  }

  const seconds = elapsedMs / 1000

  return value / Math.max(seconds, MIN_ELAPSED_SECONDS)
}
77

78
/**
 * Formats a number for display with en-US grouping separators.
 * The value is first rounded to two decimals; whole numbers are printed
 * without a fraction, everything else with exactly two fraction digits.
 *
 * @param value - Number to format.
 * @returns The localized string, e.g. `'1,234'` or `'1,234.50'`.
 */
const formatCount = (value: number): string => {
  const twoDecimals = Math.round(value * 100) / 100

  if (Number.isInteger(twoDecimals)) {
    return twoDecimals.toLocaleString('en-US')
  }

  return twoDecimals.toLocaleString('en-US', {
    minimumFractionDigits: 2,
    maximumFractionDigits: 2,
  })
}
88

89
/**
 * Reads the value of a command-line option located at `args[index]`.
 * Both `--opt=value` and `--opt value` spellings are accepted.
 *
 * @param option - Option name including leading dashes (e.g. `--format`).
 * @param args - Full argument list.
 * @param index - Position of the option within `args`.
 * @returns A tuple of the option value and the index of the last argument
 *   consumed (so the caller can continue scanning after it).
 * @throws Error when the value is empty, absent, or the next argument starts with `-`.
 */
const getOptionValue = (option: string, args: string[], index: number): [string, number] => {
  const current = args[index]
  const inlinePrefix = option + '='

  if (!current.startsWith(inlinePrefix)) {
    // Separate form: the value is expected in the following argument.
    const followingIndex = index + 1
    const following = args[followingIndex]
    if (typeof following !== 'string' || following.startsWith('-')) {
      throw new Error(`Missing value for ${option}`)
    }

    return [following, followingIndex]
  }

  // Inline form: everything after "option=" is the value.
  const inlineValue = current.slice(inlinePrefix.length)
  if (inlineValue === '') {
    throw new Error(`Missing value for ${option}`)
  }

  return [inlineValue, index]
}
108

109
/**
 * Prints the command-line usage line and example invocations to stdout.
 */
const printUsage = (): void => {
  const usageLines = [
    'Usage: pnpm run export [output-file] [--compress|-z] [--format gzip|gz|xz]',
    'Example: pnpm run export ./events.jsonl',
    'Example: pnpm run export ./events.jsonl.gz --compress --format gzip',
    'Example: pnpm run export ./events.jsonl.xz -z --format xz',
  ]

  for (const line of usageLines) {
    console.log(line)
  }
}
115

116
/**
 * Maps a compression format to the label shown in log output.
 *
 * @param format - Compression format to describe.
 * @returns `'gzip'` or `'xz'` for the known formats; otherwise the
 *   stringified format value.
 */
const getCompressionLabel = (format: CompressionFormat): string => {
  if (format === CompressionFormat.GZIP) {
    return 'gzip'
  }

  if (format === CompressionFormat.XZ) {
    return 'xz'
  }

  return String(format)
}
126

127
/**
 * Parses the export script's command-line arguments.
 *
 * Recognized arguments: `--help`/`-h`, `--compress`/`-z`, `--format <value>`
 * or `--format=<value>`, and at most one positional output file path.
 *
 * @param args - Raw arguments (without the node binary and script path).
 * @returns The parsed options. When help is requested, parsing stops early
 *   and defaults are returned with `showHelp: true`.
 * @throws Error on an unknown option, more than one positional argument, a
 *   missing `--format` value, or `--format` given without `--compress`.
 */
export const parseCliArgs = (args: string[]): ExportCliOptions => {
  // Help wins over everything else, including otherwise invalid arguments.
  const helpRequested = ['--help', '-h'].some((flag) => args.includes(flag))
  if (helpRequested) {
    return {
      compress: false,
      format: undefined,
      outputFilePath: DEFAULT_OUTPUT_FILE_PATH,
      showHelp: true,
    }
  }

  let wantsCompression = false
  let chosenFormat: CompressionFormat | undefined
  let target: string | undefined

  let cursor = 0
  while (cursor < args.length) {
    const token = args[cursor]

    if (token === '-z' || token === '--compress') {
      wantsCompression = true
    } else if (token === '--format' || token.startsWith('--format=')) {
      const [rawFormat, consumedIndex] = getOptionValue('--format', args, cursor)
      chosenFormat = parseCompressionFormat(rawFormat)
      // Skip past the value when it was supplied as a separate argument.
      cursor = consumedIndex
    } else if (token.startsWith('-')) {
      throw new Error(`Unknown option: ${token}`)
    } else if (target) {
      throw new Error(`Unexpected extra argument: ${token}`)
    } else {
      target = token
    }

    cursor += 1
  }

  if (chosenFormat && !wantsCompression) {
    throw new Error('--format requires --compress')
  }

  const resolvedTarget = target ?? DEFAULT_OUTPUT_FILE_PATH

  // With compression on but no explicit format, infer it from the file
  // extension and fall back to gzip.
  if (wantsCompression && !chosenFormat) {
    chosenFormat = getCompressionFormatFromExtension(resolvedTarget) ?? CompressionFormat.GZIP
  }

  return {
    compress: wantsCompression,
    format: chosenFormat,
    outputFilePath: resolvedTarget,
    showHelp: false,
  }
}
184

185
/**
 * Shape of a row selected from the `events` table by the export query.
 * The Buffer columns are hex-encoded by `toEvent` before being written.
 */
type EventRow = {
  event_id: Buffer
  event_pubkey: Buffer
  event_kind: number
  event_created_at: number
  event_content: string
  // May be null; toEvent substitutes an empty array in that case.
  event_tags: unknown[] | null
  event_signature: Buffer
}
194

195
/**
 * Validates the requested structured export format.
 *
 * @param format - Requested format; may be undefined or empty.
 * @returns `'json'` or `'jsonl'`; defaults to `'jsonl'` when no format is given.
 * @throws Error when a non-empty, unsupported format is supplied.
 */
const resolveExportFormat = (format?: string): 'jsonl' | 'json' => {
  switch (format) {
    case 'jsonl':
    case 'json':
      return format
    default:
      // Undefined and empty string both mean "use the default".
      if (!format) {
        return 'jsonl'
      }

      throw new Error(`Unsupported format: ${format}. Supported values: json, jsonl`)
  }
}
206

207
/**
 * Resolves the absolute output path for a structured export and checks that
 * its extension matches the chosen format.
 *
 * @param filename - Requested output file; when undefined or empty,
 *   `events.json` / `events.jsonl` is used depending on `format`.
 * @param format - Structured export format.
 * @returns The absolute output path.
 * @throws Error when the file extension (compared case-insensitively) does
 *   not match the format.
 */
const resolveOutputPath = (filename: string | undefined, format: 'jsonl' | 'json'): string => {
  const requiredExtension = format === 'json' ? '.json' : '.jsonl'
  const resolved = path.resolve(filename || `events${requiredExtension}`)
  const actualExtension = extname(resolved).toLowerCase()

  if (actualExtension === requiredExtension) {
    return resolved
  }

  throw new Error(`Output file extension must be ${requiredExtension} when using --format ${format}`)
}
218

219
/**
 * Converts a database row into the event object that gets serialized.
 * Binary columns are hex-encoded and a non-array `event_tags` value becomes
 * an empty array.
 *
 * @param row - Row selected from the `events` table.
 * @returns Object with the keys `id`, `pubkey`, `created_at`, `kind`, `tags`,
 *   `content` and `sig`, in that order.
 */
const toEvent = (row: EventRow) => {
  const hex = (bytes: Buffer): string => bytes.toString('hex')

  return {
    id: hex(row.event_id),
    pubkey: hex(row.event_pubkey),
    created_at: row.event_created_at,
    kind: row.event_kind,
    tags: Array.isArray(row.event_tags) ? row.event_tags : [],
    content: row.event_content,
    sig: hex(row.event_signature),
  }
}
228

229
/**
 * Builds an object-mode Transform that serializes event rows to text.
 *
 * - `jsonl`: one JSON document per row, each followed by a newline.
 * - `json`: a single JSON array; the opening bracket is emitted with the
 *   first row, rows are separated by `,\n`, and the array is closed (or an
 *   empty `[]` is written) when the stream is flushed.
 *
 * @param format - Output format to produce.
 * @param onExported - Callback invoked once per row before the row is pushed.
 * @returns The configured Transform stream.
 */
const createFormatterTransform = (
  format: 'jsonl' | 'json',
  onExported: () => void,
): Transform => {
  if (format !== 'jsonl') {
    // JSON array mode: remember whether any row was written so the array can
    // be opened and closed correctly.
    let emittedAny = false

    return new Transform({
      objectMode: true,
      transform(row: EventRow, _encoding, callback) {
        const lead = emittedAny ? ',\n' : '[\n'
        emittedAny = true
        onExported()
        callback(null, `${lead}${JSON.stringify(toEvent(row))}`)
      },
      flush(callback) {
        const tail = emittedAny ? '\n]\n' : '[]\n'
        callback(null, tail)
      },
    })
  }

  return new Transform({
    objectMode: true,
    transform(row: EventRow, _encoding, callback) {
      onExported()
      callback(null, `${JSON.stringify(toEvent(row))}\n`)
    },
  })
}
257

258
/**
 * Exports all non-deleted events from the `events` table to a file.
 *
 * Two modes exist:
 * - Structured mode (`options.format` set): `args[0]` is taken as the output
 *   file, CLI flags are not parsed, and rows are written as JSON Lines or a
 *   JSON array.
 * - CLI mode (`options.format` unset): `args` is parsed with `parseCliArgs`,
 *   rows are written as JSON Lines through the stream returned by
 *   `createCompressionStream`, and size/throughput statistics are printed.
 *
 * SIGINT and SIGTERM abort the running pipeline; the handlers are removed and
 * the database client is destroyed before the function settles.
 *
 * @param args - Command-line style arguments; defaults to `process.argv.slice(2)`.
 * @param options - Programmatic options (structured format, JSON summaries).
 * @returns `0` on success (including help output and an empty table), `130`
 *   when the export was interrupted by a signal.
 * @throws Rethrows any error that is not caused by the abort signal, and any
 *   argument/extension validation error raised before the export starts.
 */
export async function runExportEvents(args: string[] = process.argv.slice(2), options: ExportOptions = {}): Promise<number> {
  const useStructuredFormat = Boolean(options.format)
  const structuredFormat = resolveExportFormat(options.format)
  // CLI flags are only parsed when no structured format was requested.
  const cliOptions = useStructuredFormat ? undefined : parseCliArgs(args)

  if (!useStructuredFormat && cliOptions?.showHelp) {
    printUsage()
    return 0
  }

  const outputPath = useStructuredFormat
    ? resolveOutputPath(args[0], structuredFormat)
    : path.resolve(cliOptions?.outputFilePath ?? DEFAULT_OUTPUT_FILE_PATH)

  const db = getMasterDbClient()
  const abortController = new AbortController()
  let interruptedBySignal: NodeJS.Signals | undefined

  // Shared handler for SIGINT and SIGTERM; only the first signal has an effect.
  const onSignal = (signal: NodeJS.Signals) => {
    if (abortController.signal.aborted) {
      return
    }

    interruptedBySignal = signal
    // The same exit code (130) is used for both signals.
    process.exitCode = 130
    console.log(`${signal} received. Stopping export...`)
    abortController.abort()
  }

  process.on('SIGINT', onSignal).on('SIGTERM', onSignal)

  try {
    // Cheap existence probe so an empty table does not create an output file.
    const firstEvent = await db('events').select('event_id').whereNull('deleted_at').first()

    // A signal may have arrived while the probe query was running.
    if (abortController.signal.aborted) {
      return 130
    }

    if (!firstEvent) {
      if (options.json) {
        console.log(JSON.stringify({ exported: 0, outputPath, empty: true }, null, 2))
      } else {
        console.log('No events to export.')
      }
      return 0
    }

    if (useStructuredFormat) {
      console.log(`Exporting events to ${outputPath}`)
    } else if (cliOptions?.format) {
      console.log(`Exporting events to ${outputPath} using ${getCompressionLabel(cliOptions.format)} compression`)
    } else {
      console.log(`Exporting events to ${outputPath}`)
    }

    const startedAt = Date.now()
    // NOTE(review): the file is created here; nothing in this function removes
    // a partially written file if the pipeline is aborted or fails — confirm
    // whether that is intended.
    const output = fs.createWriteStream(outputPath)

    // Rows are streamed in a fixed order: creation time first, then event id.
    const dbStream = db('events')
      .select(
        'event_id',
        'event_pubkey',
        'event_kind',
        'event_created_at',
        'event_content',
        'event_tags',
        'event_signature',
      )
      .whereNull('deleted_at')
      .orderBy('event_created_at', 'asc')
      .orderBy('event_id', 'asc')
      .stream()

    let exported = 0

    if (useStructuredFormat) {
      const formatter = createFormatterTransform(structuredFormat, () => {
        exported += 1
        // Progress line every 10,000 rows.
        if (exported % 10000 === 0) {
          console.log(`Exported ${exported} events...`)
        }
      })

      await pipeline(dbStream, formatter, output, {
        signal: abortController.signal,
      })

      if (options.json) {
        console.log(
          JSON.stringify(
            {
              exported,
              outputPath,
              format: structuredFormat,
            },
            null,
            2,
          ),
        )
      } else {
        console.log(`Export complete: ${exported} events written to ${outputPath} (${structuredFormat})`)
      }

      return 0
    }

    // CLI mode from here on. Note that `options.json` is not consulted for the
    // completion summary on this path; the summary is always plain text.
    const compressionFormat = cliOptions?.format
    // NOTE(review): compressionFormat is undefined when --compress was not
    // given; presumably createCompressionStream then yields a pass-through
    // stream — confirm against ../utils/compression.
    const compressionStream = createCompressionStream(compressionFormat)
    let rawBytes = 0

    const toJsonLine = new Transform({
      objectMode: true,
      transform(row: EventRow, _encoding, callback) {
        exported += 1
        if (exported % 10000 === 0) {
          console.log(`Exported ${exported} events...`)
        }

        const line = JSON.stringify(toEvent(row)) + '\n'
        // Track the serialized size before it enters the compression stream.
        rawBytes += Buffer.byteLength(line)
        callback(null, line)
      },
    })

    await pipeline(dbStream, toJsonLine, compressionStream, output, {
      signal: abortController.signal,
    })

    const elapsedMs = Date.now() - startedAt
    const outputBytes = output.bytesWritten
    const compressionDelta = formatCompressionDelta(rawBytes, outputBytes)
    const eventRate = getRatePerSecond(exported, elapsedMs)
    const rawRate = getRatePerSecond(rawBytes, elapsedMs)
    const outputRate = getRatePerSecond(outputBytes, elapsedMs)

    console.log(`Export complete: ${exported} events written to ${outputPath}`)
    if (compressionDelta) {
      console.log(`Size: ${formatBytes(rawBytes)} raw -> ${formatBytes(outputBytes)} on disk (${compressionDelta})`)
    } else {
      console.log(`Size: ${formatBytes(outputBytes)} on disk`)
    }

    console.log(
      `Throughput: ${formatCount(eventRate)} events/s | ${formatBytes(rawRate)}/s raw | ${formatBytes(outputRate)}/s output`,
    )

    return 0
  } catch (error) {
    // Any error raised after the abort signal fired is treated as the
    // interruption itself rather than a failure.
    if (abortController.signal.aborted) {
      console.log(`Export interrupted by ${interruptedBySignal ?? 'signal'}.`)
      process.exitCode = 130
      return 130
    }

    throw error
  } finally {
    // Always detach the signal handlers and release the database client.
    process.off('SIGINT', onSignal).off('SIGTERM', onSignal)

    await db.destroy()
  }
}
419

420
// Run the export only when this file is executed directly (not when imported).
if (require.main === module) {
  runExportEvents().catch((error: unknown) => {
    // A rejection is not guaranteed to be an Error: reading `.message` off
    // null/undefined would throw inside this handler, and a plain string or
    // number would be reported as "undefined".
    const reason =
      typeof error === 'object' && error !== null && 'message' in error
        ? (error as { message: unknown }).message
        : String(error)

    console.error('Export failed:', reason)
    process.exit(1)
  })
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc