• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 26427888890

26 May 2026 01:56AM UTC coverage: 73.353% (-0.3%) from 73.64%
26427888890

Pull #3577

github

web-flow
Merge d6883ed79 into c4173efb8
Pull Request #3577: Implicit TLS with Proxy Protocol

1807 of 2349 branches covered (76.93%)

186 of 220 new or added lines in 2 files covered. (84.55%)

93 existing lines in 7 files now uncovered.

8115 of 11063 relevant lines covered (73.35%)

24.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.47
/server.js
1
'use strict'
2✔
2
// smtp network server
2✔
3

2✔
4
const cluster = require('node:cluster')
2✔
5
const { spawn } = require('node:child_process')
2✔
6
const fs = require('node:fs')
2✔
7
const net = require('node:net')
2✔
8
const os = require('node:os')
2✔
9
const path = require('node:path')
2✔
10
const tls = require('node:tls')
2✔
11
const constants = require('haraka-constants')
2✔
12

2✔
13
const tls_socket = require('./tls_socket')
2✔
14
const conn = require('./connection')
2✔
15
const outbound = require('./outbound')
2✔
16
const endpoint = require('./endpoint')
2✔
17

2✔
18
const Server = exports
2✔
19
Server.logger = require('./logger')
2✔
20
Server.config = require('haraka-config')
2✔
21
Server.plugins = require('./plugins')
2✔
22
Server.notes = {}
2✔
23

2✔
24
const { logger } = Server
2✔
25

2✔
26
// Need these here so we can run hooks
2✔
27
logger.add_log_methods(Server, 'server')
2✔
28

2✔
29
Server.listeners = []
2✔
30

2✔
31
Server.load_smtp_ini = () => {
2✔
32
    Server.cfg = Server.config.get(
9✔
33
        'smtp.ini',
9✔
34
        {
9✔
35
            booleans: ['-main.daemonize', '-main.graceful_shutdown'],
9✔
36
        },
9✔
37
        () => {
9✔
38
            Server.load_smtp_ini()
×
39
        },
9✔
40
    )
9✔
41

9✔
42
    if (Server.cfg.main.nodes === undefined) {
9✔
43
        Server.logwarn(`smtp.ini.nodes unset, using 1, see https://github.com/haraka/Haraka/wiki/Performance-Tuning`)
2✔
44
    }
2✔
45

9✔
46
    const defaults = {
9✔
47
        inactivity_timeout: 300,
9✔
48
        daemon_log_file: '/var/log/haraka.log',
9✔
49
        daemon_pid_file: '/var/run/haraka.pid',
9✔
50
        force_shutdown_timeout: 30,
9✔
51
        smtps_port: 465,
9✔
52
        nodes: 1,
9✔
53
    }
9✔
54

9✔
55
    for (const key in defaults) {
9✔
56
        if (Server.cfg.main[key] !== undefined) continue
54✔
57
        Server.cfg.main[key] = defaults[key]
17✔
58
    }
17✔
59
}
9✔
60

2✔
61
Server.load_http_ini = () => {
2✔
62
    Server.http = {}
2✔
63
    Server.http.cfg = Server.config.get('http.ini', () => {
2✔
64
        Server.load_http_ini()
×
65
    }).main
2✔
66
}
2✔
67

2✔
68
Server.load_smtp_ini()
2✔
69
Server.load_http_ini()
2✔
70

2✔
71
Server.daemonize = function () {
2✔
72
    const c = this.cfg.main
6✔
73
    if (!c.daemonize) return
6✔
74

×
75
    if (!process.env.__daemon) {
×
76
        // Remove process.on('exit') listeners otherwise
×
77
        // we get a spurious 'Exiting' log entry.
×
78
        process.removeAllListeners('exit')
×
79
        Server.lognotice('Daemonizing...')
×
80

×
81
        const log_fd = fs.openSync(c.daemon_log_file, 'a')
×
82
        const child = spawn(process.execPath, process.argv.slice(1), {
×
83
            detached: true,
×
84
            stdio: ['ignore', log_fd, log_fd],
×
85
            env: { ...process.env, __daemon: '1' },
×
86
            cwd: process.cwd(),
×
87
        })
×
88
        child.unref()
×
89
        process.exit(0)
×
90
    }
×
91

×
92
    // We are the daemon from here on...
×
93
    try {
×
94
        fs.writeFileSync(c.daemon_pid_file, `${process.pid}\n`, { flag: 'wx' })
×
95
        process.on('exit', () => {
×
96
            try {
×
97
                fs.unlinkSync(c.daemon_pid_file)
×
98
            } catch {}
×
99
        })
×
100
    } catch (err) {
×
101
        Server.logerror(err.message)
×
102
        logger.dump_and_exit(1)
×
103
    }
×
104
}
6✔
105

2✔
106
Server.flushQueue = async (domain) => {
2✔
107
    if (!Server.cluster) {
×
108
        await outbound.flush_queue(domain)
×
109
        return
×
110
    }
×
111

×
112
    for (const id in cluster.workers) {
×
113
        cluster.workers[id].send({ event: 'outbound.flush_queue', domain })
×
114
    }
×
115
}
×
116

2✔
117
let graceful_in_progress = false
2✔
118

2✔
119
Server.gracefulRestart = () => {
2✔
120
    Server._graceful()
×
121
}
×
122

2✔
123
Server.stopListeners = () => {
2✔
124
    Server.loginfo('Shutting down listeners')
6✔
125
    for (const l of Server.listeners) {
6✔
126
        l.close()
10✔
127
    }
10✔
128
    Server.listeners = []
6✔
129
}
6✔
130

2✔
131
Server.performShutdown = () => {
2✔
132
    if (Server.cfg.main.graceful_shutdown) {
×
133
        return Server.gracefulShutdown()
×
134
    }
×
135
    Server.loginfo('Shutting down.')
×
136
    process.exit(0)
×
137
}
×
138

2✔
139
Server.gracefulShutdown = () => {
2✔
140
    Server.stopListeners()
×
141
    Server._graceful(() => {
×
142
        // log();
×
143
        Server.loginfo('Failed to shutdown naturally. Exiting.')
×
144
        process.exit(0)
×
145
    })
×
146
}
×
147

2✔
148
Server._graceful = async (shutdown) => {
2✔
149
    if (!Server.cluster && shutdown) {
1!
150
        for (const module of ['outbound', 'cfreader', 'plugins']) {
×
151
            process.emit('message', { event: `${module}.shutdown` })
×
152
        }
×
153
        const t = setTimeout(shutdown, Server.cfg.main.force_shutdown_timeout * 1000)
×
154
        return t.unref()
×
155
    }
×
156

1✔
157
    if (graceful_in_progress) {
1!
158
        Server.lognotice('Restart currently in progress - ignoring request')
×
159
        return
×
160
    }
×
161

1✔
162
    graceful_in_progress = true
1✔
163
    // TODO: Make these configurable
1✔
164
    const disconnect_timeout = 30
1✔
165
    const exit_timeout = 30
1✔
166
    cluster.removeAllListeners('exit')
1✔
167

1✔
168
    // we reload using eachLimit where limit = num_workers - 1
1✔
169
    // this kills all-but-one workers in parallel, leaving one running
1✔
170
    // for new connections, and then restarts that one last worker.
1✔
171

1✔
172
    const worker_ids = Object.keys(cluster.workers)
1✔
173
    let limit = worker_ids.length - 1
1✔
174
    if (limit < 2) limit = 1
1✔
175

1✔
176
    const todo = []
1✔
177

1✔
178
    for (const id of Object.keys(cluster.workers)) {
1✔
179
        todo.push(() => {
1✔
180
            return new Promise((resolve) => {
1✔
181
                Server.lognotice(`Killing worker: ${id}`)
1✔
182
                const worker = cluster.workers[id]
1✔
183
                for (const module of ['outbound', 'cfreader', 'plugins']) {
1✔
184
                    worker.send({ event: `${module}.shutdown` })
3✔
185
                }
3✔
186
                worker.disconnect()
1✔
187
                let disconnect_received = false
1✔
188
                const disconnect_timer = setTimeout(() => {
1✔
189
                    if (!disconnect_received) {
×
190
                        Server.logcrit('Disconnect never received by worker. Killing.')
×
191
                        worker.kill()
×
192
                    }
×
193
                }, disconnect_timeout * 1000)
1✔
194

1✔
195
                worker.once('disconnect', () => {
1✔
196
                    clearTimeout(disconnect_timer)
1✔
197
                    disconnect_received = true
1✔
198
                    Server.lognotice('Disconnect complete')
1✔
199
                    let dead = false
1✔
200
                    const timer = setTimeout(() => {
1✔
201
                        if (!dead) {
×
202
                            Server.logcrit(`Worker ${id} failed to shutdown. Killing.`)
×
203
                            worker.kill()
×
204
                        }
×
205
                    }, exit_timeout * 1000)
1✔
206
                    worker.once('exit', () => {
1✔
207
                        dead = true
1✔
208
                        clearTimeout(timer)
1✔
209
                        if (shutdown) resolve()
1!
210
                    })
1✔
211
                })
1✔
212
                if (!shutdown) {
1✔
213
                    const newWorker = cluster.fork()
1✔
214
                    newWorker.once('listening', () => {
1✔
215
                        Server.lognotice('Replacement worker online.')
1✔
216
                        newWorker.on('exit', (code, signal) => {
1✔
217
                            cluster_exit_listener(newWorker, code, signal)
×
218
                        })
1✔
219
                        resolve()
1✔
220
                    })
1✔
221
                }
1✔
222
            })
1✔
223
        })
1✔
224
    }
1✔
225

1✔
226
    while (todo.length) {
1✔
227
        // process batches of workers: invoke each queued thunk so we
1✔
228
        // actually await the worker shutdown promises (passing the bare
1✔
229
        // functions to Promise.all would resolve immediately).
1✔
230
        await Promise.all(todo.splice(0, limit).map((fn) => fn()))
1✔
231
    }
1✔
232

1✔
233
    if (shutdown) {
1!
234
        Server.loginfo('Workers closed. Shutting down master process subsystems')
×
235
        for (const module of ['outbound', 'cfreader', 'plugins']) {
×
236
            process.emit('message', { event: `${module}.shutdown` })
×
237
        }
×
238
        const t2 = setTimeout(shutdown, Server.cfg.main.force_shutdown_timeout * 1000)
×
239
        return t2.unref()
×
240
    }
×
241
    graceful_in_progress = false
1✔
242
    Server.lognotice(`Reload complete, workers: ${JSON.stringify(Object.keys(cluster.workers))}`)
1✔
243
}
1✔
244

2✔
245
Server.sendToMaster = (command, params) => {
2✔
246
    // console.log("Send to master: ", command);
1✔
247
    if (Server.cluster) {
1!
248
        if (Server.cluster.isMaster) {
×
249
            Server.receiveAsMaster(command, params)
×
250
        } else {
×
251
            process.send({ cmd: command, params })
×
252
        }
×
253
    } else {
1✔
254
        Server.receiveAsMaster(command, params)
1✔
255
    }
1✔
256
}
1✔
257

2✔
258
Server.receiveAsMaster = (command, params) => {
2✔
259
    if (!Server[command]) {
2✔
260
        Server.logerror(`Invalid command: ${command}`)
1✔
261
        return
1✔
262
    }
1✔
263
    Server[command].apply(Server, params)
1✔
264
}
2✔
265

2✔
266
function messageHandler(worker, msg, handle) {
×
267
    // sunset Haraka v3 (Node < 6)
×
268
    if (arguments.length === 2) {
×
269
        handle = msg
×
270
        msg = worker
×
271
        worker = undefined
×
272
    }
×
273
    // console.log("received cmd: ", msg);
×
274
    if (msg?.cmd) {
×
275
        Server.receiveAsMaster(msg.cmd, msg.params)
×
276
    }
×
277
}
×
278

2✔
279
Server.get_listen_addrs = (cfg, port) => {
2✔
280
    if (!port) port = 25
15✔
281
    let listeners = []
15✔
282
    if (cfg?.listen) {
15✔
283
        listeners = cfg.listen.split(/\s*,\s*/)
15✔
284
        if (listeners[0] === '') listeners = []
15!
285
        for (let i = 0; i < listeners.length; i++) {
15✔
286
            const ep = endpoint(listeners[i], port)
18✔
287
            if (ep instanceof Error) continue
18!
288
            listeners[i] = ep.toString()
18✔
289
        }
18✔
290
    }
15✔
291
    if (cfg.port) {
15!
292
        let host = cfg.listen_host
×
293
        if (!host) {
×
294
            host = '[::0]'
×
295
            Server.default_host = true
×
296
        }
×
297
        listeners.unshift(`${host}:${cfg.port}`)
×
298
    }
×
299
    if (listeners.length) return listeners
15✔
300

×
301
    Server.default_host = true
×
302
    listeners.push(`[::0]:${port}`)
×
303

×
304
    return listeners
×
305
}
15✔
306

2✔
307
Server.createServer = (params) => {
2✔
308
    const c = Server.cfg.main
6✔
309
    for (const key in params) {
6!
310
        if (typeof params[key] === 'function') continue
×
311
        c[key] = params[key]
×
312
    }
×
313

6✔
314
    Server.notes = {}
6✔
315
    Server.plugins.server = Server
6✔
316
    Server.plugins.load_plugins()
6✔
317

6✔
318
    const inactivity_timeout = (c.inactivity_timeout || 300) * 1000
6!
319

6✔
320
    if (!cluster || !c.nodes) {
6✔
321
        Server.daemonize(c)
6✔
322
        Server.setup_smtp_listeners(Server.plugins, 'master', inactivity_timeout)
6✔
323
        return
6✔
324
    }
6✔
325

×
326
    // Cluster
×
327
    Server.cluster = cluster
×
328

×
329
    // Cluster Workers
×
330
    if (!cluster.isMaster) {
×
331
        Server.setup_smtp_listeners(Server.plugins, 'child', inactivity_timeout)
×
332
        return
×
333
    } else {
×
334
        // console.log("Setting up message handler");
×
335
        cluster.on('message', messageHandler)
×
336
    }
×
337

×
338
    // Cluster Master
×
339
    // We fork workers in init_master_respond so that plugins
×
340
    // can put handlers on cluster events before they are emitted.
×
341
    Server.plugins.run_hooks('init_master', Server)
×
342
}
6✔
343

2✔
344
Server.load_default_tls_config = (done) => {
2✔
345
    // this fn exists solely for testing
10✔
346
    if (Server.config.root_path != tls_socket.config.root_path) {
10✔
347
        Server.loginfo(`resetting tls_config.config path to ${Server.config.root_path}`)
1✔
348
        tls_socket.config = tls_socket.config.module_config(path.dirname(Server.config.root_path))
1✔
349
    }
1✔
350
    tls_socket.getSocketOpts('*').then((opts) => {
10✔
351
        done(opts)
10✔
352
    })
10✔
353
}
10✔
354

2✔
355
Server.create_smtps_server = (opts, onConnect) => {
2✔
356
    let server
4✔
357
    const proxyPrefix = Buffer.from('PROXY ')
4✔
358

4✔
359
    function close_with_proxy_error(socket, timer, msg) {
4✔
NEW
360
        clearTimeout(timer)
×
NEW
361
        socket.removeAllListeners('data')
×
NEW
362
        socket.end(`421 ${msg}\r\n`, () => {
×
NEW
363
            socket.destroy()
×
NEW
364
        })
×
NEW
365
    }
×
366

4✔
367
    function start_tls(socket, proxy, proxy_checked) {
4✔
368
        const tls_opts = { ...opts, server }
3✔
369
        const cleartext = new tls.TLSSocket(socket, tls_opts)
3✔
370

3✔
371
        if (proxy) {
3✔
372
            cleartext.haraka_proxy = {
1✔
373
                ...proxy,
1✔
374
                proxy_ip: conn.normalize_ip(socket.remoteAddress) || socket.remoteAddress,
1!
375
            }
1✔
376
        }
1✔
377
        if (proxy_checked) cleartext.haraka_proxy_checked = true
3✔
378

3✔
379
        const handshake_timeout = setTimeout(
3✔
380
            () => {
3✔
NEW
381
                const err = new Error('TLS handshake timeout')
×
NEW
382
                err.code = 'ERR_TLS_HANDSHAKE_TIMEOUT'
×
NEW
383
                server.emit('tlsClientError', err, cleartext)
×
NEW
384
                cleartext.destroy()
×
385
            },
3✔
386
            tls_opts.handshakeTimeout || 120 * 1000,
3✔
387
        )
3✔
388

3✔
389
        function clear_handshake_timeout() {
3✔
390
            clearTimeout(handshake_timeout)
6✔
391
        }
6✔
392

3✔
393
        function on_tls_error(err) {
3✔
394
            clear_handshake_timeout()
1✔
395
            err.source = 'tls'
1✔
396
            server.emit('tlsClientError', err, cleartext)
1✔
397
            cleartext.destroy()
1✔
398
        }
1✔
399

3✔
400
        cleartext.once('error', on_tls_error)
3✔
401
        cleartext.once('close', clear_handshake_timeout)
3✔
402
        cleartext.once('secure', () => {
3✔
403
            clear_handshake_timeout()
2✔
404
            cleartext.removeListener('error', on_tls_error)
2✔
405
            server.emit('secureConnection', cleartext)
2✔
406
            onConnect(cleartext)
2✔
407
        })
3✔
408
    }
3✔
409

4✔
410
    function starts_with_proxy_prefix(data) {
4✔
411
        if (!data.length) return true
2!
412
        if (data.length > proxyPrefix.length) return data.subarray(0, proxyPrefix.length).equals(proxyPrefix)
2✔
NEW
413

×
NEW
414
        return proxyPrefix.subarray(0, data.length).equals(data)
×
415
    }
2✔
416

4✔
417
    function start_tls_with_buffer(socket, data, proxy, proxy_checked) {
4✔
418
        socket.pause()
2✔
419
        if (data?.length) socket.unshift(data)
2✔
420
        setImmediate(() => {
2✔
421
            start_tls(socket, proxy, proxy_checked)
2✔
422
            socket.resume()
2✔
423
        })
2✔
424
    }
2✔
425

4✔
426
    server = net.createServer((socket) => {
4✔
427
        const remote_ip = conn.normalize_ip(socket.remoteAddress) || socket.remoteAddress
3!
428

3✔
429
        if (!conn.is_haproxy_allowed(remote_ip)) {
3✔
430
            start_tls(socket)
1✔
431
            return
1✔
432
        }
1✔
433

2✔
434
        let current_data = null
2✔
435
        const proxy_timer = setTimeout(() => {
2✔
NEW
436
            close_with_proxy_error(socket, proxy_timer, 'PROXY timeout')
×
437
        }, 30 * 1000)
2✔
438

2✔
439
        function cleanup() {
2✔
440
            clearTimeout(proxy_timer)
2✔
441
            socket.pause()
2✔
442
            socket.removeListener('data', on_data)
2✔
443
            socket.removeListener('close', cleanup)
2✔
444
            socket.removeListener('error', cleanup)
2✔
445
        }
2✔
446

2✔
447
        function on_data(data) {
2✔
448
            current_data = current_data ? Buffer.concat([current_data, data]) : data
2!
449

2✔
450
            if (!starts_with_proxy_prefix(current_data)) {
2✔
451
                cleanup()
1✔
452
                start_tls_with_buffer(socket, current_data, null, true)
1✔
453
                return
1✔
454
            }
1✔
455

1✔
456
            const offset = current_data.indexOf(0x0a)
1✔
457
            if (offset === -1) {
2!
NEW
458
                if (current_data.length > 512) {
×
NEW
459
                    close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
×
NEW
460
                }
×
NEW
461
                return
×
NEW
462
            }
×
463
            if (offset > 512) {
2!
NEW
464
                close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
×
NEW
465
                return
×
NEW
466
            }
×
467

1✔
468
            cleanup()
1✔
469

1✔
470
            const proxy = conn.parse_proxy_line(current_data.slice(0, offset + 1))
1✔
471
            if (!proxy) {
2!
NEW
472
                close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
×
NEW
473
                return
×
NEW
474
            }
×
475

1✔
476
            const rest = current_data.slice(offset + 1)
1✔
477
            start_tls_with_buffer(socket, rest, proxy, true)
1✔
478
        }
2✔
479

2✔
480
        socket.once('close', cleanup)
2✔
481
        socket.once('error', cleanup)
2✔
482
        socket.on('data', on_data)
2✔
483
    })
4✔
484

4✔
485
    return server
4✔
486
}
4✔
487

2✔
488
Server.get_smtp_server = async (ep, inactivity_timeout) => {
2✔
489
    let server
10✔
490

10✔
491
    function onConnect(client) {
10✔
492
        client.setTimeout(inactivity_timeout)
7✔
493
        const connection = conn.createConnection(client, server, Server.cfg)
7✔
494

7✔
495
        if (server.has_tls) {
7✔
496
            const cipher = client.getCipher()
2✔
497
            cipher.version = client.getProtocol() // replace min with actual
2✔
498

2✔
499
            connection.setTLS({
2✔
500
                cipher,
2✔
501
                verified: client.authorized,
2✔
502
                verifyError: client.authorizationError,
2✔
503
                peerCertificate: client.getPeerCertificate(),
2✔
504
            })
2✔
505
        }
2✔
506

7✔
507
        if (client.haraka_proxy) connection.apply_proxy(client.haraka_proxy)
7✔
508
    }
7✔
509

10✔
510
    if (ep.port === parseInt(Server.cfg.main.smtps_port, 10)) {
10✔
511
        Server.loginfo('getting SocketOpts for SMTPS server')
4✔
512
        const opts = await tls_socket.getSocketOpts('*')
4✔
513
        Server.loginfo(`Creating TLS server on ${ep}`)
4✔
514

4✔
515
        opts.rejectUnauthorized = tls_socket.get_rejectUnauthorized(
4✔
516
            opts.rejectUnauthorized,
4✔
517
            ep.port,
4✔
518
            tls_socket.cfg.main.requireAuthorized,
4✔
519
        )
4✔
520

4✔
521
        server = Server.create_smtps_server(opts, onConnect)
4✔
522
        tls_socket.addOCSP(server)
4✔
523
        server.has_tls = true
4✔
524
        server.on('resumeSession', (id, rsDone) => {
4✔
525
            Server.loginfo('client requested TLS resumeSession')
3✔
526
            rsDone(null, null)
3✔
527
        })
4✔
528
        Server.listeners.push(server)
4✔
529
        return server
4✔
530
    } else {
10✔
531
        server = tls_socket.createServer(onConnect)
6✔
532
        server.has_tls = false
6✔
533
        const opts = await tls_socket.getSocketOpts('*')
6✔
534
        Server.listeners.push(server)
6✔
535
        return server
6✔
536
    }
6✔
537
}
10✔
538

2✔
539
Server.setup_smtp_listeners = async (plugins2, type, inactivity_timeout) => {
2✔
540
    const errors = []
6✔
541

6✔
542
    for (const [ifName, ifObj] of Object.entries(os.networkInterfaces())) {
6✔
543
        for (const addr of ifObj) {
12✔
544
            if (addr.family === 'IPv6') {
24✔
545
                if (!Server.notes.IPv6) Server.notes.IPv6 = true
12✔
546
            } else if (addr.family === 'IPv4') {
12✔
547
                if (!Server.notes.IPv4) Server.notes.IPv4 = true
12✔
548
            } else {
12!
549
                console.error(addr)
×
550
            }
×
551
        }
24✔
552
    }
12✔
553

6✔
554
    for (const listen_address of Server.get_listen_addrs(Server.cfg.main)) {
6✔
555
        const ep = endpoint(listen_address, 25)
6✔
556

6✔
557
        if (ep instanceof Error) {
6!
558
            Server.logerror(`Invalid "listen" format in smtp.ini: ${listen_address}`)
×
559
            continue
×
560
        }
×
561

6✔
562
        const server = await Server.get_smtp_server(ep, inactivity_timeout)
6✔
563
        if (!server) continue
6!
564

6✔
565
        server.notes = Server.notes
6✔
566
        if (Server.cluster) server.cluster = Server.cluster
6!
567

6✔
568
        server
6✔
569
            .on('listening', function () {
6✔
570
                const addr = this.address()
6✔
571
                Server.lognotice(`Listening on ${endpoint(addr)}`)
6✔
572
            })
6✔
573
            .on('close', () => {
6✔
574
                Server.loginfo(`Listener ${ep} stopped`)
6✔
575
            })
6✔
576
            .on('error', (e) => {
6✔
577
                errors.push(e)
×
578
                Server.logerror(`Failed to setup listeners: ${e.message}`)
×
579
                if (e.code !== 'EAFNOSUPPORT') {
×
580
                    Server.logerror(e)
×
581
                    return
×
582
                }
×
583
                // Fallback from IPv6 to IPv4 if not supported
×
584
                // But only if we supplied the default of [::0]:25
×
585
                if (/^::0/.test(ep.host) && Server.default_host) {
×
586
                    server.listen(ep.port, '0.0.0.0', 0)
×
587
                    return
×
588
                }
×
589
                // Pass error to callback
×
590
                Server.logerror(e)
×
591
            })
6✔
592

6✔
593
        await ep.bind(server, { backlog: 0 })
6✔
594
    }
6✔
595

6✔
596
    if (errors.length) {
6!
597
        for (const e of errors) {
×
598
            Server.logerror(`Failed to setup listeners: ${e.message}`)
×
599
        }
×
600
        return logger.dump_and_exit(-1)
×
601
    }
×
602
    Server.listening()
6✔
603
    plugins2.run_hooks(`init_${type}`, Server)
6✔
604
}
6✔
605

2✔
606
Server.setup_http_listeners = async () => {
2✔
607
    if (!Server.http?.cfg?.listen) return
6!
608

×
609
    const listeners = Server.get_listen_addrs(Server.http.cfg, 80)
×
610
    if (!listeners.length) return
×
611

×
612
    try {
×
613
        Server.http.express = require('express')
×
614
        Server.loginfo('express loaded at Server.http.express')
×
615
    } catch (err) {
×
616
        Server.logerror('express failed to load. No http server. Install express with: npm install -g express')
×
617
        return
×
618
    }
×
619

×
620
    const app = Server.http.express()
×
621
    Server.http.app = app
×
622
    Server.loginfo('express app is at Server.http.app')
×
623

×
624
    for (const listen_address of listeners) {
×
625
        const ep = endpoint(listen_address, 80)
×
626
        if (ep instanceof Error) {
×
627
            Server.logerror(`Invalid format for listen in http.ini: ${listen_address}`)
×
628
            continue
×
629
        }
×
630

×
631
        if (443 == ep.port) {
×
632
            const tlsOpts = { ...tls_socket.certsByHost['*'] }
×
633
            tlsOpts.requestCert = false // not appropriate for HTTPS
×
634
            Server.http.server = require('https').createServer(tlsOpts, app)
×
635
        } else {
×
636
            Server.http.server = require('http').createServer(app)
×
637
        }
×
638

×
639
        Server.listeners.push(Server.http.server)
×
640

×
641
        Server.http.server.on('listening', function () {
×
642
            Server.lognotice(`Listening on ${endpoint(this.address())}`)
×
643
        })
×
644

×
645
        Server.http.server.on('error', (e) => {
×
646
            Server.logerror(e)
×
647
        })
×
648

×
649
        await ep.bind(Server.http.server, { backlog: 0 })
×
650
    }
×
651

×
652
    Server.plugins.run_hooks('init_http', Server)
×
653
    app.use(Server.http.express.static(Server.get_http_docroot()))
×
654
    app.use(Server.handle404)
×
655
}
6✔
656

2✔
657
Server.init_master_respond = async (retval, msg) => {
2✔
658
    if (!(retval === constants.ok || retval === constants.cont)) {
6!
659
        Server.logerror(`init_master returned error${msg ? `: ${msg}` : ''}`)
×
660
        return logger.dump_and_exit(1)
×
661
    }
×
662

6✔
663
    const c = Server.cfg.main
6✔
664
    Server.ready = 1
6✔
665

6✔
666
    // Load the queue if we're just one process
6✔
667
    if (!(cluster && c.nodes)) {
6✔
668
        try {
6✔
669
            await outbound.init_queue()
6✔
670
        } catch (err) {
6!
671
            Server.logcrit('Loading queue failed. Shutting down.')
×
672
            return logger.dump_and_exit(1)
×
673
        }
×
674
        Server.setup_http_listeners()
6✔
675
        return
6✔
676
    }
6✔
677

×
678
    // Running under cluster, fork children here, so that
×
679
    // cluster events can be registered in init_master hooks.
×
680
    try {
×
681
        const pids = await outbound.scan_queue_pids()
×
682
        Server.daemonize()
×
683
        // Fork workers
×
684
        const workers = c.nodes === 'cpus' ? os.cpus().length : c.nodes
6!
685
        const new_workers = []
6✔
686
        for (let i = 0; i < workers; i++) {
6!
687
            new_workers.push(cluster.fork({ CLUSTER_MASTER_PID: process.pid }))
×
688
        }
×
689
        for (let j = 0; j < pids.length; j++) {
×
690
            new_workers[j % new_workers.length].send({
×
691
                event: 'outbound.load_pid_queue',
×
692
                data: pids[j],
×
693
            })
×
694
        }
×
695
        cluster.on('online', (worker) => {
×
696
            Server.lognotice('worker started', {
×
697
                worker: worker.id,
×
698
                pid: worker.process.pid,
×
699
            })
×
700
        })
×
701
        cluster.on('listening', (worker, address) => {
×
702
            Server.lognotice(`worker ${worker.id} listening on ${endpoint(address)}`)
×
703
        })
×
704
        cluster.on('exit', cluster_exit_listener)
×
705
    } catch (err) {
×
706
        Server.logcrit('Scanning queue failed. Shutting down.')
×
707
        logger.dump_and_exit(1)
×
708
    }
×
709
}
6✔
710

2✔
711
function cluster_exit_listener(worker, code, signal) {
×
712
    if (signal) {
×
713
        Server.lognotice(`worker ${worker.id} killed by signal ${signal}`)
×
714
    } else if (code !== 0) {
×
715
        Server.lognotice(`worker ${worker.id} exited with error code: ${code}`)
×
716
    }
×
717
    if (signal || code !== 0) {
×
718
        // Restart worker
×
719
        const new_worker = cluster.fork({
×
720
            CLUSTER_MASTER_PID: process.pid,
×
721
        })
×
722
        new_worker.send({
×
723
            event: 'outbound.load_pid_queue',
×
724
            data: worker.process.pid,
×
725
        })
×
726
    }
×
727
}
×
728

2✔
729
Server.init_child_respond = (retval, msg) => {
2✔
730
    switch (retval) {
2✔
731
        case constants.ok:
2✔
732
        case constants.cont:
2✔
733
            Server.setup_http_listeners()
1✔
734
            return
1✔
735
    }
2✔
736

1✔
737
    const pid = process.env.CLUSTER_MASTER_PID
1✔
738
    Server.logerror(`init_child returned error ${msg ? `: ${msg}` : ''}`)
2!
739
    try {
2✔
740
        if (pid) {
2✔
741
            process.kill(pid)
1✔
742
            Server.logerror(`Killing master (pid=${pid})`)
1✔
743
        }
1✔
744
    } catch (err) {
2!
745
        Server.logerror('Terminating child')
×
746
    }
×
747
    logger.dump_and_exit(1)
1✔
748
}
2✔
749

2✔
750
Server.listening = () => {
2✔
751
    const c = Server.cfg.main
7✔
752

7✔
753
    // Drop privileges
7✔
754
    if (c.group) {
7✔
755
        Server.lognotice(`Switching from current gid: ${process.getgid()}`)
1✔
756
        process.setgid(c.group)
1✔
757
        Server.lognotice(`New gid: ${process.getgid()}`)
1✔
758
    }
1✔
759
    if (c.user) {
7✔
760
        Server.lognotice(`Switching from current uid: ${process.getuid()}`)
1✔
761
        process.setuid(c.user)
1✔
762
        Server.lognotice(`New uid: ${process.getuid()}`)
1✔
763
    }
1✔
764

7✔
765
    Server.ready = 1
7✔
766
}
7✔
767

2✔
768
Server.init_http_respond = () => {
2✔
769
    Server.loginfo('init_http_respond')
1✔
770

1✔
771
    let WebSocketServer
1✔
772
    try {
1✔
773
        WebSocketServer = require('ws').Server
1✔
774
    } catch (e) {
1✔
775
        Server.logerror(`unable to load ws.\n  did you: npm install -g ws?`)
1✔
776
        return
1✔
777
    }
1✔
778

×
779
    if (!WebSocketServer) {
×
780
        Server.logerror('ws failed to load')
×
781
        return
×
782
    }
×
783

×
784
    Server.http.wss = new WebSocketServer({ server: Server.http.server })
×
785
    Server.loginfo('Server.http.wss loaded')
×
786

×
787
    Server.plugins.run_hooks('init_wss', Server)
×
788
}
1✔
789

2✔
790
Server.init_wss_respond = () => {
2✔
791
    Server.loginfo('init_wss_respond')
×
792
}
×
793

2✔
794
Server.get_http_docroot = () => {
2✔
795
    if (Server.http.cfg.docroot) return Server.http.cfg.docroot
4✔
796

1✔
797
    Server.http.cfg.docroot = path.join(process.env.HARAKA || __dirname, 'http', 'html')
4✔
798
    Server.loginfo(`using html docroot: ${Server.http.cfg.docroot}`)
4✔
799
    return Server.http.cfg.docroot
4✔
800
}
4✔
801

2✔
802
Server.handle404 = (req, res) => {
2✔
803
    // abandon all hope, serve up a 404
3✔
804
    const docroot = Server.get_http_docroot()
3✔
805

3✔
806
    // respond with html page
3✔
807
    if (req.accepts('html')) {
3✔
808
        res.status(404).sendFile('404.html', { root: docroot })
1✔
809
        return
1✔
810
    }
1✔
811

2✔
812
    // respond with json
2✔
813
    if (req.accepts('json')) {
3✔
814
        res.status(404).send({ err: 'Not found' })
1✔
815
        return
1✔
816
    }
1✔
817

1✔
818
    res.status(404).send('Not found!')
1✔
819
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc