• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 24638070433

19 Apr 2026 08:11PM UTC coverage: 73.05%. First build
24638070433

Pull #514

github

web-flow
Merge 11ececcf1 into a38d402ba
Pull Request #514: feat: support compression when importing/exporting events (gzip/xz)

1235 of 1789 branches covered (69.03%)

Branch coverage included in aggregate %.

236 of 269 new or added lines in 5 files covered. (87.73%)

3007 of 4018 relevant lines covered (74.84%)

26.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.14
/src/scripts/export-events.ts
1
import 'pg-query-stream'
3✔
2

3
import fs from 'fs'
3✔
4
import path from 'path'
3✔
5
import { pipeline } from 'stream/promises'
3✔
6
import { Transform } from 'stream'
3✔
7

8
import {
3✔
9
  CompressionFormat,
10
  createCompressionStream,
11
  getCompressionFormatFromExtension,
12
  parseCompressionFormat,
13
} from '../utils/compression'
14
import { getMasterDbClient } from '../database/client'
3✔
15

16
type ExportCliOptions = {
17
  compress: boolean
18
  format?: CompressionFormat
19
  outputFilePath: string
20
  showHelp: boolean
21
}
22

23
const DEFAULT_OUTPUT_FILE_PATH = 'events.jsonl'
3✔
24
const MIN_ELAPSED_SECONDS = 0.001
3✔
25

26
export const formatBytes = (bytes: number): string => {
3✔
27
  const units = ['B', 'KiB', 'MiB', 'GiB', 'TiB']
13✔
28

29
  if (!Number.isFinite(bytes) || bytes <= 0) {
13✔
30
    return '0 B'
1✔
31
  }
32

33
  let unitIndex = 0
12✔
34
  let value = bytes
12✔
35

36
  while (value >= 1024 && unitIndex < units.length - 1) {
12✔
37
    value /= 1024
10✔
38
    unitIndex += 1
10✔
39
  }
40

41
  const rounded = Math.round(value * 100) / 100
12✔
42
  const formatted = String(rounded)
12✔
43

44
  return `${formatted} ${units[unitIndex]}`
12✔
45
}
46

47
export const formatCompressionDelta = (rawBytes: number, outputBytes: number): string | undefined => {
3✔
48
  if (rawBytes <= 0) {
5✔
49
    return undefined
1✔
50
  }
51

52
  const deltaPercent = ((rawBytes - outputBytes) / rawBytes) * 100
4✔
53
  const rounded = Math.round(Math.abs(deltaPercent) * 100) / 100
4✔
54
  const formattedPercent = String(rounded)
4✔
55

56
  if (deltaPercent >= 0) {
4✔
57
    return `${formattedPercent}% smaller`
3✔
58
  }
59

60
  return `${formattedPercent}% larger`
1✔
61
}
62

63
export const getRatePerSecond = (value: number, elapsedMs: number): number => {
3✔
64
  if (!Number.isFinite(value) || value <= 0) {
9✔
65
    return 0
1✔
66
  }
67

68
  const elapsedSeconds = Math.max(elapsedMs / 1000, MIN_ELAPSED_SECONDS)
8✔
69

70
  return value / elapsedSeconds
8✔
71
}
72

73
const formatCount = (value: number): string => {
3✔
74
  const rounded = Math.round(value * 100) / 100
2✔
75

76
  return Number.isInteger(rounded)
2✔
77
    ? rounded.toLocaleString('en-US')
78
    : rounded.toLocaleString('en-US', {
79
      maximumFractionDigits: 2,
80
      minimumFractionDigits: 2,
81
    })
82
}
83

84
const getOptionValue = (option: string, args: string[], index: number): [string, number] => {
3✔
85
  const inlineSeparator = `${option}=`
7✔
86
  if (args[index].startsWith(inlineSeparator)) {
7✔
87
    const value = args[index].slice(inlineSeparator.length)
3✔
88
    if (!value) {
3✔
89
      throw new Error(`Missing value for ${option}`)
1✔
90
    }
91

92
    return [value, index]
2✔
93
  }
94

95
  const nextIndex = index + 1
4✔
96
  const nextArg = args[nextIndex]
4✔
97
  if (typeof nextArg !== 'string' || nextArg.startsWith('-')) {
4✔
98
    throw new Error(`Missing value for ${option}`)
1✔
99
  }
100

101
  return [nextArg, nextIndex]
3✔
102
}
103

104
const printUsage = (): void => {
3✔
NEW
105
  console.log('Usage: npm run export -- [output-file] [--compress|-z] [--format gzip|gz|xz]')
×
NEW
106
  console.log('Example: npm run export -- ./events.jsonl')
×
NEW
107
  console.log('Example: npm run export -- ./events.jsonl.gz --compress --format gzip')
×
NEW
108
  console.log('Example: npm run export -- ./events.jsonl.xz -z --format xz')
×
109
}
110

111
const getCompressionLabel = (format: CompressionFormat): string => {
3✔
112
  switch (format) {
2!
113
    case CompressionFormat.GZIP:
114
      return 'gzip'
1✔
115
    case CompressionFormat.XZ:
116
      return 'xz'
1✔
117
    default:
NEW
118
      return String(format)
×
119
  }
120
}
121

122
export const parseCliArgs = (args: string[]): ExportCliOptions => {
3✔
123
  let compress = false
15✔
124
  let format: CompressionFormat | undefined
125
  let outputFilePath: string | undefined
126

127
  if (args.includes('--help') || args.includes('-h')) {
15✔
128
    return {
1✔
129
      compress,
130
      format,
131
      outputFilePath: DEFAULT_OUTPUT_FILE_PATH,
132
      showHelp: true,
133
    }
134
  }
135

136
  for (let index = 0; index < args.length; index++) {
14✔
137
    const arg = args[index]
28✔
138

139
    if (arg === '--compress' || arg === '-z') {
28✔
140
      compress = true
9✔
141
      continue
9✔
142
    }
143

144
    if (arg === '--format' || arg.startsWith('--format=')) {
19✔
145
      const [rawFormat, nextIndex] = getOptionValue('--format', args, index)
7✔
146
      format = parseCompressionFormat(rawFormat)
5✔
147
      index = nextIndex
5✔
148
      continue
5✔
149
    }
150

151
    if (arg.startsWith('-')) {
12✔
152
      throw new Error(`Unknown option: ${arg}`)
1✔
153
    }
154

155
    if (outputFilePath) {
11✔
156
      throw new Error(`Unexpected extra argument: ${arg}`)
1✔
157
    }
158

159
    outputFilePath = arg
10✔
160
  }
161

162
  if (!compress && format) {
10✔
163
    throw new Error('--format requires --compress')
1✔
164
  }
165

166
  outputFilePath = outputFilePath ?? DEFAULT_OUTPUT_FILE_PATH
9✔
167

168
  if (compress && !format) {
9✔
169
    format = getCompressionFormatFromExtension(outputFilePath) ?? CompressionFormat.GZIP
3✔
170
  }
171

172
  return {
9✔
173
    compress,
174
    format,
175
    outputFilePath,
176
    showHelp: false,
177
  }
178
}
179

180
type EventRow = {
181
  event_id: Buffer
182
  event_pubkey: Buffer
183
  event_kind: number
184
  event_created_at: number
185
  event_content: string
186
  event_tags: unknown[] | null
187
  event_signature: Buffer
188
}
189

190
async function exportEvents(): Promise<void> {
191
  const options = parseCliArgs(process.argv.slice(2))
2✔
192
  if (options.showHelp) {
2!
NEW
193
    printUsage()
×
NEW
194
    return
×
195
  }
196

197
  const outputPath = path.resolve(options.outputFilePath)
2✔
198
  const db = getMasterDbClient()
2✔
199
  const abortController = new AbortController()
2✔
200
  let interruptedBySignal: NodeJS.Signals | undefined
201

202
  const onSignal = (signal: NodeJS.Signals) => {
2✔
203
    if (abortController.signal.aborted) {
×
204
      return
×
205
    }
206

207
    interruptedBySignal = signal
×
208
    process.exitCode = 130
×
209
    console.log(`${signal} received. Stopping export...`)
×
210
    abortController.abort()
×
211
  }
212

213
  process.on('SIGINT', onSignal).on('SIGTERM', onSignal)
2✔
214

215
  try {
2✔
216
    const firstEvent = await db('events').select('event_id').whereNull('deleted_at').first()
2✔
217

218
    if (abortController.signal.aborted) {
2!
219
      return
×
220
    }
221

222
    if (!firstEvent) {
2!
223
      console.log('No events to export.')
×
224
      return
×
225
    }
226

227
    if (options.format) {
2!
228
      console.log(`Exporting events to ${outputPath} using ${getCompressionLabel(options.format)} compression`)
2✔
229
    } else {
NEW
230
      console.log(`Exporting events to ${outputPath}`)
×
231
    }
232

233
    const startedAt = Date.now()
2✔
234
    const output = fs.createWriteStream(outputPath)
2✔
235
    const compressionStream = createCompressionStream(options.format)
2✔
236
    let exported = 0
2✔
237
    let rawBytes = 0
2✔
238

239
    const dbStream = db('events')
2✔
240
      .select(
241
        'event_id',
242
        'event_pubkey',
243
        'event_kind',
244
        'event_created_at',
245
        'event_content',
246
        'event_tags',
247
        'event_signature',
248
      )
249
      .whereNull('deleted_at')
250
      .orderBy('event_created_at', 'asc')
251
      .orderBy('event_id', 'asc')
252
      .stream()
253

254
    const toJsonLine = new Transform({
2✔
255
      objectMode: true,
256
      transform(row: EventRow, _encoding, callback) {
257
        const event = {
6✔
258
          id: row.event_id.toString('hex'),
259
          pubkey: row.event_pubkey.toString('hex'),
260
          created_at: row.event_created_at,
261
          kind: row.event_kind,
262
          tags: Array.isArray(row.event_tags) ? row.event_tags : [],
6!
263
          content: row.event_content,
264
          sig: row.event_signature.toString('hex'),
265
        }
266

267
        exported++
6✔
268
        if (exported % 10000 === 0) {
6!
269
          console.log(`Exported ${exported} events...`)
×
270
        }
271

272
        const line = JSON.stringify(event) + '\n'
6✔
273
        rawBytes += Buffer.byteLength(line)
6✔
274
        callback(null, line)
6✔
275
      },
276
    })
277

278
    await pipeline(dbStream, toJsonLine, compressionStream, output, {
2✔
279
      signal: abortController.signal,
280
    })
281

282
    const elapsedMs = Date.now() - startedAt
2✔
283
    const outputBytes = output.bytesWritten
2✔
284
    const compressionDelta = formatCompressionDelta(rawBytes, outputBytes)
2✔
285
    const eventRate = getRatePerSecond(exported, elapsedMs)
2✔
286
    const rawRate = getRatePerSecond(rawBytes, elapsedMs)
2✔
287
    const outputRate = getRatePerSecond(outputBytes, elapsedMs)
2✔
288

289
    console.log(`Export complete: ${exported} events written to ${outputPath}`)
2✔
290
    if (compressionDelta) {
2!
291
      console.log(`Size: ${formatBytes(rawBytes)} raw -> ${formatBytes(outputBytes)} on disk (${compressionDelta})`)
2✔
292
    } else {
NEW
293
      console.log(`Size: ${formatBytes(outputBytes)} on disk`)
×
294
    }
295

296
    console.log(
2✔
297
      `Throughput: ${formatCount(eventRate)} events/s | ${formatBytes(rawRate)}/s raw | ${formatBytes(outputRate)}/s output`,
298
    )
299
  } catch (error) {
300
    if (abortController.signal.aborted) {
×
301
      console.log(`Export interrupted by ${interruptedBySignal ?? 'signal'}.`)
×
302
      process.exitCode = 130
×
303
      return
×
304
    }
305

306
    throw error
×
307
  } finally {
308
    process.off('SIGINT', onSignal).off('SIGTERM', onSignal)
2✔
309

310
    await db.destroy()
2✔
311
  }
312
}
313

314
if (require.main === module) {
3✔
315
  exportEvents().catch((error) => {
2✔
NEW
316
    console.error('Export failed:', error.message)
×
NEW
317
    process.exit(1)
×
318
  })
319
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc