• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 26860023436

03 Jun 2026 02:27AM UTC coverage: 72.488% (-0.006%) from 72.494%
26860023436

push

github

web-flow
dep(eslint): update @haraka/eslint-config to v3, fix surfaced warnings (#3586)

1721 of 2268 branches covered (75.88%)

33 of 45 new or added lines in 15 files covered. (73.33%)

19 existing lines in 3 files now uncovered.

7807 of 10770 relevant lines covered (72.49%)

25.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.64
/outbound/queue.js
1
'use strict'
9✔
2

9✔
3
const child_process = require('node:child_process')
9✔
4
const fs = require('node:fs/promises')
9✔
5
const path = require('node:path')
9✔
6

9✔
7
const { Address } = require('../address')
9✔
8
const config = require('haraka-config')
9✔
9

9✔
10
const logger = require('../logger')
9✔
11
const TimerQueue = require('./timer_queue')
9✔
12
const HMailItem = require('./hmail')
9✔
13
const obc = require('./config')
9✔
14
const _qfile = require('./qfile')
9✔
15
const obtls = require('./tls')
9✔
16

9✔
17
class Queue {
9✔
18
    constructor(worker) {
9✔
19
        this.worker = worker
18✔
20
        this.tasks = []
18✔
21
        this.running = 0
18✔
22
        this._scheduled = false
18✔
23
    }
18✔
24

9✔
25
    push(task) {
9✔
26
        this.tasks.push(task)
1✔
27
        this._schedule()
1✔
28
    }
1✔
29

9✔
30
    length() {
9✔
31
        return this.tasks.length + this.running
13✔
32
    }
13✔
33

9✔
34
    _schedule() {
9✔
35
        if (this._scheduled) return
1!
36
        this._scheduled = true
1✔
37
        setImmediate(() => {
1✔
38
            this._scheduled = false
1✔
39
            this._process()
1✔
40
        })
1✔
41
    }
1✔
42

9✔
43
    _process() {
9✔
44
        while (this.running < obc.cfg.concurrency_max && this.tasks.length > 0) {
1✔
45
            this.running++
1✔
46

1✔
47
            this.worker(this.tasks.shift())
1✔
48
                .catch((err) => {
1✔
49
                    logger.error(exports, `Queue worker error: ${err}`)
×
50
                })
1✔
51
                .finally(() => {
1✔
52
                    this.running--
×
53
                    this._schedule()
×
54
                })
1✔
55
        }
1✔
56
    }
1✔
57
}
9✔
58

9✔
59
exports.name = 'outbound/queue'
9✔
60

9✔
61
if (config.get('queue_dir')) {
9!
62
    exports.queue_dir = path.resolve(config.get('queue_dir'))
×
63
} else if (process.env.HARAKA) {
9!
64
    exports.queue_dir = path.resolve(process.env.HARAKA, 'queue')
×
65
} else {
9✔
66
    exports.queue_dir = path.resolve('test', 'test-queue')
9✔
67
}
9✔
68

9✔
69
const load_queue = new Queue(async (file) => {
9✔
70
    const hmail = new HMailItem(file, path.join(exports.queue_dir, file))
×
71
    exports._add_hmail(hmail)
×
72
    await new Promise((resolve, reject) => {
×
73
        const onReady = () => {
×
74
            hmail.off('error', onError)
×
75
            resolve()
×
76
        }
×
77
        const onError = (err) => {
×
78
            hmail.off('ready', onReady)
×
79
            reject(err)
×
80
        }
×
81
        hmail.once('ready', onReady)
×
82
        hmail.once('error', onError)
×
83
    })
×
84
})
9✔
85

9✔
86
let in_progress = 0
9✔
87
const delivery_queue = (exports.delivery_queue = new Queue(async (hmail) => {
9✔
88
    in_progress++
1✔
89
    await new Promise((resolve) => {
1✔
90
        hmail.next_cb = () => {
1✔
91
            in_progress--
×
92
            resolve()
×
93
        }
×
94
        if (obtls.cfg) {
1✔
95
            hmail.send()
1✔
96
        } else {
1!
97
            obtls.init(() => {
×
98
                hmail.send()
×
99
            })
×
100
        }
×
101
    })
1!
102
}))
9✔
103

9✔
104
const temp_fail_queue = (exports.temp_fail_queue = new TimerQueue())
9✔
105

9✔
106
let queue_count = 0
9✔
107

9✔
108
exports.get_stats = () => `${in_progress}/${exports.delivery_queue.length()}/${exports.temp_fail_queue.length()}`
9✔
109

9✔
110
exports.list_queue = async () => {
9✔
111
    return exports._load_cur_queue(null, exports._list_file)
2✔
112
}
2✔
113

9✔
114
exports._stat_file = async () => {
9✔
115
    queue_count++
2✔
116
}
2✔
117

9✔
118
exports.stat_queue = async () => {
9✔
119
    await exports._load_cur_queue(null, exports._stat_file)
1✔
120
    return exports.stats()
1✔
121
}
1✔
122

9✔
123
exports.init_queue = async (pid) => {
9✔
124
    // Initialise and load queue
6✔
125
    // This function is called first when not running under cluster,
6✔
126
    await exports.ensure_queue_dir()
6✔
127
    await exports.delete_dot_files()
6✔
128

6✔
129
    await exports._load_cur_queue(pid, exports._add_file)
6✔
130
    logger.info(exports, `[pid: ${pid}] ${delivery_queue.length()} files in my delivery queue`)
6✔
131
    logger.info(exports, `[pid: ${pid}] ${load_queue.length()} files in my load queue`)
6✔
132
    logger.info(exports, `[pid: ${pid}] ${temp_fail_queue.length()} files in my temp fail queue`)
6✔
133
}
6✔
134

9✔
135
exports._load_cur_queue = async (pid, iteratee) => {
9✔
136
    logger.info(exports, 'Loading outbound queue from ', exports.queue_dir)
10✔
137
    let files
10✔
138
    try {
10✔
139
        files = await fs.readdir(exports.queue_dir)
10✔
140
    } catch (err) {
10!
141
        logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
×
142
        throw err
×
143
    }
×
144

10✔
145
    exports.cur_time = new Date() // set once so we're not calling it a lot
10✔
146

10✔
147
    return await exports.load_queue_files(pid, files, iteratee)
10✔
148
}
10✔
149

9✔
150
exports.read_parts = (file) => {
9✔
151
    if (file.startsWith(_qfile.platformDOT)) {
18✔
152
        logger.warn(exports, `'Skipping' dot-file in queue folder: ${file}`)
1✔
153
        return false
1✔
154
    }
1✔
155

17✔
156
    if (file.startsWith('error.')) {
18✔
157
        logger.warn(exports, `'Skipping' error file in queue folder: ${file}`)
1✔
158
        return false
1✔
159
    }
1✔
160

16✔
161
    const parts = _qfile.parts(file)
16✔
162
    if (!parts) {
18✔
163
        logger.error(exports, `Unrecognized file in queue folder: ${file}`)
3✔
164
        return false
3✔
165
    }
3✔
166

13✔
167
    return parts
13✔
168
}
18✔
169

9✔
170
exports.rename_to_actual_pid = async (file, parts) => {
9✔
171
    // maintain some original details for the rename
×
172
    const new_filename = _qfile.name({
×
173
        arrival: parts.arrival,
×
174
        uid: parts.uid,
×
175
        next_attempt: parts.next_attempt,
×
176
        attempts: parts.attempts,
×
177
    })
×
178

×
179
    try {
×
180
        await fs.rename(path.join(exports.queue_dir, file), path.join(exports.queue_dir, new_filename))
×
181
        return new_filename
×
182
    } catch (err) {
×
NEW
183
        throw new Error(`Unable to rename queue file: ${file} to ${new_filename}`, {
×
NEW
184
            cause: err,
×
NEW
185
        })
×
186
    }
×
187
}
×
188

9✔
189
exports._add_file = async (file) => {
9✔
190
    const parts = _qfile.parts(file)
×
191

×
192
    if (parts.next_attempt <= exports.cur_time) {
×
193
        logger.debug(exports, `File ${file} needs processing now`)
×
194
        load_queue.push(file)
×
195
    } else {
×
196
        logger.debug(exports, `File ${file} needs processing later: ${parts.next_attempt - exports.cur_time}ms`)
×
197
        temp_fail_queue.add(file, parts.next_attempt - exports.cur_time, () => {
×
198
            load_queue.push(file)
×
199
        })
×
200
    }
×
201
    return file
×
202
}
×
203

9✔
204
exports.load_queue_files = async (pid, input_files, iteratee) => {
9✔
205
    const searchPid = parseInt(pid)
13✔
206

13✔
207
    let stat_renamed = 0
13✔
208
    let stat_loaded = 0
13✔
209

13✔
210
    if (searchPid) {
13✔
211
        logger.info(exports, `Grabbing queue files for pid: ${pid}`)
1✔
212
    } else {
13✔
213
        logger.info(exports, 'Loading the queue...')
5✔
214
    }
5✔
215

13✔
216
    const results = await Promise.all(
13✔
217
        input_files.map(async (file) => {
13✔
218
            const parts = exports.read_parts(file)
12✔
219
            if (!parts) return null
12✔
220

10✔
221
            if (!searchPid) {
12✔
222
                stat_loaded++
8✔
223
                return file
8✔
224
            }
8✔
225

2✔
226
            if (parts.pid !== searchPid) return null
12✔
227

1✔
228
            try {
1✔
229
                const renamed_file = await exports.rename_to_actual_pid(file, parts)
1!
230
                stat_renamed++
×
231
                stat_loaded++
×
232
                return renamed_file
×
233
            } catch (error) {
12✔
234
                logger.error(exports, `${error.message}`)
1✔
235
                return null
1✔
236
            }
1✔
237
        }),
13✔
238
    )
13✔
239

13✔
240
    if (searchPid) logger.info(exports, `[pid: ${pid}] ${stat_renamed} files old PID queue fixed up`)
13✔
241
    logger.debug(exports, `[pid: ${pid}] ${stat_loaded} files loaded`)
13✔
242

13✔
243
    const iterateeResults = await Promise.all(results.filter((i) => i).map(async (item) => await iteratee(item)))
13✔
244

13✔
245
    return iterateeResults.filter((result) => result !== null && result !== undefined)
13✔
246
}
13✔
247

9✔
248
exports.stats = () => {
9✔
249
    return {
1✔
250
        queue_dir: exports.queue_dir,
1✔
251
        queue_count,
1✔
252
    }
1✔
253
}
1✔
254

9✔
255
// position `position`. Loops to handle partial reads.
9✔
256
// Read exactly `length` bytes into `buffer` starting at `offset`, from file
9✔
257
async function readFull(handle, buffer, offset, length, position) {
4✔
258
    let totalRead = 0
4✔
259
    while (totalRead < length) {
4✔
260
        const { bytesRead } = await handle.read(buffer, offset + totalRead, length - totalRead, position + totalRead)
4✔
261
        if (bytesRead === 0) {
4!
262
            throw new Error(`Unexpected end of file: read ${totalRead} of ${length} bytes`)
×
263
        }
×
264
        totalRead += bytesRead
4✔
265
    }
4✔
266
}
4✔
267

9✔
268
exports._list_file = async (file) => {
9✔
269
    let handle
2✔
270
    try {
2✔
271
        const filePath = path.join(exports.queue_dir, file)
2✔
272

2✔
273
        handle = await fs.open(filePath, 'r')
2✔
274

2✔
275
        // Read first 4 bytes to get the todo length
2✔
276
        const buf = Buffer.alloc(4)
2✔
277
        await readFull(handle, buf, 0, 4, 0)
2✔
278
        const todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]
2✔
279

2✔
280
        const todoBuf = Buffer.alloc(todo_len)
2✔
281
        await readFull(handle, todoBuf, 0, todo_len, 4)
2✔
282

2✔
283
        const todo = todoBuf.toString('utf8')
2✔
284
        const todo_struct = JSON.parse(todo)
2✔
285
        todo_struct.rcpt_to = todo_struct.rcpt_to.map((a) => new Address(a))
2✔
286
        todo_struct.mail_from = new Address(todo_struct.mail_from)
2✔
287
        todo_struct.file = file
2✔
288
        todo_struct.full_path = filePath
2✔
289
        const parts = _qfile.parts(file)
2✔
290
        todo_struct.pid = parts?.pid || null
2!
291
        return todo_struct
2✔
292
    } catch (err) {
2!
293
        console.error(`Error reading queue file: ${file}:`, err)
×
294
        return null
×
295
    } finally {
2✔
296
        if (handle)
2✔
297
            await handle.close().catch((err) => console.error(`Failed to close queue file handle for ${file}:`, err))
2✔
298
    }
2✔
299
}
2✔
300

9✔
301
exports.flush_queue = async (domain, pid) => {
9✔
302
    if (domain) {
×
303
        try {
×
304
            const qlist = await exports.list_queue()
×
305
            for (const todo of qlist) {
×
306
                if (todo.domain.toLowerCase() !== domain.toLowerCase()) continue
×
307
                if (pid && todo.pid !== pid) continue
×
308
                delivery_queue.push(new HMailItem(todo.file, todo.full_path))
×
309
            }
×
310
        } catch (err) {
×
311
            logger.error(exports, `Failed to load queue: ${err.message}`)
×
312
        }
×
313
    } else {
×
314
        temp_fail_queue.drain()
×
315
    }
×
316
}
×
317

9✔
318
exports.load_pid_queue = async (pid) => {
9✔
319
    logger.info(exports, `Loading queue for pid: ${pid}`)
1✔
320
    await exports.init_queue(pid)
1✔
321
}
1✔
322

9✔
323
exports.ensure_queue_dir = async () => {
9✔
324
    // this code is only run at start-up.
10✔
325
    try {
10✔
326
        await fs.access(exports.queue_dir)
10✔
327
        return // directory already exists
3✔
328
    } catch (ignore) {
10✔
329
        // directory doesn't exist, try to create it
1✔
330
    }
1✔
331

1✔
332
    logger.debug(exports, `Creating queue directory ${exports.queue_dir}`)
1✔
333
    try {
1✔
334
        await fs.mkdir(exports.queue_dir, { mode: 493 }) // 493 == 0755
1✔
335
        const cfg = config.get('smtp.ini')
1✔
336
        let uid
1✔
337
        let gid
1✔
338
        if (cfg.user) uid = parseInt(child_process.execSync(`id -u ${cfg.user}`).toString().trim(), 10)
10!
339
        if (cfg.group) gid = parseInt(child_process.execSync(`id -g ${cfg.group}`).toString().trim(), 10)
10!
340
        if (uid && gid) {
10!
341
            await fs.chown(exports.queue_dir, uid, gid)
×
342
        } else if (uid) {
10!
343
            await fs.chown(exports.queue_dir, uid, -1)
×
344
        }
×
345
    } catch (err) {
10!
346
        if (err.code !== 'EEXIST') {
×
347
            logger.error(exports, `Error creating queue directory: ${err}`)
×
348
            throw err
×
349
        }
×
350
    }
×
351
}
10✔
352

9✔
353
exports.delete_dot_files = async () => {
9✔
354
    try {
10✔
355
        const files = await fs.readdir(exports.queue_dir)
10✔
356
        for (const file of files) {
10✔
357
            if (file.startsWith(_qfile.platformDOT)) {
10✔
358
                logger.warn(exports, `Removing left over dot-file: ${file}`)
1✔
359
                await fs.unlink(path.join(exports.queue_dir, file))
1✔
360
            }
1✔
361
        }
10✔
362
    } catch (err) {
2✔
363
        logger.error(exports, `Error deleting dot files: ${err}`)
2✔
364
    }
2✔
365
}
10✔
366

9✔
367
exports._add_hmail = (hmail) => {
9✔
368
    if (hmail.next_process <= exports.cur_time) {
2✔
369
        delivery_queue.push(hmail)
1✔
370
    } else {
1✔
371
        temp_fail_queue.add(hmail.filename, hmail.next_process - exports.cur_time, () => {
1✔
372
            delivery_queue.push(hmail)
1✔
373
        })
1✔
374
    }
1✔
375
}
2✔
376

9✔
377
exports.scan_queue_pids = async () => {
9✔
378
    // Under cluster, this is called first by the master
2✔
379
    await exports.ensure_queue_dir()
2✔
380
    await exports.delete_dot_files()
2✔
381

2✔
382
    let files
2✔
383
    try {
2✔
384
        files = await fs.readdir(exports.queue_dir)
2✔
385
    } catch (err) {
1✔
386
        logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
1✔
387
        throw err
1✔
388
    }
1✔
389

1✔
390
    const pids = {}
1✔
391

1✔
392
    for (const file of files) {
2✔
393
        const parts = exports.read_parts(file)
2✔
394
        if (parts) pids[parts.pid] = true
2✔
395
    }
2✔
396

1✔
397
    return Object.keys(pids)
1✔
398
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc