• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 22929902245

11 Mar 2026 12:02AM UTC coverage: 63.783%. First build
22929902245

Pull #3528

github

web-flow
Merge a16d0f189 into e85fd8c5b
Pull Request #3528: Outbound: replace fs callbacks with promises and async/await

975 of 1452 branches covered (67.15%)

181 of 346 new or added lines in 5 files covered. (52.31%)

5514 of 8645 relevant lines covered (63.78%)

28.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.12
/outbound/queue.js
1
'use strict'
1✔
2

1✔
3
const child_process = require('node:child_process')
1✔
4
const fs = require('node:fs/promises')
1✔
5
const path = require('node:path')
1✔
6

1✔
7
const { Address } = require('address-rfc2821')
1✔
8
const config = require('haraka-config')
1✔
9

1✔
10
const logger = require('../logger')
1✔
11
const TimerQueue = require('./timer_queue')
1✔
12
const HMailItem = require('./hmail')
1✔
13
const obc = require('./config')
1✔
14
const _qfile = require('./qfile')
1✔
15
const obtls = require('./tls')
1✔
16

1✔
17
// Lightweight concurrency-limited task queue. Each task is handed to the
// async `worker` function; no more than obc.cfg.concurrency_max tasks
// run at the same time.
class Queue {
    constructor(worker) {
        this.worker = worker // async fn invoked once per task
        this.tasks = [] // tasks waiting to start
        this.running = 0 // tasks currently in flight
        this._scheduled = false // true while a _process tick is pending
    }

    // Enqueue a task and arrange for processing on a future tick.
    push(task) {
        this.tasks.push(task)
        this._schedule()
    }

    // Number of tasks either waiting or in flight.
    length() {
        return this.running + this.tasks.length
    }

    // Coalesce repeated wake-up requests into one setImmediate tick.
    _schedule() {
        if (!this._scheduled) {
            this._scheduled = true
            setImmediate(() => {
                this._scheduled = false
                this._process()
            })
        }
    }

    // Start queued tasks until the concurrency cap is reached.
    _process() {
        while (this.tasks.length > 0 && this.running < obc.cfg.concurrency_max) {
            const task = this.tasks.shift()
            this.running++
            this.worker(task)
                .catch((err) => {
                    logger.error(exports, `Queue worker error: ${err}`)
                })
                .finally(() => {
                    this.running--
                    this._schedule()
                })
        }
    }
}
58

1✔
59
// Module identifier used by the logger (and other Haraka internals)
// to attribute messages from this module.
exports.name = 'outbound/queue'
60

1✔
61
// Resolve the on-disk queue directory, in priority order:
// 1. the `queue_dir` config setting
// 2. the HARAKA env var (install dir) + '/queue'
// 3. a test-local fallback used by the test suite
// Look the setting up once instead of calling config.get twice.
const queue_dir_cfg = config.get('queue_dir')
if (queue_dir_cfg) {
    exports.queue_dir = path.resolve(queue_dir_cfg)
} else if (process.env.HARAKA) {
    exports.queue_dir = path.resolve(process.env.HARAKA, 'queue')
} else {
    exports.queue_dir = path.resolve('test', 'test-queue')
}
68

1✔
69
// Worker queue that loads on-disk queue files into HMailItem objects
// and routes them via _add_hmail. The worker settles once the hmail
// emits 'ready' (resolve) or 'error' (reject), with the unused
// listener removed either way so neither can fire twice.
const load_queue = new Queue(async (file) => {
    const hmail = new HMailItem(file, path.join(exports.queue_dir, file))
    exports._add_hmail(hmail)
    await new Promise((resolve, reject) => {
        const done = () => {
            hmail.off('error', fail)
            resolve()
        }
        const fail = (err) => {
            hmail.off('ready', done)
            reject(err)
        }
        hmail.once('ready', done)
        hmail.once('error', fail)
    })
})
85

1✔
86
// Count of deliveries currently in flight (reported by get_stats).
let in_progress = 0

// Worker queue performing the actual delivery of an HMailItem. The
// worker's promise resolves when the hmail invokes next_cb, i.e. when
// this delivery attempt has finished.
const delivery_queue = (exports.delivery_queue = new Queue(async (hmail) => {
    in_progress++
    await new Promise((resolve) => {
        hmail.next_cb = () => {
            in_progress--
            resolve()
        }
        // TLS config must be initialised before the first send.
        if (!obtls.cfg) {
            obtls.init(() => hmail.send())
        } else {
            hmail.send()
        }
    })
}))
103

1✔
104
// Timer-backed queue holding messages that are waiting for a retry
// at a later time.
const temp_fail_queue = (exports.temp_fail_queue = new TimerQueue())

// Running tally of queue files counted by _stat_file / stat_queue.
let queue_count = 0
107

1✔
108
exports.get_stats = () => `${in_progress}/${exports.delivery_queue.length()}/${exports.temp_fail_queue.length()}`
1✔
109

1✔
110
// List every readable queue file as a parsed "todo" structure
// (see _list_file for the shape of each entry).
exports.list_queue = async () => await exports._load_cur_queue(null, exports._list_file)
113

1✔
114
exports._stat_file = async (file) => {
1✔
115
    queue_count++
2✔
116
}
2✔
117

1✔
118
// Walk the queue directory counting files (via _stat_file), then
// return the stats summary object.
exports.stat_queue = async () => {
    await exports._load_cur_queue(null, exports._stat_file)
    return exports.stats()
}
122

1✔
123
exports.init_queue = async (pid) => {
1✔
124
    // Initialise and load queue
7✔
125
    // This function is called first when not running under cluster,
7✔
126
    await exports.ensure_queue_dir()
7✔
127
    await exports.delete_dot_files()
7✔
128

7✔
129
    await exports._load_cur_queue(pid, exports._add_file)
7✔
130
    logger.info(exports, `[pid: ${pid}] ${delivery_queue.length()} files in my delivery queue`)
7✔
131
    logger.info(exports, `[pid: ${pid}] ${load_queue.length()} files in my load queue`)
7✔
132
    logger.info(exports, `[pid: ${pid}] ${temp_fail_queue.length()} files in my temp fail queue`)
7✔
133
}
7✔
134

1✔
135
exports._load_cur_queue = async (pid, iteratee) => {
1✔
136
    logger.info(exports, 'Loading outbound queue from ', exports.queue_dir)
11✔
137
    let files
11✔
138
    try {
11✔
139
        files = await fs.readdir(exports.queue_dir)
11✔
140
    } catch (err) {
11!
NEW
141
        logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
×
NEW
142
        throw err
×
NEW
143
    }
×
144

11✔
145
    exports.cur_time = new Date() // set once so we're not calling it a lot
11✔
146

11✔
147
    return await exports.load_queue_files(pid, files, iteratee)
11✔
148
}
11✔
149

1✔
150
exports.read_parts = (file) => {
1✔
151
    if (file.startsWith(_qfile.platformDOT)) {
16✔
152
        logger.warn(exports, `'Skipping' dot-file in queue folder: ${file}`)
1✔
153
        return false
1✔
154
    }
1✔
155

15✔
156
    if (file.startsWith('error.')) {
16✔
157
        logger.warn(exports, `'Skipping' error file in queue folder: ${file}`)
1✔
158
        return false
1✔
159
    }
1✔
160

14✔
161
    const parts = _qfile.parts(file)
14✔
162
    if (!parts) {
16✔
163
        logger.error(exports, `Unrecognized file in queue folder: ${file}`)
3✔
164
        return false
3✔
165
    }
3✔
166

11✔
167
    return parts
11✔
168
}
16✔
169

1✔
170
exports.rename_to_actual_pid = async (file, parts) => {
1✔
171
    // maintain some original details for the rename
×
172
    const new_filename = _qfile.name({
×
173
        arrival: parts.arrival,
×
174
        uid: parts.uid,
×
175
        next_attempt: parts.next_attempt,
×
176
        attempts: parts.attempts,
×
177
    })
×
178

×
NEW
179
    try {
×
NEW
180
        await fs.rename(path.join(exports.queue_dir, file), path.join(exports.queue_dir, new_filename))
×
NEW
181
        return new_filename
×
NEW
182
    } catch (err) {
×
NEW
183
        throw new Error(`Unable to rename queue file: ${file} to ${new_filename} : ${err}`)
×
NEW
184
    }
×
185
}
×
186

1✔
187
exports._add_file = async (file) => {
1✔
188
    const parts = _qfile.parts(file)
×
189

×
NEW
190
    if (parts.next_attempt <= exports.cur_time) {
×
191
        logger.debug(exports, `File ${file} needs processing now`)
×
192
        load_queue.push(file)
×
193
    } else {
×
NEW
194
        logger.debug(exports, `File ${file} needs processing later: ${parts.next_attempt - exports.cur_time}ms`)
×
NEW
195
        temp_fail_queue.add(file, parts.next_attempt - exports.cur_time, () => {
×
196
            load_queue.push(file)
×
197
        })
×
198
    }
×
NEW
199
    return file
×
200
}
×
201

1✔
202
exports.load_queue_files = async (pid, input_files, iteratee) => {
1✔
203
    const searchPid = parseInt(pid)
14✔
204

14✔
205
    let stat_renamed = 0
14✔
206
    let stat_loaded = 0
14✔
207

14✔
208
    if (searchPid) {
14✔
209
        logger.info(exports, `Grabbing queue files for pid: ${pid}`)
1✔
210
    } else {
14✔
211
        logger.info(exports, 'Loading the queue...')
13✔
212
    }
13✔
213

14✔
214
    const results = await Promise.all(
14✔
215
        input_files.map(async (file) => {
14✔
216
            const parts = exports.read_parts(file)
12✔
217
            if (!parts) return null
12✔
218

10✔
219
            if (!searchPid) {
12✔
220
                stat_loaded++
8✔
221
                return file
8✔
222
            }
8✔
223

2✔
224
            if (parts.pid !== searchPid) return null
12✔
225

1✔
226
            try {
1✔
227
                const renamed_file = await exports.rename_to_actual_pid(file, parts)
1✔
NEW
228
                stat_renamed++
×
229
                stat_loaded++
×
NEW
230
                return renamed_file
×
231
            } catch (error) {
12✔
232
                logger.error(exports, `${error.message}`)
1✔
233
                return null
1✔
234
            }
1✔
235
        }),
14✔
236
    )
14✔
237

14✔
238
    if (searchPid) logger.info(exports, `[pid: ${pid}] ${stat_renamed} files old PID queue fixed up`)
14✔
239
    logger.debug(exports, `[pid: ${pid}] ${stat_loaded} files loaded`)
14✔
240

14✔
241
    const iterateeResults = await Promise.all(results.filter((i) => i).map(async (item) => await iteratee(item)))
14✔
242

14✔
243
    return iterateeResults.filter((result) => result !== null && result !== undefined)
14✔
244
}
14✔
245

1✔
246
exports.stats = () => {
1✔
247
    return {
1✔
248
        queue_dir: exports.queue_dir,
1✔
249
        queue_count,
1✔
250
    }
1✔
251
}
1✔
252

1✔
253
/**
 * Read exactly `length` bytes into `buffer` starting at `offset`, from
 * file position `position`. Loops to handle partial reads.
 * (The two comment lines above were previously in reversed order.)
 * @param {object} handle - open FileHandle (or compatible) with an async read()
 * @param {Buffer} buffer - destination buffer
 * @param {number} offset - write position within `buffer`
 * @param {number} length - exact number of bytes required
 * @param {number} position - starting byte offset within the file
 * @throws {Error} if the file ends before `length` bytes are read
 */
async function readFull(handle, buffer, offset, length, position) {
    let totalRead = 0
    while (totalRead < length) {
        const { bytesRead } = await handle.read(buffer, offset + totalRead, length - totalRead, position + totalRead)
        if (bytesRead === 0) {
            throw new Error(`Unexpected end of file: read ${totalRead} of ${length} bytes`)
        }
        totalRead += bytesRead
    }
}
265

1✔
266
exports._list_file = async (file) => {
1✔
267
    let handle
2✔
268
    try {
2✔
269
        const filePath = path.join(exports.queue_dir, file)
2✔
270

2✔
271
        handle = await fs.open(filePath, 'r')
2✔
272

2✔
273
        // Read first 4 bytes to get the todo length
2✔
274
        const buf = Buffer.alloc(4)
2✔
275
        await readFull(handle, buf, 0, 4, 0)
2✔
276
        const todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]
2✔
277

2✔
278
        const todoBuf = Buffer.alloc(todo_len)
2✔
279
        await readFull(handle, todoBuf, 0, todo_len, 4)
2✔
280

2✔
281
        const todo = todoBuf.toString('utf8')
2✔
282
        const todo_struct = JSON.parse(todo)
2✔
283
        todo_struct.rcpt_to = todo_struct.rcpt_to.map((a) => new Address(a))
2✔
284
        todo_struct.mail_from = new Address(todo_struct.mail_from)
2✔
285
        todo_struct.file = file
2✔
286
        todo_struct.full_path = filePath
2✔
287
        const parts = _qfile.parts(file)
2✔
288
        todo_struct.pid = parts?.pid || null
2!
289
        return todo_struct
2✔
290
    } catch (err) {
2!
NEW
291
        console.error(`Error reading queue file: ${file}:`, err)
×
NEW
292
        return null
×
293
    } finally {
2✔
294
        if (handle)
2✔
295
            await handle.close().catch((err) => console.error(`Failed to close queue file handle for ${file}:`, err))
2✔
296
    }
2✔
297
}
2✔
298

1✔
299
exports.flush_queue = async (domain, pid) => {
1✔
300
    if (domain) {
×
NEW
301
        try {
×
NEW
302
            const qlist = await exports.list_queue()
×
303
            for (const todo of qlist) {
×
NEW
304
                if (todo.domain.toLowerCase() !== domain.toLowerCase()) continue
×
NEW
305
                if (pid && todo.pid !== pid) continue
×
306
                delivery_queue.push(new HMailItem(todo.file, todo.full_path))
×
307
            }
×
NEW
308
        } catch (err) {
×
NEW
309
            logger.error(exports, `Failed to load queue: ${err.message}`)
×
NEW
310
        }
×
311
    } else {
×
312
        temp_fail_queue.drain()
×
313
    }
×
314
}
×
315

1✔
316
// Load the queue files that belong to a specific (previous) pid.
// Thin wrapper around init_queue that adds a log line.
exports.load_pid_queue = async (pid) => {
    logger.info(exports, `Loading queue for pid: ${pid}`)
    await exports.init_queue(pid)
}
320

1✔
321
exports.ensure_queue_dir = async () => {
1✔
322
    // this code is only run at start-up.
9✔
323
    try {
9✔
324
        await fs.access(exports.queue_dir)
9✔
325
        return // directory already exists
8✔
326
    } catch (ignore) {
9✔
327
        // directory doesn't exist, try to create it
1✔
328
    }
1✔
329

1✔
330
    logger.debug(exports, `Creating queue directory ${exports.queue_dir}`)
1✔
331
    try {
1✔
332
        await fs.mkdir(exports.queue_dir, { mode: 493 }) // 493 == 0755
1✔
333
        const cfg = config.get('smtp.ini')
1✔
334
        let uid
1✔
335
        let gid
1✔
336
        if (cfg.user) uid = parseInt(child_process.execSync(`id -u ${cfg.user}`).toString().trim(), 10)
9!
337
        if (cfg.group) gid = parseInt(child_process.execSync(`id -g ${cfg.group}`).toString().trim(), 10)
9!
338
        if (uid && gid) {
9!
NEW
339
            await fs.chown(exports.queue_dir, uid, gid)
×
340
        } else if (uid) {
9!
NEW
341
            await fs.chown(exports.queue_dir, uid, -1)
×
342
        }
×
343
    } catch (err) {
9!
344
        if (err.code !== 'EEXIST') {
×
345
            logger.error(exports, `Error creating queue directory: ${err}`)
×
346
            throw err
×
347
        }
×
348
    }
×
349
}
9✔
350

1✔
351
exports.delete_dot_files = async () => {
1✔
352
    try {
7✔
353
        const files = await fs.readdir(exports.queue_dir)
7✔
354
        for (const file of files) {
7!
NEW
355
            if (file.startsWith(_qfile.platformDOT)) {
×
NEW
356
                logger.warn(exports, `Removing left over dot-file: ${file}`)
×
NEW
357
                await fs.unlink(path.join(exports.queue_dir, file))
×
NEW
358
            }
×
359
        }
×
360
    } catch (err) {
7!
NEW
361
        logger.error(exports, `Error deleting dot files: ${err}`)
×
362
    }
×
363
}
7✔
364

1✔
365
exports._add_hmail = (hmail) => {
1✔
NEW
366
    if (hmail.next_process <= exports.cur_time) {
×
367
        delivery_queue.push(hmail)
×
368
    } else {
×
369
        temp_fail_queue.add(hmail.filename, hmail.next_process - exports.cur_time, () => {
×
370
            delivery_queue.push(hmail)
×
371
        })
×
372
    }
×
373
}
×
374

1✔
375
exports.scan_queue_pids = async () => {
1✔
376
    // Under cluster, this is called first by the master
×
NEW
377
    await exports.ensure_queue_dir()
×
NEW
378
    await exports.delete_dot_files()
×
379

×
NEW
380
    let files
×
NEW
381
    try {
×
NEW
382
        files = await fs.readdir(exports.queue_dir)
×
NEW
383
    } catch (err) {
×
NEW
384
        logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
×
NEW
385
        throw err
×
NEW
386
    }
×
387

×
NEW
388
    const pids = {}
×
389

×
NEW
390
    for (const file of files) {
×
NEW
391
        const parts = exports.read_parts(file)
×
NEW
392
        if (parts) pids[parts.pid] = true
×
NEW
393
    }
×
394

×
NEW
395
    return Object.keys(pids)
×
396
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc