• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 22786810141

06 Mar 2026 11:50PM UTC coverage: 64.062%. First build
22786810141

Pull #3531

github

web-flow
Merge 949bee569 into 0b153c204
Pull Request #3531: Outbound queue

980 of 1453 branches covered (67.45%)

13 of 23 new or added lines in 1 file covered. (56.52%)

5501 of 8587 relevant lines covered (64.06%)

28.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.86
/outbound/queue.js
1
'use strict'
1✔
2

1✔
3
const child_process = require('node:child_process')
1✔
4
const fs = require('node:fs')
1✔
5
const path = require('node:path')
1✔
6

1✔
7
const async = require('async')
1✔
8
const { Address } = require('address-rfc2821')
1✔
9
const config = require('haraka-config')
1✔
10

1✔
11
const logger = require('../logger')
1✔
12
const TimerQueue = require('./timer_queue')
1✔
13
const HMailItem = require('./hmail')
1✔
14
const obc = require('./config')
1✔
15
const _qfile = require('./qfile')
1✔
16
const obtls = require('./tls')
1✔
17

1✔
18
exports.name = 'outbound/queue'
1✔
19

1✔
20
if (config.get('queue_dir')) {
1!
NEW
21
    exports.queue_dir = path.resolve(config.get('queue_dir'))
×
22
} else if (process.env.HARAKA) {
1!
NEW
23
    exports.queue_dir = path.resolve(process.env.HARAKA, 'queue')
×
24
} else {
1✔
25
    exports.queue_dir = path.resolve('test', 'test-queue')
1✔
26
}
1✔
27

1✔
28
// Worker pool that turns an on-disk queue file into an HMailItem and
// routes it (via _add_hmail) to the delivery or temp-fail queue. The
// slot is released once the hmail has finished parsing ('ready').
const load_queue = async.queue((file, done) => {
    const full_path = path.join(exports.queue_dir, file)
    const hmail = new HMailItem(file, full_path)
    exports._add_hmail(hmail)
    hmail.once('ready', done)
}, obc.cfg.concurrency_max)
1✔
33

1✔
34
// Count of messages currently being delivered (between the queue handing
// the hmail to a worker and the hmail invoking next_cb).
let in_progress = 0

// Deliver a single HMailItem. Outbound TLS config is lazily initialised
// before the very first send; next_cb releases the queue slot afterwards.
const deliver_hmail = (hmail, done) => {
    in_progress++
    hmail.next_cb = () => {
        in_progress--
        done()
    }
    if (obtls.cfg) {
        hmail.send()
        return
    }
    obtls.init(() => {
        hmail.send()
    })
}

const delivery_queue = (exports.delivery_queue = async.queue(deliver_hmail, obc.cfg.concurrency_max))
1✔
46

1✔
47
const temp_fail_queue = (exports.temp_fail_queue = new TimerQueue())
1✔
48

1✔
49
let queue_count = 0
1✔
50

1✔
51
exports.get_stats = () => `${in_progress}/${exports.delivery_queue.length()}/${exports.temp_fail_queue.length()}`
1✔
52

1✔
53
exports.list_queue = (cb) => {
1✔
54
    exports._load_cur_queue(null, exports._list_file, cb)
2✔
55
}
2✔
56

1✔
57
exports._stat_file = (file, cb) => {
1✔
58
    queue_count++
2✔
59
    setImmediate(cb)
2✔
60
}
2✔
61

1✔
62
exports.stat_queue = (cb) => {
1✔
63
    const self = exports
1✔
64
    exports._load_cur_queue(null, exports._stat_file, (err) => {
1✔
65
        if (err) return cb(err)
1!
66
        return cb(null, self.stats())
1✔
67
    })
1✔
68
}
1✔
69

1✔
70
exports.load_queue = (pid) => {
1✔
71
    // Initialise and load queue
7✔
72
    // This function is called first when not running under cluster,
7✔
73
    exports.ensure_queue_dir()
7✔
74
    exports.delete_dot_files()
7✔
75

7✔
76
    exports._load_cur_queue(pid, exports._add_file, () => {
7✔
77
        logger.info(exports, `[pid: ${pid}] ${delivery_queue.length()} files in my delivery queue`)
7✔
78
        logger.info(exports, `[pid: ${pid}] ${load_queue.length()} files in my load queue`)
7✔
79
        logger.info(exports, `[pid: ${pid}] ${temp_fail_queue.length()} files in my temp fail queue`)
7✔
80
    })
7✔
81
}
7✔
82

1✔
83
exports._load_cur_queue = (pid, iteratee, cb) => {
1✔
84
    logger.info(exports, 'Loading outbound queue from ', exports.queue_dir)
11✔
85
    fs.readdir(exports.queue_dir, (err, files) => {
11✔
86
        if (err) {
11!
NEW
87
            return logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
×
88
        }
×
89

11✔
90
        this.cur_time = new Date() // set once so we're not calling it a lot
11✔
91

11✔
92
        this.load_queue_files(pid, files, iteratee, cb)
11✔
93
    })
11✔
94
}
11✔
95

1✔
96
exports.read_parts = (file) => {
1✔
97
    if (file.startsWith(_qfile.platformDOT)) {
21✔
98
        logger.warn(exports, `'Skipping' dot-file in queue folder: ${file}`)
1✔
99
        return false
1✔
100
    }
1✔
101

20✔
102
    if (file.startsWith('error.')) {
21✔
103
        logger.warn(exports, `'Skipping' error file in queue folder: ${file}`)
1✔
104
        return false
1✔
105
    }
1✔
106

19✔
107
    const parts = _qfile.parts(file)
19✔
108
    if (!parts) {
21✔
109
        logger.error(exports, `Unrecognized file in queue folder: ${file}`)
6✔
110
        return false
6✔
111
    }
6✔
112

13✔
113
    return parts
13✔
114
}
21✔
115

1✔
116
exports.rename_to_actual_pid = (file, parts, cb) => {
1✔
117
    // maintain some original details for the rename
×
118
    const new_filename = _qfile.name({
×
119
        arrival: parts.arrival,
×
120
        uid: parts.uid,
×
121
        next_attempt: parts.next_attempt,
×
122
        attempts: parts.attempts,
×
123
    })
×
124

×
NEW
125
    fs.rename(path.join(exports.queue_dir, file), path.join(exports.queue_dir, new_filename), (err) => {
×
126
        if (err) {
×
127
            return cb(`Unable to rename queue file: ${file} to ${new_filename} : ${err}`)
×
128
        }
×
129

×
130
        cb(null, new_filename)
×
131
    })
×
132
}
×
133

1✔
134
exports._add_file = (file, cb) => {
1✔
135
    const self = exports
×
136
    const parts = _qfile.parts(file)
×
137

×
138
    if (parts.next_attempt <= self.cur_time) {
×
139
        logger.debug(exports, `File ${file} needs processing now`)
×
140
        load_queue.push(file)
×
141
    } else {
×
142
        logger.debug(exports, `File ${file} needs processing later: ${parts.next_attempt - self.cur_time}ms`)
×
143
        temp_fail_queue.add(file, parts.next_attempt - self.cur_time, () => {
×
144
            load_queue.push(file)
×
145
        })
×
146
    }
×
147

×
148
    cb()
×
149
}
×
150

1✔
151
exports.load_queue_files = (pid, input_files, iteratee, callback = function () {}) => {
1✔
152
    const self = exports
14✔
153
    const searchPid = parseInt(pid)
14✔
154

14✔
155
    let stat_renamed = 0
14✔
156
    let stat_loaded = 0
14✔
157

14✔
158
    if (searchPid) {
14✔
159
        logger.info(exports, `Grabbing queue files for pid: ${pid}`)
1✔
160
    } else {
14✔
161
        logger.info(exports, 'Loading the queue...')
13✔
162
    }
13✔
163

14✔
164
    async.map(
14✔
165
        input_files,
14✔
166
        (file, cb) => {
14✔
167
            const parts = self.read_parts(file)
17✔
168
            if (!parts) return cb()
17✔
169

12✔
170
            if (searchPid) {
17✔
171
                if (parts.pid !== searchPid) return cb()
2✔
172

1✔
173
                self.rename_to_actual_pid(file, parts, (error, renamed_file) => {
1✔
174
                    if (error) {
1✔
175
                        logger.error(exports, `${error}`)
1✔
176
                        return cb()
1✔
177
                    }
1✔
178

×
179
                    stat_renamed++
×
180
                    stat_loaded++
×
181
                    cb(null, renamed_file)
×
182
                })
1✔
183
            } else {
17✔
184
                stat_loaded++
10✔
185
                cb(null, file)
10✔
186
            }
10✔
187
        },
14✔
188
        (err, results) => {
14✔
189
            if (err) logger.err(exports, `[pid: ${pid}] ${err}`)
14!
190
            if (searchPid) logger.info(exports, `[pid: ${pid}] ${stat_renamed} files old PID queue fixed up`)
14✔
191
            logger.debug(exports, `[pid: ${pid}] ${stat_loaded} files loaded`)
14✔
192

14✔
193
            async.map(
14✔
194
                results.filter((i) => i),
14✔
195
                iteratee,
14✔
196
                callback,
14✔
197
            )
14✔
198
        },
14✔
199
    )
14✔
200
}
14✔
201

1✔
202
exports.stats = () => {
1✔
203
    return {
1✔
204
        queue_dir: exports.queue_dir,
1✔
205
        queue_count,
1✔
206
    }
1✔
207
}
1✔
208

1✔
209
// Iteratee for list_queue(): parse the "todo" header of one queue file
// and pass the resulting structure to cb(null, todo_struct).
//
// Queue file layout (as read here): a 4-byte big-endian length prefix,
// followed by a JSON "todo" document of exactly that many bytes.
// NOTE(review): on a stream 'error' only a console message is emitted and
// cb is never invoked, so a read failure stalls the surrounding async.map
// — confirm whether callers tolerate that.
exports._list_file = (file, cb) => {
    // First pass: read only bytes 0..3, the length prefix.
    const tl_reader = fs.createReadStream(path.join(exports.queue_dir, file), {
        start: 0,
        end: 3,
    })
    tl_reader.on('error', (err) => {
        console.error(`Error reading queue file: ${file}:`, err)
    })
    tl_reader.once('data', (buf) => {
        // I'm making the assumption here we won't ever read less than 4 bytes
        // as no filesystem on the planet should be that dumb...
        tl_reader.destroy()
        // Decode the big-endian 32-bit length of the todo JSON.
        const todo_len = (buf[0] << 24) + (buf[1] << 16) + (buf[2] << 8) + buf[3]
        // Second pass: read exactly the todo JSON (bytes 4 .. todo_len+3).
        const td_reader = fs.createReadStream(path.join(exports.queue_dir, file), {
            encoding: 'utf8',
            start: 4,
            end: todo_len + 3,
        })
        let todo = ''
        td_reader.on('data', (str) => {
            todo += str
            // Compare byte length, not string length: the stream decodes
            // UTF-8, so multi-byte characters make the two differ.
            if (Buffer.byteLength(todo) === todo_len) {
                // we read everything
                const todo_struct = JSON.parse(todo)
                // Rehydrate addresses into Address objects for callers.
                todo_struct.rcpt_to = todo_struct.rcpt_to.map((a) => new Address(a))
                todo_struct.mail_from = new Address(todo_struct.mail_from)
                todo_struct.file = file
                todo_struct.full_path = path.join(exports.queue_dir, file)
                const parts = _qfile.parts(file)
                todo_struct.pid = parts?.pid || null
                cb(null, todo_struct)
            }
        })
        td_reader.on('end', () => {
            // Truncated file: the todo never reached its declared length.
            if (Buffer.byteLength(todo) !== todo_len) {
                console.error("Didn't find right amount of data in todo for file:", file)
                return cb()
            }
        })
    })
}
2✔
250

1✔
251
exports.flush_queue = (domain, pid) => {
1✔
252
    if (domain) {
×
253
        exports.list_queue((err, qlist) => {
×
254
            if (err) return logger.error(exports, `Failed to load queue: ${err}`)
×
255
            for (const todo of qlist) {
×
256
                if (todo.domain.toLowerCase() != domain.toLowerCase()) return
×
257
                if (pid && todo.pid != pid) return
×
258
                // console.log("requeue: ", todo);
×
259
                delivery_queue.push(new HMailItem(todo.file, todo.full_path))
×
260
            }
×
261
        })
×
262
    } else {
×
263
        temp_fail_queue.drain()
×
264
    }
×
265
}
×
266

1✔
267
exports.load_pid_queue = (pid) => {
1✔
268
    logger.info(exports, `Loading queue for pid: ${pid}`)
1✔
269
    exports.load_queue(pid)
1✔
270
}
1✔
271

1✔
272
exports.ensure_queue_dir = () => {
1✔
273
    // this code is only run at start-up.
9✔
274
    if (fs.existsSync(exports.queue_dir)) return
9✔
275

1✔
276
    logger.debug(exports, `Creating queue directory ${exports.queue_dir}`)
1✔
277
    try {
1✔
278
        fs.mkdirSync(exports.queue_dir, 493) // 493 == 0755
1✔
279
        const cfg = config.get('smtp.ini')
1✔
280
        let uid
1✔
281
        let gid
1✔
282
        if (cfg.user) uid = parseInt(child_process.execSync(`id -u ${cfg.user}`).toString().trim(), 10)
9!
283
        if (cfg.group) gid = parseInt(child_process.execSync(`id -g ${cfg.group}`).toString().trim(), 10)
9!
284
        if (uid && gid) {
9!
NEW
285
            fs.chown(exports.queue_dir, uid, gid)
×
286
        } else if (uid) {
9!
NEW
287
            fs.chown(exports.queue_dir, uid)
×
288
        }
×
289
    } catch (err) {
9!
290
        if (err.code !== 'EEXIST') {
×
291
            logger.error(exports, `Error creating queue directory: ${err}`)
×
292
            throw err
×
293
        }
×
294
    }
×
295
}
9✔
296

1✔
297
exports.delete_dot_files = () => {
1✔
298
    for (const file of fs.readdirSync(exports.queue_dir)) {
7!
299
        if (file.startsWith(_qfile.platformDOT)) {
×
300
            logger.warn(exports, `Removing left over dot-file: ${file}`)
×
NEW
301
            return fs.unlinkSync(path.join(exports.queue_dir, file))
×
302
        }
×
303
    }
×
304
}
7✔
305

1✔
306
exports._add_hmail = (hmail) => {
1✔
307
    if (hmail.next_process < exports.cur_time) {
×
308
        delivery_queue.push(hmail)
×
309
    } else {
×
310
        temp_fail_queue.add(hmail.filename, hmail.next_process - exports.cur_time, () => {
×
311
            delivery_queue.push(hmail)
×
312
        })
×
313
    }
×
314
}
×
315

1✔
316
exports.scan_queue_pids = (cb) => {
1✔
317
    const self = exports
×
318

×
319
    // Under cluster, this is called first by the master
×
320
    self.ensure_queue_dir()
×
321
    self.delete_dot_files()
×
322

×
NEW
323
    fs.readdir(exports.queue_dir, (err, files) => {
×
324
        if (err) {
×
NEW
325
            logger.error(exports, `Failed to load queue directory (${exports.queue_dir}): ${err}`)
×
326
            return cb(err)
×
327
        }
×
328

×
329
        const pids = {}
×
330

×
331
        for (const file of files) {
×
332
            const parts = self.read_parts(file)
×
333
            if (parts) pids[parts.pid] = true
×
334
        }
×
335

×
336
        return cb(null, Object.keys(pids))
×
337
    })
×
338
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc