• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 22784108292

06 Mar 2026 10:12PM UTC coverage: 62.935%. First build
22784108292

Pull #3528

github

web-flow
Merge e3e2ae984 into 0b153c204
Pull Request #3528: Outbound: replace fs callbacks with promises and async/await

946 of 1420 branches covered (66.62%)

134 of 346 new or added lines in 5 files covered. (38.73%)

5442 of 8647 relevant lines covered (62.94%)

27.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.4
/outbound/queue.js
1
'use strict'
1✔
2

1✔
3
const child_process = require('node:child_process')
1✔
4
const fs = require('node:fs/promises')
1✔
5
const path = require('node:path')
1✔
6

1✔
7
const { Address } = require('address-rfc2821')
1✔
8
const config = require('haraka-config')
1✔
9

1✔
10
const logger = require('../logger')
1✔
11
const TimerQueue = require('./timer_queue')
1✔
12
const HMailItem = require('./hmail')
1✔
13
const obc = require('./config')
1✔
14
const _qfile = require('./qfile')
1✔
15
const obtls = require('./tls')
1✔
16

1✔
17
// A minimal concurrency-limited task queue. Tasks are started up to
// obc.cfg.concurrency_max at a time; each completed task frees a slot and
// reschedules processing.
class Queue {
    /**
     * @param {Function} worker - async function invoked with each task; the
     *   promise it returns gates the task's concurrency slot.
     */
    constructor(worker) {
        this.worker = worker
        this.tasks = [] // tasks waiting to start
        this.running = 0 // tasks currently in flight
        this._scheduled = false // true while a _process pass is pending
    }

    /** Enqueue a task and schedule a processing pass. */
    push(task) {
        this.tasks.push(task)
        this._schedule()
    }

    /** Number of tasks waiting plus tasks in flight. */
    length() {
        return this.tasks.length + this.running
    }

    // Coalesce multiple push()/completion events into one _process pass per tick.
    _schedule() {
        if (this._scheduled) return
        this._scheduled = true
        setImmediate(() => {
            this._scheduled = false
            this._process()
        })
    }

    // Start tasks until the concurrency limit is reached or the queue drains.
    _process() {
        while (this.running < obc.cfg.concurrency_max && this.tasks.length > 0) {
            this.running++
            const task = this.tasks.shift()

            // Promise.resolve().then(...) also traps a synchronous throw from
            // the worker; calling .catch() directly on its return value would
            // crash if the worker ever threw before returning a promise.
            Promise.resolve()
                .then(() => this.worker(task))
                .catch((err) => {
                    logger.error(exports, `Queue worker error: ${err}`)
                })
                .finally(() => {
                    this.running--
                    this._schedule()
                })
        }
    }
}
1✔
58

1✔
59
exports.name = 'outbound/queue'

// Resolve the queue directory: explicit `queue_dir` config wins, then the
// Haraka install root, then a test-local default.
let queue_dir
const configured_dir = config.get('queue_dir') // read once, not per-branch
if (configured_dir) {
    queue_dir = path.resolve(configured_dir)
} else if (process.env.HARAKA) {
    queue_dir = path.resolve(process.env.HARAKA, 'queue')
} else {
    queue_dir = path.resolve('test', 'test-queue')
}

exports.queue_dir = queue_dir
1✔
71

1✔
72
// Loads a queued message file from disk into an HMailItem and registers it
// for delivery. The worker's promise settles once the item has finished
// reading its header ('ready') or the file proved unreadable ('error'),
// which is what releases the load_queue concurrency slot.
const load_queue = new Queue(async (file) => {
    const hmail = new HMailItem(file, path.join(queue_dir, file))
    exports._add_hmail(hmail)
    // Bridge HMailItem's ready/error events into a promise. Each handler
    // detaches the opposite listener first so neither can fire after the
    // promise has settled, and no listener leaks on the surviving path.
    await new Promise((resolve, reject) => {
        const onReady = () => {
            hmail.off('error', onError)
            resolve()
        }
        const onError = (err) => {
            hmail.off('ready', onReady)
            reject(err)
        }
        hmail.once('ready', onReady)
        hmail.once('error', onError)
    })
})
1✔
88

1✔
89
// Count of deliveries currently being attempted (distinct from queue
// length, which also counts items that have not started yet).
let in_progress = 0
const delivery_queue = (exports.delivery_queue = new Queue(async (hmail) => {
    in_progress++
    // hmail.send() signals completion by invoking hmail.next_cb rather than
    // returning a promise; bridge that callback into the worker's promise so
    // the Queue releases this concurrency slot at the right time.
    await new Promise((resolve) => {
        hmail.next_cb = () => {
            in_progress--
            resolve()
        }
        // Lazily initialise outbound TLS configuration before the first send.
        if (obtls.cfg) {
            hmail.send()
        } else {
            obtls.init(() => {
                hmail.send()
            })
        }
    })
}))
1✔
106

1✔
107
// Messages whose next delivery attempt is in the future wait here on timers.
const temp_fail_queue = (exports.temp_fail_queue = new TimerQueue())

// Running total of queue files counted by stat_queue()/_stat_file().
let queue_count = 0

// Returns "in_progress/delivery_queue_len/temp_fail_queue_len" as a string.
exports.get_stats = () => `${in_progress}/${exports.delivery_queue.length()}/${exports.temp_fail_queue.length()}`
112

1✔
113
// Walk the queue directory, applying _list_file to every valid entry
// (see _list_file for the per-file result shape).
exports.list_queue = async () => exports._load_cur_queue(null, exports._list_file)
1✔
116

1✔
117
// Per-file iteratee for stat_queue(): just counts files. The `file`
// argument satisfies the iteratee signature; its contents are never read.
exports._stat_file = async (file) => {
    queue_count++
}
×
120

1✔
121
exports.stat_queue = async () => {
1✔
NEW
122
    await exports._load_cur_queue(null, exports._stat_file)
×
NEW
123
    return exports.stats()
×
124
}
×
125

1✔
126
exports.init_queue = async (pid) => {
1✔
127
    // Initialise and load queue
7✔
128
    // This function is called first when not running under cluster,
7✔
129
    await exports.ensure_queue_dir()
7✔
130
    await exports.delete_dot_files()
7✔
131

7✔
132
    await exports._load_cur_queue(pid, exports._add_file)
7✔
133
    logger.info(exports, `[pid: ${pid}] ${delivery_queue.length()} files in my delivery queue`)
7✔
134
    logger.info(exports, `[pid: ${pid}] ${load_queue.length()} files in my load queue`)
7✔
135
    logger.info(exports, `[pid: ${pid}] ${temp_fail_queue.length()} files in my temp fail queue`)
7✔
136
}
7✔
137

1✔
138
exports._load_cur_queue = async (pid, iteratee) => {
1✔
139
    logger.info(exports, 'Loading outbound queue from ', queue_dir)
8✔
140
    let files
8✔
141
    try {
8✔
142
        files = await fs.readdir(queue_dir)
8✔
143
    } catch (err) {
8!
NEW
144
        logger.error(exports, `Failed to load queue directory (${queue_dir}): ${err}`)
×
NEW
145
        throw err
×
NEW
146
    }
×
147

8✔
148
    exports.cur_time = new Date() // set once so we're not calling it a lot
8✔
149

8✔
150
    return exports.load_queue_files(pid, files, iteratee)
8✔
151
}
8✔
152

1✔
153
exports.read_parts = (file) => {
1✔
154
    if (file.startsWith(_qfile.platformDOT)) {
1!
155
        logger.warn(exports, `'Skipping' dot-file in queue folder: ${file}`)
×
156
        return false
×
157
    }
×
158

1✔
159
    if (file.startsWith('error.')) {
1!
160
        logger.warn(exports, `'Skipping' error file in queue folder: ${file}`)
×
161
        return false
×
162
    }
×
163

1✔
164
    const parts = _qfile.parts(file)
1✔
165
    if (!parts) {
1✔
166
        logger.error(exports, `Unrecognized file in queue folder: ${file}`)
1✔
167
        return false
1✔
168
    }
1✔
169

×
170
    return parts
×
171
}
1✔
172

1✔
173
exports.rename_to_actual_pid = async (file, parts) => {
1✔
174
    // maintain some original details for the rename
×
175
    const new_filename = _qfile.name({
×
176
        arrival: parts.arrival,
×
177
        uid: parts.uid,
×
178
        next_attempt: parts.next_attempt,
×
179
        attempts: parts.attempts,
×
180
    })
×
181

×
NEW
182
    try {
×
NEW
183
        await fs.rename(path.join(queue_dir, file), path.join(queue_dir, new_filename))
×
NEW
184
        return new_filename
×
NEW
185
    } catch (err) {
×
NEW
186
        throw new Error(`Unable to rename queue file: ${file} to ${new_filename} : ${err}`)
×
NEW
187
    }
×
188
}
×
189

1✔
190
// Route a (pre-validated) queue filename to the right queue: load it now if
// its next_attempt time has passed, otherwise park it on a timer until then.
// Relies on exports.cur_time having been set by _load_cur_queue.
exports._add_file = async (file) => {
    const parts = _qfile.parts(file)

    if (parts.next_attempt <= exports.cur_time) {
        logger.debug(exports, `File ${file} needs processing now`)
        load_queue.push(file)
    } else {
        logger.debug(exports, `File ${file} needs processing later: ${parts.next_attempt - exports.cur_time}ms`)
        temp_fail_queue.add(file, parts.next_attempt - exports.cur_time, () => {
            load_queue.push(file)
        })
    }
}
×
203

1✔
204
exports.load_queue_files = async (pid, input_files, iteratee) => {
1✔
205
    const searchPid = parseInt(pid)
8✔
206

8✔
207
    let stat_renamed = 0
8✔
208
    let stat_loaded = 0
8✔
209

8✔
210
    if (searchPid) {
8!
211
        logger.info(exports, `Grabbing queue files for pid: ${pid}`)
×
212
    } else {
8✔
213
        logger.info(exports, 'Loading the queue...')
8✔
214
    }
8✔
215

8✔
216
    const results = await Promise.all(
8✔
217
        input_files.map(async (file) => {
8✔
218
            const parts = exports.read_parts(file)
1✔
219
            if (!parts) return null
1✔
220

×
NEW
221
            if (!searchPid) {
×
NEW
222
                stat_loaded++
×
NEW
223
                return file
×
NEW
224
            }
×
225

×
NEW
226
            if (parts.pid !== searchPid) return null
×
227

×
NEW
228
            try {
×
NEW
229
                const renamed_file = await exports.rename_to_actual_pid(file, parts)
×
NEW
230
                stat_renamed++
×
231
                stat_loaded++
×
NEW
232
                return renamed_file
×
NEW
233
            } catch (error) {
×
NEW
234
                logger.error(exports, `${error.message}`)
×
NEW
235
                return null
×
236
            }
×
237
        }),
8✔
238
    )
8✔
239

8✔
240
    if (searchPid) logger.info(exports, `[pid: ${pid}] ${stat_renamed} files old PID queue fixed up`)
8!
241
    logger.debug(exports, `[pid: ${pid}] ${stat_loaded} files loaded`)
8✔
242

8✔
243
    const filtered = results.filter((i) => i)
8✔
244
    return await Promise.all(
8✔
245
        filtered.map(async (item) => {
8✔
NEW
246
            await iteratee(item)
×
247
        }),
8✔
248
    )
8✔
249
}
8✔
250

1✔
251
exports.stats = () => {
1✔
252
    return {
×
253
        queue_dir,
×
254
        queue_count,
×
255
    }
×
256
}
×
257

1✔
258
// Read exactly `length` bytes into `buffer` starting at `offset`, from file
// position `position`. Loops to handle partial reads. Returns the number of
// bytes read (always `length`); throws if the file ends before `length`
// bytes could be read.
async function readFull(handle, buffer, offset, length, position) {
    let totalRead = 0
    while (totalRead < length) {
        const { bytesRead } = await handle.read(buffer, offset + totalRead, length - totalRead, position + totalRead)
        if (bytesRead === 0) {
            throw new Error(`Unexpected end of file: read ${totalRead} of ${length} bytes`)
        }
        totalRead += bytesRead
    }
    return totalRead
}
×
270

1✔
271
exports._list_file = async (file) => {
1✔
NEW
272
    let handle
×
NEW
273
    try {
×
NEW
274
        const filePath = path.join(queue_dir, file)
×
NEW
275

×
NEW
276
        handle = await fs.open(filePath, 'r')
×
NEW
277

×
NEW
278
        // Read first 4 bytes to get the todo length
×
NEW
279
        const buf = Buffer.alloc(4)
×
NEW
280
        await readFull(handle, buf, 0, 4, 0)
×
281
        const todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]
×
NEW
282

×
NEW
283
        const todoBuf = Buffer.alloc(todo_len)
×
NEW
284
        await readFull(handle, todoBuf, 0, todo_len, 4)
×
NEW
285

×
NEW
286
        const todo = todoBuf.toString('utf8')
×
NEW
287
        const todo_struct = JSON.parse(todo)
×
NEW
288
        todo_struct.rcpt_to = todo_struct.rcpt_to.map((a) => new Address(a))
×
NEW
289
        todo_struct.mail_from = new Address(todo_struct.mail_from)
×
NEW
290
        todo_struct.file = file
×
NEW
291
        todo_struct.full_path = filePath
×
NEW
292
        const parts = _qfile.parts(file)
×
NEW
293
        todo_struct.pid = parts?.pid || null
×
NEW
294
        return todo_struct
×
NEW
295
    } catch (err) {
×
NEW
296
        console.error(`Error reading queue file: ${file}:`, err)
×
NEW
297
        return null
×
NEW
298
    } finally {
×
NEW
299
        if (handle) await handle.close().catch((err) => console.error(`Failed to close queue file handle for ${file}:`, err))
×
NEW
300
    }
×
301
}
×
302

1✔
303
exports.flush_queue = async (domain, pid) => {
1✔
304
    if (domain) {
×
NEW
305
        try {
×
NEW
306
            const qlist = await exports.list_queue()
×
307
            for (const todo of qlist) {
×
NEW
308
                if (todo.domain.toLowerCase() !== domain.toLowerCase()) continue
×
NEW
309
                if (pid && todo.pid !== pid) continue
×
310
                // console.log("requeue: ", todo);
×
311
                delivery_queue.push(new HMailItem(todo.file, todo.full_path))
×
312
            }
×
NEW
313
        } catch (err) {
×
NEW
314
            logger.error(exports, `Failed to load queue: ${err.message}`)
×
NEW
315
        }
×
316
    } else {
×
317
        temp_fail_queue.drain()
×
318
    }
×
319
}
×
320

1✔
321
exports.load_pid_queue = async (pid) => {
1✔
322
    logger.info(exports, `Loading queue for pid: ${pid}`)
×
NEW
323
    await exports.init_queue(pid)
×
324
}
×
325

1✔
326
exports.ensure_queue_dir = async () => {
1✔
327
    // this code is only run at start-up.
7✔
328
    try {
7✔
329
        await fs.access(queue_dir)
7✔
330
        return // directory already exists
7✔
331
    } catch (ignore) {
7!
NEW
332
        // directory doesn't exist, try to create it
×
NEW
333
    }
×
334

×
335
    logger.debug(exports, `Creating queue directory ${queue_dir}`)
×
336
    try {
×
NEW
337
        await fs.mkdir(queue_dir, { mode: 493 }) // 493 == 0755
×
338
        const cfg = config.get('smtp.ini')
×
339
        let uid
×
340
        let gid
×
NEW
341
        if (cfg.user) uid = Number.parseInt(child_process.execSync(`id -u ${cfg.user}`).toString().trim(), 10)
×
NEW
342
        if (cfg.group) gid = Number.parseInt(child_process.execSync(`id -g ${cfg.group}`).toString().trim(), 10)
×
343
        if (uid && gid) {
7!
NEW
344
            await fs.chown(queue_dir, uid, gid)
×
345
        } else if (uid) {
×
NEW
346
            await fs.chown(queue_dir, uid, -1)
×
347
        }
×
348
    } catch (err) {
7!
349
        if (err.code !== 'EEXIST') {
×
350
            logger.error(exports, `Error creating queue directory: ${err}`)
×
351
            throw err
×
352
        }
×
353
    }
×
354
}
7✔
355

1✔
356
exports.delete_dot_files = async () => {
1✔
357
    try {
7✔
358
        const files = await fs.readdir(queue_dir)
7✔
359
        for (const file of files) {
7!
NEW
360
            if (file.startsWith(_qfile.platformDOT)) {
×
NEW
361
                logger.warn(exports, `Removing left over dot-file: ${file}`)
×
NEW
362
                await fs.unlink(path.join(queue_dir, file))
×
NEW
363
            }
×
364
        }
×
365
    } catch (err) {
7!
NEW
366
        logger.error(exports, `Error deleting dot files: ${err}`)
×
367
    }
×
368
}
7✔
369

1✔
370
exports._add_hmail = (hmail) => {
1✔
NEW
371
    if (hmail.next_process <= exports.cur_time) {
×
372
        delivery_queue.push(hmail)
×
373
    } else {
×
374
        temp_fail_queue.add(hmail.filename, hmail.next_process - exports.cur_time, () => {
×
375
            delivery_queue.push(hmail)
×
376
        })
×
377
    }
×
378
}
×
379

1✔
380
exports.scan_queue_pids = async () => {
1✔
381
    // Under cluster, this is called first by the master
×
NEW
382
    await exports.ensure_queue_dir()
×
NEW
383
    await exports.delete_dot_files()
×
384

×
NEW
385
    let files
×
NEW
386
    try {
×
NEW
387
        files = await fs.readdir(queue_dir)
×
NEW
388
    } catch (err) {
×
NEW
389
        logger.error(exports, `Failed to load queue directory (${queue_dir}): ${err}`)
×
NEW
390
        throw err
×
NEW
391
    }
×
392

×
NEW
393
    const pids = {}
×
394

×
NEW
395
    for (const file of files) {
×
NEW
396
        const parts = exports.read_parts(file)
×
NEW
397
        if (parts) pids[parts.pid] = true
×
NEW
398
    }
×
399

×
NEW
400
    return Object.keys(pids)
×
401
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc