• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 24638969260

19 Apr 2026 08:57PM UTC coverage: 73.033% (+3.9%) from 69.161%
24638969260

push

github

web-flow
feat: support compression when importing/exporting events (gzip/xz) (#514)

* feat: add gzip/xz compression utilities

* feat(import): support compressed event imports

* feat(export): add compression flags and stream metrics

* docs: document compressed import/export usage

* test(integration): add compression roundtrip scenario

* ci: add changeset and knip ignore for lzma-native

* fix(import): propagate decompression stream errors

* fix(deps): make lzma-native optional

* test: remove compression integration roundtrip scenario

* fix(import): resolve deduplication merge conflict handling

* test(integration): restore compression roundtrip scenario

* fix(redis): use logger in hash key methods

1234 of 1789 branches covered (68.98%)

Branch coverage included in aggregate %.

236 of 269 new or added lines in 5 files covered. (87.73%)

53 existing lines in 8 files now uncovered.

3007 of 4018 relevant lines covered (74.84%)

23.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.24
/src/utils/compression.ts
1
import { createGunzip, createGzip } from 'zlib'
5✔
2
import { PassThrough, Transform } from 'stream'
5✔
3
import { cpus } from 'os'
5✔
4
import { extname } from 'path'
5✔
5
import { open } from 'fs/promises'
5✔
6

7
/**
 * Compression formats supported by this module.
 *
 * Each member's value is the lowercase name of the format.
 */
export enum CompressionFormat {
  /** gzip streams, created with Node's built-in `zlib` module. */
  GZIP = 'gzip',
  /** xz streams, created through the optional `lzma-native` package. */
  XZ = 'xz',
}
11

12
/** Leading bytes (1f 8b) used to recognise a gzip header. */
const GZIP_MAGIC = Buffer.from('1f8b', 'hex')
/** Leading bytes (fd 37 7a 58 5a 00) used to recognise an xz header. */
const XZ_MAGIC = Buffer.from('fd377a585a00', 'hex')

/** Lowest and highest values accepted for NOSTREAM_XZ_PRESET. */
const MIN_XZ_PRESET = 0
const MAX_XZ_PRESET = 9

/** Preset used when NOSTREAM_XZ_PRESET is not set. */
const DEFAULT_XZ_PRESET = 6
/** Thread cap used when NOSTREAM_XZ_THREADS is not set. */
const DEFAULT_MAX_XZ_THREADS = 4
5✔
19

20
/** Signature shared by the two stream factories this module calls on `lzma-native`. */
type LzmaStreamFactory = (options?: Record<string, unknown>) => Transform

/** The part of the `lzma-native` module surface that this file uses. */
type LzmaNative = {
  createCompressor: LzmaStreamFactory
  createDecompressor: LzmaStreamFactory
}

/** Environment-variable lookup table; a missing variable reads as `undefined`. */
type Environment = { [name: string]: string | undefined }
26

27
/**
 * Loads the `lzma-native` package on demand, so the module is only required
 * when an XZ stream is actually requested.
 *
 * @returns The loaded module, typed as the subset of its API used here.
 * @throws Error when the package cannot be loaded; the underlying failure
 *   message is appended in parentheses.
 */
const getLzmaNative = (): LzmaNative => {
  let loaded: LzmaNative

  try {
    loaded = require('lzma-native') as LzmaNative
  } catch (cause) {
    // Non-Error throwables are stringified so the reason is never lost.
    const detail = cause instanceof Error ? cause.message : String(cause)

    throw new Error(`XZ support requires the "lzma-native" package. Install dependencies and try again. (${detail})`)
  }

  return loaded
}
36

37
/**
 * Reads an optional integer setting from an environment table.
 *
 * @param key Name of the variable to read.
 * @param env Table to read it from.
 * @returns The parsed number, or `undefined` when the variable is missing,
 *   empty, or whitespace only.
 * @throws Error when the trimmed value is not an optionally negative run of
 *   decimal digits. The message quotes the value exactly as it was configured.
 */
const parseIntegerEnv = (
  key: string,
  env: Environment,
): number | undefined => {
  const configured = env[key]
  const candidate = configured?.trim() ?? ''

  if (candidate === '') {
    return undefined
  }

  if (!/^-?\d+$/.test(candidate)) {
    throw new Error(`Invalid ${key}: ${configured}. Expected an integer.`)
  }

  return Number(candidate)
}
52

53
/**
 * Resolves the XZ compressor settings from the machine's CPU count and the
 * NOSTREAM_XZ_PRESET / NOSTREAM_XZ_THREADS environment variables.
 *
 * @param cpuCount Number of CPU cores detected. Fractions are truncated, and
 *   values below 2 (including 0, negatives, and NaN) yield a single thread.
 * @param env Environment table to read the overrides from; defaults to
 *   `process.env`.
 * @returns `preset` (NOSTREAM_XZ_PRESET or DEFAULT_XZ_PRESET) and `threads`
 *   (cores minus one, capped by NOSTREAM_XZ_THREADS or DEFAULT_MAX_XZ_THREADS,
 *   never below 1).
 * @throws Error when either variable is not an integer, when the preset is
 *   outside MIN_XZ_PRESET..MAX_XZ_PRESET, or when the thread cap is not
 *   positive. The preset is validated before the thread cap is read.
 */
export const getXzCompressionOptions = (
  cpuCount: number,
  env: Environment = process.env,
): { preset: number; threads: number } => {
  const preset = parseIntegerEnv('NOSTREAM_XZ_PRESET', env) ?? DEFAULT_XZ_PRESET

  if (preset < MIN_XZ_PRESET || preset > MAX_XZ_PRESET) {
    throw new Error(
      `Invalid NOSTREAM_XZ_PRESET: ${preset}. Expected an integer between ${MIN_XZ_PRESET} and ${MAX_XZ_PRESET}.`,
    )
  }

  const parsedThreadCap = parseIntegerEnv('NOSTREAM_XZ_THREADS', env)
  if (parsedThreadCap !== undefined && parsedThreadCap <= 0) {
    throw new Error('Invalid NOSTREAM_XZ_THREADS: expected a positive integer.')
  }

  // NaN would otherwise survive every Math.max/Math.min below and be returned
  // as the thread count, so treat an unknown core count as a single core.
  const detectedCores = Number.isNaN(cpuCount) ? 1 : Math.trunc(cpuCount)

  // Keep one core available by default to reduce contention with the running relay.
  const availableThreads = Math.max(1, detectedCores - 1)
  const maxThreads = parsedThreadCap ?? DEFAULT_MAX_XZ_THREADS

  return {
    preset,
    threads: Math.max(1, Math.min(availableThreads, maxThreads)),
  }
}
80

81
/**
 * Converts a user-supplied format name into a {@link CompressionFormat}.
 *
 * Matching ignores surrounding whitespace and letter case. `gzip` and `gz`
 * both select gzip; `xz` selects xz.
 *
 * @param input Format name as provided by the caller.
 * @returns The matching format.
 * @throws Error for any other name; the message quotes `input` unmodified.
 */
export const parseCompressionFormat = (input: string): CompressionFormat => {
  const normalized = input.trim().toLowerCase()

  if (normalized === 'gzip' || normalized === 'gz') {
    return CompressionFormat.GZIP
  }

  if (normalized === 'xz') {
    return CompressionFormat.XZ
  }

  throw new Error(`Unsupported compression format: ${input}. Use gzip|gz|xz.`)
}
92

93
/**
 * Guesses the compression format from a path's final extension.
 *
 * The extension is compared case-insensitively; only `.gz` and `.xz` are
 * recognised.
 *
 * @param filePath Path whose extension is inspected.
 * @returns The matching format, or `undefined` for any other extension.
 */
export const getCompressionFormatFromExtension = (
  filePath: string,
): CompressionFormat | undefined => {
  const extension = extname(filePath).toLowerCase()

  if (extension === '.gz') {
    return CompressionFormat.GZIP
  }

  if (extension === '.xz') {
    return CompressionFormat.XZ
  }

  return undefined
}
105

106
/**
 * Identifies the compression format from the first bytes of a file.
 *
 * @param header Leading bytes of the content; may be shorter than a full
 *   signature, in which case that signature simply does not match.
 * @returns GZIP when the bytes start with GZIP_MAGIC, XZ when they start with
 *   XZ_MAGIC, otherwise `undefined`.
 */
export const getCompressionFormatFromHeader = (
  header: Buffer,
): CompressionFormat | undefined => {
  // True when `header` is at least as long as `magic` and begins with it.
  const beginsWith = (magic: Buffer): boolean =>
    header.length >= magic.length && header.subarray(0, magic.length).equals(magic)

  if (beginsWith(GZIP_MAGIC)) {
    return CompressionFormat.GZIP
  }

  if (beginsWith(XZ_MAGIC)) {
    return CompressionFormat.XZ
  }

  return undefined
}
119

120
/**
 * Reads the first bytes of a file.
 *
 * @param filePath File to open for reading.
 * @param bytes Maximum number of bytes to read from offset 0; defaults to the
 *   length of XZ_MAGIC.
 * @returns A buffer holding only the bytes actually read, which is shorter
 *   than `bytes` when the read returns fewer.
 */
const readFileHeader = async (filePath: string, bytes = XZ_MAGIC.length): Promise<Buffer> => {
  const handle = await open(filePath, 'r')

  try {
    const scratch = Buffer.alloc(bytes)
    const outcome = await handle.read(scratch, 0, bytes, 0)

    // Trim the zero-filled tail that the read did not populate.
    return scratch.subarray(0, outcome.bytesRead)
  } finally {
    // Close the handle whether the read succeeded or threw.
    await handle.close()
  }
}
132

133
/**
 * Determines how a file is compressed by combining its extension with its
 * leading bytes.
 *
 * The header takes precedence; the extension is used only when the header is
 * not recognised.
 *
 * @param filePath File to inspect; its header is read from disk.
 * @returns The detected format, or `undefined` when neither the header nor the
 *   extension identifies one.
 * @throws Error when both sources identify a format and they disagree.
 */
export const detectCompressionFormat = async (
  filePath: string,
): Promise<CompressionFormat | undefined> => {
  const fromExtension = getCompressionFormatFromExtension(filePath)
  const fromHeader = getCompressionFormatFromHeader(await readFileHeader(filePath))

  // Without a recognised header there is nothing to contradict the extension.
  if (!fromHeader) {
    return fromExtension
  }

  if (fromExtension && fromExtension !== fromHeader) {
    throw new Error(
      `Compression mismatch for ${filePath}: extension suggests ${fromExtension} but header is ${fromHeader}.`,
    )
  }

  return fromHeader
}
148

149
/**
 * Builds the transform stream that compresses data in the requested format.
 *
 * @param format Target format; when omitted, data passes through unchanged.
 * @returns A `PassThrough` for no format, a gzip stream for GZIP, or an
 *   `lzma-native` compressor for XZ, configured with the preset and thread
 *   count from `getXzCompressionOptions`.
 * @throws Error for an unrecognised format value. For XZ, errors from
 *   `getLzmaNative` and `getXzCompressionOptions` propagate unchanged.
 */
export const createCompressionStream = (
  format?: CompressionFormat,
): Transform => {
  if (!format) {
    return new PassThrough()
  }

  if (format === CompressionFormat.GZIP) {
    return createGzip()
  }

  if (format === CompressionFormat.XZ) {
    // Load the module before resolving options, so a missing package is
    // reported ahead of any environment-variable problem.
    const lzma = getLzmaNative()
    const settings = getXzCompressionOptions(cpus().length)

    return lzma.createCompressor({
      preset: settings.preset,
      threads: settings.threads,
    })
  }

  throw new Error(`Unsupported compression format: ${String(format)}`)
}
172

173
/**
 * Builds the transform stream that decompresses data in the given format.
 *
 * @param format Source format; when omitted, data passes through unchanged.
 * @returns A `PassThrough` for no format, a gunzip stream for GZIP, or an
 *   `lzma-native` decompressor (created with no options) for XZ.
 * @throws Error for an unrecognised format value. For XZ, errors from
 *   `getLzmaNative` propagate unchanged.
 */
export const createDecompressionStream = (
  format?: CompressionFormat,
): Transform => {
  if (!format) {
    return new PassThrough()
  }

  if (format === CompressionFormat.GZIP) {
    return createGunzip()
  }

  if (format === CompressionFormat.XZ) {
    const lzma = getLzmaNative()

    return lzma.createDecompressor()
  }

  throw new Error(`Unsupported compression format: ${String(format)}`)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc