• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

haraka / Haraka / 26464322293

26 May 2026 05:29PM UTC coverage: 73.576% (-0.06%) from 73.64%
26464322293

Pull #3577

github

web-flow
Merge 4c71c1001 into c4173efb8
Pull Request #3577: Implicit TLS with Proxy Protocol

1824 of 2354 branches covered (77.49%)

185 of 195 new or added lines in 2 files covered. (94.87%)

90 existing lines in 6 files now uncovered.

8111 of 11024 relevant lines covered (73.58%)

26.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.19
/server.js
1
'use strict'
2✔
2
// smtp network server
2✔
3

2✔
4
const cluster = require('node:cluster')
2✔
5
const { spawn } = require('node:child_process')
2✔
6
const fs = require('node:fs')
2✔
7
const net = require('node:net')
2✔
8
const os = require('node:os')
2✔
9
const path = require('node:path')
2✔
10
const tls = require('node:tls')
2✔
11
const constants = require('haraka-constants')
2✔
12
const net_utils = require('haraka-net-utils')
2✔
13

2✔
14
const tls_socket = require('./tls_socket')
2✔
15
const conn = require('./connection')
2✔
16
const outbound = require('./outbound')
2✔
17
const endpoint = require('./endpoint')
2✔
18

2✔
19
// The module's export object doubles as the Server singleton.
const Server = exports
const logger = require('./logger')
Server.logger = logger
Server.config = require('haraka-config')
Server.plugins = require('./plugins')
Server.notes = {}

// Log methods have to be attached to Server here so hooks can be run.
logger.add_log_methods(Server, 'server')

// Every listening server created by this module is tracked here.
Server.listeners = []
2✔
31

2✔
32
/**
 * Load smtp.ini into Server.cfg and fill in defaults for any [main] key
 * that is left undefined. The callback handed to config.get() re-runs
 * this loader.
 */
Server.load_smtp_ini = () => {
    const options = {
        booleans: ['-main.daemonize', '-main.graceful_shutdown'],
    }
    const reload = () => {
        Server.load_smtp_ini()
    }
    Server.cfg = Server.config.get('smtp.ini', options, reload)

    const main = Server.cfg.main

    // Warn before the default is applied below, so the notice is only
    // emitted when the setting was really absent from the file.
    if (main.nodes === undefined) {
        Server.logwarn(`smtp.ini.nodes unset, using 1, see https://github.com/haraka/Haraka/wiki/Performance-Tuning`)
    }

    const defaults = {
        inactivity_timeout: 300,
        daemon_log_file: '/var/log/haraka.log',
        daemon_pid_file: '/var/run/haraka.pid',
        force_shutdown_timeout: 30,
        smtps_port: 465,
        nodes: 1,
    }

    for (const [key, value] of Object.entries(defaults)) {
        if (main[key] === undefined) main[key] = value
    }
}
9✔
61

2✔
62
/**
 * Load the [main] section of http.ini into Server.http.cfg.
 * The callback handed to config.get() re-runs this loader.
 */
Server.load_http_ini = () => {
    const reload = () => {
        Server.load_http_ini()
    }
    Server.http = {}
    const { main } = Server.config.get('http.ini', reload)
    Server.http.cfg = main
}
2✔
68

2✔
69
/**
 * Load connection.ini into Server.connection.cfg, declaring
 * haproxy.enabled as a boolean option ('+haproxy.enabled').
 */
Server.load_connection_ini = () => {
    const options = { booleans: ['+haproxy.enabled'] }
    Server.connection = {}
    Server.connection.cfg = Server.config.get('connection.ini', options)
}
2✔
75

2✔
76
// Read the three config files once, at module load time.
Server.load_smtp_ini()
Server.load_http_ini()
Server.load_connection_ini()
2✔
79

2✔
80
/**
 * Detach from the terminal when main.daemonize is set.
 *
 * First pass (no __daemon in the environment): re-spawn this process
 * detached, with stdout/stderr appended to main.daemon_log_file, then
 * exit. Second pass (inside the spawned child): write the pid file and
 * arrange for it to be removed on exit.
 */
Server.daemonize = function () {
    const main = this.cfg.main
    if (!main.daemonize) return

    const already_daemon = Boolean(process.env.__daemon)

    if (!already_daemon) {
        // Drop process 'exit' listeners first, otherwise a spurious
        // 'Exiting' log entry is produced.
        process.removeAllListeners('exit')
        Server.lognotice('Daemonizing...')

        const log_fd = fs.openSync(main.daemon_log_file, 'a')
        const spawn_opts = {
            detached: true,
            stdio: ['ignore', log_fd, log_fd],
            env: { ...process.env, __daemon: '1' },
            cwd: process.cwd(),
        }
        spawn(process.execPath, process.argv.slice(1), spawn_opts).unref()
        process.exit(0)
    }

    // From here on we are the daemon.
    try {
        // 'wx' makes the write fail if the pid file already exists.
        fs.writeFileSync(main.daemon_pid_file, `${process.pid}\n`, { flag: 'wx' })
        process.on('exit', () => {
            try {
                fs.unlinkSync(main.daemon_pid_file)
            } catch {}
        })
    } catch (err) {
        Server.logerror(err.message)
        logger.dump_and_exit(1)
    }
}
6✔
114

2✔
115
/**
 * Flush the outbound queue, optionally for a single domain.
 * Under cluster the request is sent to every worker; otherwise the
 * queue is flushed in this process.
 */
Server.flushQueue = async (domain) => {
    if (Server.cluster) {
        const request = { event: 'outbound.flush_queue', domain }
        for (const id in cluster.workers) {
            cluster.workers[id].send({ ...request })
        }
        return
    }

    await outbound.flush_queue(domain)
}
×
125

2✔
126
// Set while Server._graceful is cycling cluster workers; a second request
// that arrives in the meantime is ignored.
let graceful_in_progress = false

/** Restart the workers via Server._graceful (no shutdown callback). */
Server.gracefulRestart = () => {
    Server._graceful()
}
×
131

2✔
132
/** Close every tracked listener and reset Server.listeners to empty. */
Server.stopListeners = () => {
    Server.loginfo('Shutting down listeners')
    Server.listeners.forEach((listener) => {
        listener.close()
    })
    Server.listeners = []
}
6✔
139

2✔
140
/**
 * Shut the server down: gracefully when main.graceful_shutdown is set,
 * otherwise by exiting immediately with status 0.
 */
Server.performShutdown = () => {
    const { graceful_shutdown } = Server.cfg.main

    if (!graceful_shutdown) {
        Server.loginfo('Shutting down.')
        process.exit(0)
        return
    }

    return Server.gracefulShutdown()
}
×
147

2✔
148
/**
 * Stop accepting connections, then run Server._graceful with a callback
 * that force-exits the process.
 */
Server.gracefulShutdown = () => {
    const force_exit = () => {
        Server.loginfo('Failed to shutdown naturally. Exiting.')
        process.exit(0)
    }

    Server.stopListeners()
    Server._graceful(force_exit)
}
×
156

2✔
157
/**
 * Gracefully restart the cluster workers, or shut everything down.
 *
 * @param {Function} [shutdown] - when given, this is a shutdown: workers
 *   are stopped without replacement, the master's subsystems are told to
 *   shut down, and `shutdown` is scheduled on an unref'd timer after
 *   main.force_shutdown_timeout seconds. When omitted, each worker is
 *   replaced by a freshly forked one.
 * @returns {Promise<*>} resolves once the batches are processed; in the
 *   shutdown paths it resolves to the return value of timer.unref().
 */
Server._graceful = async (shutdown) => {
    const subsystems = ['outbound', 'cfreader', 'plugins']

    // Single process shutdown: nothing to cycle, just notify subsystems.
    if (!Server.cluster && shutdown) {
        for (const module of subsystems) {
            process.emit('message', { event: `${module}.shutdown` })
        }
        const t = setTimeout(shutdown, Server.cfg.main.force_shutdown_timeout * 1000)
        return t.unref()
    }

    if (graceful_in_progress) {
        Server.lognotice('Restart currently in progress - ignoring request')
        return
    }

    graceful_in_progress = true
    // TODO: Make these configurable
    const disconnect_timeout = 30
    const exit_timeout = 30
    cluster.removeAllListeners('exit')

    // Workers are processed in batches of (num_workers - 1): all-but-one
    // are handled in parallel, leaving one running for new connections,
    // and then that last worker is handled.
    const worker_ids = Object.keys(cluster.workers)
    let limit = worker_ids.length - 1
    if (limit < 2) limit = 1

    const todo = worker_ids.map((id) => () => {
        return new Promise((resolve) => {
            // The id list is a snapshot; a worker from a later batch may
            // have exited on its own by the time its turn comes.
            const worker = cluster.workers[id]

            if (worker) {
                Server.lognotice(`Killing worker: ${id}`)
                for (const module of subsystems) {
                    worker.send({ event: `${module}.shutdown` })
                }
                worker.disconnect()
                let disconnect_received = false
                const disconnect_timer = setTimeout(() => {
                    if (!disconnect_received) {
                        Server.logcrit('Disconnect never received by worker. Killing.')
                        worker.kill()
                    }
                }, disconnect_timeout * 1000)

                worker.once('disconnect', () => {
                    clearTimeout(disconnect_timer)
                    disconnect_received = true
                    Server.lognotice('Disconnect complete')
                    let dead = false
                    const timer = setTimeout(() => {
                        if (!dead) {
                            Server.logcrit(`Worker ${id} failed to shutdown. Killing.`)
                            worker.kill()
                        }
                    }, exit_timeout * 1000)
                    worker.once('exit', () => {
                        dead = true
                        clearTimeout(timer)
                        // On shutdown the batch waits for the worker to exit.
                        if (shutdown) resolve()
                    })
                })
            } else {
                Server.lognotice(`Worker ${id} already exited`)
                if (shutdown) {
                    // Nothing left to wait for.
                    resolve()
                    return
                }
            }

            if (!shutdown) {
                // On restart the batch waits for the replacement to listen.
                const newWorker = cluster.fork()
                newWorker.once('listening', () => {
                    Server.lognotice('Replacement worker online.')
                    newWorker.on('exit', (code, signal) => {
                        cluster_exit_listener(newWorker, code, signal)
                    })
                    resolve()
                })
            }
        })
    })

    try {
        while (todo.length) {
            // Invoke each queued thunk so the worker promises are really
            // awaited (passing the bare functions to Promise.all would
            // resolve immediately).
            await Promise.all(todo.splice(0, limit).map((fn) => fn()))
        }
    } catch (err) {
        // Do not leave the guard set, or every later request would be
        // ignored as "in progress".
        graceful_in_progress = false
        throw err
    }

    if (shutdown) {
        Server.loginfo('Workers closed. Shutting down master process subsystems')
        for (const module of subsystems) {
            process.emit('message', { event: `${module}.shutdown` })
        }
        const t2 = setTimeout(shutdown, Server.cfg.main.force_shutdown_timeout * 1000)
        return t2.unref()
    }
    graceful_in_progress = false
    Server.lognotice(`Reload complete, workers: ${JSON.stringify(Object.keys(cluster.workers))}`)
}
1✔
253

2✔
254
/**
 * Run a Server command in the master process.
 * A cluster worker forwards it over IPC; the master itself, or a
 * non-clustered process, dispatches it locally.
 */
Server.sendToMaster = (command, params) => {
    const is_worker = Boolean(Server.cluster) && !Server.cluster.isMaster

    if (is_worker) {
        process.send({ cmd: command, params })
        return
    }

    Server.receiveAsMaster(command, params)
}
1✔
266

2✔
267
/**
 * Dispatch a command to the Server method of the same name.
 *
 * @param {string} command - name of a function property on Server
 * @param {Array} [params] - arguments applied to that function
 *
 * Anything that is not a function (missing, or a data property such as
 * cfg or notes) is logged as invalid instead of being called.
 */
Server.receiveAsMaster = (command, params) => {
    const handler = Server[command]

    if (typeof handler !== 'function') {
        Server.logerror(`Invalid command: ${command}`)
        return
    }

    handler.apply(Server, params)
}
2✔
274

2✔
275
/**
 * Cluster 'message' listener: forwards { cmd, params } messages to
 * Server.receiveAsMaster.
 *
 * Accepts both (worker, msg, handle) and the two-argument (msg, handle)
 * form; the latter is kept until sunset in Haraka v3 (Node < 6).
 */
function messageHandler(...args) {
    const msg = args.length === 2 ? args[0] : args[1]

    if (!msg?.cmd) return
    Server.receiveAsMaster(msg.cmd, msg.params)
}
×
287

2✔
288
/**
 * Build the list of "host:port" strings to listen on.
 *
 * @param {object} [cfg] - config section; reads listen, port, listen_host
 * @param {number} [port=25] - port used for entries that do not name one
 * @returns {string[]} listen addresses; never empty
 *
 * Entries of cfg.listen that endpoint() rejects are passed through as
 * written. When no host is configured the default '[::0]' is used and
 * Server.default_host is set.
 */
Server.get_listen_addrs = (cfg, port) => {
    const default_port = port || 25
    let listeners = []

    if (cfg?.listen) {
        listeners = cfg.listen.split(/\s*,\s*/)
        if (listeners[0] === '') listeners = []
        listeners = listeners.map((addr) => {
            const ep = endpoint(addr, default_port)
            return ep instanceof Error ? addr : ep.toString()
        })
    }

    // cfg is optional (see the guard above), so guard this access too.
    if (cfg?.port) {
        let host = cfg.listen_host
        if (!host) {
            host = '[::0]'
            Server.default_host = true
        }
        listeners.unshift(`${host}:${cfg.port}`)
    }

    if (listeners.length) return listeners

    Server.default_host = true
    listeners.push(`[::0]:${default_port}`)

    return listeners
}
15✔
315

2✔
316
/**
 * Entry point: merge params into the smtp.ini [main] config, load
 * plugins, then start as a single process, a cluster worker, or the
 * cluster master.
 *
 * @param {object} [params] - overrides copied onto cfg.main; function
 *   values are skipped
 */
Server.createServer = (params) => {
    const main = Server.cfg.main
    for (const key in params) {
        if (typeof params[key] !== 'function') main[key] = params[key]
    }

    Server.notes = {}
    Server.plugins.server = Server
    Server.plugins.load_plugins()

    const inactivity_timeout = (main.inactivity_timeout || 300) * 1000
    const use_cluster = Boolean(cluster && main.nodes)

    // Single process
    if (!use_cluster) {
        Server.daemonize(main)
        Server.setup_smtp_listeners(Server.plugins, 'master', inactivity_timeout)
        return
    }

    Server.cluster = cluster

    // Cluster master: workers are forked later, in init_master_respond,
    // so that plugins can put handlers on cluster events before they
    // are emitted.
    if (cluster.isMaster) {
        cluster.on('message', messageHandler)
        Server.plugins.run_hooks('init_master', Server)
        return
    }

    // Cluster worker
    Server.setup_smtp_listeners(Server.plugins, 'child', inactivity_timeout)
}
6✔
352

2✔
353
/**
 * Test helper (exists solely for testing): point tls_socket's config at
 * the directory containing Server.config.root_path when the two roots
 * differ, then hand the '*' socket options to `done`.
 */
Server.load_default_tls_config = (done) => {
    const server_root = Server.config.root_path

    if (server_root != tls_socket.config.root_path) {
        Server.loginfo(`resetting tls_config.config path to ${server_root}`)
        const parent_dir = path.dirname(server_root)
        tls_socket.config = tls_socket.config.module_config(parent_dir)
    }

    const pending = tls_socket.getSocketOpts('*')
    pending.then((opts) => {
        done(opts)
    })
}
17✔
363

2✔
364
/**
 * Create an implicit-TLS (SMTPS) listener that also accepts an optional
 * PROXY v1 line ahead of the TLS handshake.
 *
 * A plain net server accepts the connection. Peers for which
 * net_utils.is_haproxy_allowed() is false go straight to TLS. For
 * allowed peers the first bytes are inspected: a "PROXY " line is parsed
 * and stripped, anything else is pushed back, and the socket is then
 * handed to an internal tls.Server by emitting 'connection' on it.
 *
 * @param {object} opts - options passed to tls.createServer
 * @param {Function} onConnect - called with each secured socket; for
 *   allowed peers the socket carries `haraka_smtps`
 *   ({ peer_allowed, proxy? })
 * @returns {net.Server} the outer server; the inner TLS server is
 *   exposed as `server.tlsServer`
 */
Server.create_smtps_server = (opts, onConnect) => {
    let server
    // Per-connection state, keyed by raw socket, carried across the handshake.
    const socket_tls_state = new WeakMap()
    const proxyPrefix = Buffer.from('PROXY ')
    // Defensive cap while waiting for a PROXY v1 line before TLS starts.
    const proxyLineReadLimit = 512

    const tlsServer = tls.createServer(opts, (cleartext) => {
        const smtps_state = socket_tls_state.get(cleartext._parent)
        if (smtps_state) cleartext.haraka_smtps = smtps_state
        socket_tls_state.delete(cleartext._parent)

        onConnect(cleartext)
    })

    // Reply 421 in cleartext and tear the connection down.
    function close_with_proxy_error(socket, timer, msg) {
        clearTimeout(timer)
        socket.removeAllListeners('data')
        // The pre-parser's own 'error' listener may already have been
        // removed by cleanup(). Without any listener, a failed write of
        // the 421 would surface as an uncaught 'error' event.
        socket.on('error', () => {})
        socket.end(`421 ${msg}\r\n`, () => {
            socket.destroy()
        })
    }

    // Record state for the TLS callback, then give the socket to TLS.
    function start_tls(socket, proxy, peer_allowed) {
        if (proxy || peer_allowed) {
            const smtps_state = { peer_allowed }
            if (proxy) {
                smtps_state.proxy = {
                    ...proxy,
                    proxy_ip: net_utils.normalize_ip(socket.remoteAddress) || socket.remoteAddress,
                }
            }
            socket_tls_state.set(socket, smtps_state)
        }

        tlsServer.emit('connection', socket)
    }

    // Re-emit TLS events on the outer server so callers see one server.
    tlsServer.on('tlsClientError', (err, cleartext) => {
        socket_tls_state.delete(cleartext?._parent)
        server.emit('tlsClientError', err, cleartext)
    })

    tlsServer.on('secureConnection', (cleartext) => {
        server.emit('secureConnection', cleartext)
    })

    // True while `data` is still consistent with starting with "PROXY ";
    // short input is compared against the matching part of the prefix.
    function starts_with_proxy_prefix(data) {
        if (!data.length) return true
        if (data.length > proxyPrefix.length) return data.subarray(0, proxyPrefix.length).equals(proxyPrefix)

        return proxyPrefix.subarray(0, data.length).equals(data)
    }

    function start_tls_with_buffer(socket, data, proxy, peer_allowed) {
        // Preserve bytes already read by the PROXY pre-parser, then hand the
        // paused socket to TLS before letting it read again.
        socket.pause()
        if (data?.length) socket.unshift(data)
        setImmediate(() => {
            start_tls(socket, proxy, peer_allowed)
            socket.resume()
        })
    }

    server = net.createServer((socket) => {
        const remote_ip = net_utils.normalize_ip(socket.remoteAddress) || socket.remoteAddress

        if (!net_utils.is_haproxy_allowed(remote_ip)) {
            start_tls(socket)
            return
        }

        let current_data = null
        const proxy_timer = setTimeout(() => {
            close_with_proxy_error(socket, proxy_timer, 'PROXY timeout')
        }, 30 * 1000)

        function cleanup() {
            clearTimeout(proxy_timer)
            // Stop flowing before removing the pre-parser listener so TLS bytes
            // cannot arrive between listener removal and TLS attachment.
            socket.pause()
            socket.removeListener('data', on_data)
            socket.removeListener('close', cleanup)
            socket.removeListener('error', cleanup)
        }

        function on_data(data) {
            current_data = current_data ? Buffer.concat([current_data, data]) : data

            // Not a PROXY line: treat everything read so far as TLS bytes.
            if (!starts_with_proxy_prefix(current_data)) {
                cleanup()
                start_tls_with_buffer(socket, current_data, null, true)
                return
            }

            const offset = current_data.indexOf(0x0a)
            if (offset === -1) {
                // No end of line yet: keep reading, up to the cap.
                if (current_data.length > proxyLineReadLimit) {
                    close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
                }
                return
            }
            if (offset > proxyLineReadLimit) {
                close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
                return
            }

            cleanup()

            const proxy = net_utils.parse_proxy_line(current_data.subarray(0, offset + 1))
            if (!proxy) {
                close_with_proxy_error(socket, proxy_timer, 'Invalid PROXY format')
                return
            }

            // Whatever follows the PROXY line belongs to the TLS handshake.
            const rest = current_data.subarray(offset + 1)
            start_tls_with_buffer(socket, rest, proxy, true)
        }

        socket.once('close', cleanup)
        socket.once('error', cleanup)
        socket.on('data', on_data)
    })

    server.tlsServer = tlsServer

    return server
}
10✔
494

2✔
495
/**
 * Create (but do not bind) the server for one listen endpoint and add it
 * to Server.listeners.
 *
 * An endpoint on main.smtps_port gets an implicit-TLS server, PROXY-aware
 * when connection.ini haproxy.enabled is set. Every other port gets the
 * server built by tls_socket.createServer.
 *
 * @param {object} ep - endpoint; ep.port selects the server type
 * @param {number} inactivity_timeout - socket timeout passed to
 *   client.setTimeout
 * @returns {Promise<object>} the new server, with `has_tls` set
 */
Server.get_smtp_server = async (ep, inactivity_timeout) => {
    let server

    function onConnect(client) {
        client.setTimeout(inactivity_timeout)
        const connection = conn.createConnection(client, server, Server.cfg)

        if (server.has_tls) {
            const cipher = client.getCipher()
            cipher.version = client.getProtocol() // replace min with actual

            connection.setTLS({
                cipher,
                verified: client.authorized,
                verifyError: client.authorizationError,
                peerCertificate: client.getPeerCertificate(),
            })
        }

        if (client.haraka_smtps?.proxy) connection.apply_proxy(client.haraka_smtps.proxy)
    }

    if (ep.port === parseInt(Server.cfg.main.smtps_port, 10)) {
        Server.loginfo('getting SocketOpts for SMTPS server')
        const opts = await tls_socket.getSocketOpts('*')
        Server.loginfo(`Creating TLS server on ${ep}`)

        opts.rejectUnauthorized = tls_socket.get_rejectUnauthorized(
            opts.rejectUnauthorized,
            ep.port,
            tls_socket.cfg.main.requireAuthorized,
        )

        server = Server.connection.cfg.haproxy.enabled
            ? Server.create_smtps_server(opts, onConnect)
            : tls.createServer(opts, onConnect)
        // TLS events are emitted by the inner server when PROXY-aware.
        const tls_event_server = server.tlsServer || server
        tls_socket.addOCSP(tls_event_server)
        server.has_tls = true
        tls_event_server.on('resumeSession', (id, rsDone) => {
            Server.loginfo('client requested TLS resumeSession')
            rsDone(null, null)
        })
    } else {
        server = tls_socket.createServer(onConnect)
        server.has_tls = false
        // The result is not used here; the call is still awaited before
        // the server is returned.
        // NOTE(review): presumably this loads the TLS options used for
        // STARTTLS; confirm in tls_socket.
        await tls_socket.getSocketOpts('*')
    }

    Server.listeners.push(server)
    return server
}
17✔
548

2✔
549
/**
 * Create and bind one SMTP server per configured listen address, then
 * call Server.listening() and run the `init_<type>` hook.
 *
 * @param {object} plugins2 - plugin system used to run the init hook
 * @param {string} type - suffix of the hook name ('master' or 'child')
 * @param {number} inactivity_timeout - passed to get_smtp_server
 */
Server.setup_smtp_listeners = async (plugins2, type, inactivity_timeout) => {
    const errors = []

    // Note which address families this host has.
    for (const addrs of Object.values(os.networkInterfaces())) {
        for (const addr of addrs) {
            switch (addr.family) {
                case 'IPv6':
                    if (!Server.notes.IPv6) Server.notes.IPv6 = true
                    break
                case 'IPv4':
                    if (!Server.notes.IPv4) Server.notes.IPv4 = true
                    break
                default:
                    console.error(addr)
            }
        }
    }

    for (const listen_address of Server.get_listen_addrs(Server.cfg.main)) {
        const ep = endpoint(listen_address, 25)

        if (ep instanceof Error) {
            Server.logerror(`Invalid "listen" format in smtp.ini: ${listen_address}`)
            continue
        }

        const server = await Server.get_smtp_server(ep, inactivity_timeout)
        if (!server) continue

        server.notes = Server.notes
        if (Server.cluster) server.cluster = Server.cluster

        server.on('listening', () => {
            Server.lognotice(`Listening on ${endpoint(server.address())}`)
        })

        server.on('close', () => {
            Server.loginfo(`Listener ${ep} stopped`)
        })

        server.on('error', (e) => {
            errors.push(e)
            Server.logerror(`Failed to setup listeners: ${e.message}`)

            // Fall back from IPv6 to IPv4 when IPv6 is unsupported, but
            // only if we supplied the default of [::0]:25 ourselves.
            const can_fall_back = e.code === 'EAFNOSUPPORT' && /^::0/.test(ep.host) && Server.default_host
            if (can_fall_back) {
                server.listen(ep.port, '0.0.0.0', 0)
                return
            }

            Server.logerror(e)
        })

        await ep.bind(server, { backlog: 0 })
    }

    if (errors.length) {
        for (const e of errors) {
            Server.logerror(`Failed to setup listeners: ${e.message}`)
        }
        return logger.dump_and_exit(-1)
    }

    Server.listening()
    plugins2.run_hooks(`init_${type}`, Server)
}
6✔
615

2✔
616
/**
 * Start the optional HTTP server(s) configured in http.ini.
 *
 * Does nothing when no `listen` is configured or express cannot be
 * loaded. An endpoint on port 443 gets an HTTPS server using the '*'
 * certificate from tls_socket; any other port gets plain HTTP. After
 * binding, runs the init_http hook and mounts the static docroot and the
 * 404 handler.
 *
 * Server.http.server is overwritten per endpoint, so it ends up
 * referring to the last server created.
 */
Server.setup_http_listeners = async () => {
    if (!Server.http?.cfg?.listen) return

    const listeners = Server.get_listen_addrs(Server.http.cfg, 80)
    if (!listeners.length) return

    try {
        Server.http.express = require('express')
        Server.loginfo('express loaded at Server.http.express')
    } catch {
        Server.logerror('express failed to load. No http server. Install express with: npm install -g express')
        return
    }

    const app = Server.http.express()
    Server.http.app = app
    Server.loginfo('express app is at Server.http.app')

    for (const listen_address of listeners) {
        const ep = endpoint(listen_address, 80)
        if (ep instanceof Error) {
            Server.logerror(`Invalid format for listen in http.ini: ${listen_address}`)
            continue
        }

        if (ep.port === 443) {
            const tlsOpts = { ...tls_socket.certsByHost['*'] }
            tlsOpts.requestCert = false // not appropriate for HTTPS
            Server.http.server = require('node:https').createServer(tlsOpts, app)
        } else {
            Server.http.server = require('node:http').createServer(app)
        }

        Server.listeners.push(Server.http.server)

        Server.http.server.on('listening', function () {
            Server.lognotice(`Listening on ${endpoint(this.address())}`)
        })

        Server.http.server.on('error', (e) => {
            Server.logerror(e)
        })

        await ep.bind(Server.http.server, { backlog: 0 })
    }

    Server.plugins.run_hooks('init_http', Server)
    app.use(Server.http.express.static(Server.get_http_docroot()))
    app.use(Server.handle404)
}
6✔
666

2✔
667
/**
 * Completion handler for the init_master hook.
 *
 * Exits when the hook did not return ok/cont. In single-process mode it
 * loads the outbound queue and starts the HTTP listeners. Under cluster
 * it scans the queue for pids, daemonizes, forks the workers, hands each
 * found pid's queue to a worker, and installs the cluster event
 * listeners.
 *
 * @param {number} retval - hook result, compared against constants
 * @param {string} [msg] - optional hook message, logged on failure
 */
Server.init_master_respond = async (retval, msg) => {
    if (!(retval === constants.ok || retval === constants.cont)) {
        Server.logerror(`init_master returned error${msg ? `: ${msg}` : ''}`)
        return logger.dump_and_exit(1)
    }

    const c = Server.cfg.main
    Server.ready = 1

    // Load the queue if we're just one process
    if (!(cluster && c.nodes)) {
        try {
            await outbound.init_queue()
        } catch (err) {
            // Log the cause too, not only the fact of the failure.
            Server.logerror(err)
            Server.logcrit('Loading queue failed. Shutting down.')
            return logger.dump_and_exit(1)
        }
        Server.setup_http_listeners()
        return
    }

    // Running under cluster, fork children here, so that
    // cluster events can be registered in init_master hooks.
    try {
        const pids = await outbound.scan_queue_pids()
        Server.daemonize()
        // Fork workers
        const workers = c.nodes === 'cpus' ? os.cpus().length : c.nodes
        const new_workers = []
        for (let i = 0; i < workers; i++) {
            new_workers.push(cluster.fork({ CLUSTER_MASTER_PID: process.pid }))
        }
        // Spread the found pids' queues across the workers in turn.
        for (let j = 0; j < pids.length; j++) {
            new_workers[j % new_workers.length].send({
                event: 'outbound.load_pid_queue',
                data: pids[j],
            })
        }
        cluster.on('online', (worker) => {
            Server.lognotice('worker started', {
                worker: worker.id,
                pid: worker.process.pid,
            })
        })
        cluster.on('listening', (worker, address) => {
            Server.lognotice(`worker ${worker.id} listening on ${endpoint(address)}`)
        })
        cluster.on('exit', cluster_exit_listener)
    } catch (err) {
        // This try also covers daemonizing and forking, so the caught
        // error is the only reliable indication of what failed.
        Server.logerror(err)
        Server.logcrit('Scanning queue failed. Shutting down.')
        logger.dump_and_exit(1)
    }
}
6✔
720

2✔
721
/**
 * Worker 'exit' listener: when a worker was killed by a signal or exited
 * with a non-zero code, log it, fork a replacement, and tell the
 * replacement to load the queue of the dead worker's pid.
 */
function cluster_exit_listener(worker, code, signal) {
    const abnormal = Boolean(signal) || code !== 0
    if (!abnormal) return

    const reason = signal ? `killed by signal ${signal}` : `exited with error code: ${code}`
    Server.lognotice(`worker ${worker.id} ${reason}`)

    // Restart worker
    const replacement = cluster.fork({ CLUSTER_MASTER_PID: process.pid })
    replacement.send({
        event: 'outbound.load_pid_queue',
        data: worker.process.pid,
    })
}
×
738

2✔
739
/**
 * Completion handler for the init_child hook.
 *
 * On ok/cont the HTTP listeners are started. On any other result the
 * master process (CLUSTER_MASTER_PID, when set) is signalled with
 * process.kill and this process exits with status 1.
 */
Server.init_child_respond = (retval, msg) => {
    const succeeded = retval === constants.ok || retval === constants.cont
    if (succeeded) {
        Server.setup_http_listeners()
        return
    }

    const pid = process.env.CLUSTER_MASTER_PID
    const detail = msg ? `: ${msg}` : ''
    Server.logerror(`init_child returned error ${detail}`)

    try {
        if (pid) {
            process.kill(pid)
            Server.logerror(`Killing master (pid=${pid})`)
        }
    } catch (err) {
        Server.logerror('Terminating child')
    }

    logger.dump_and_exit(1)
}
2✔
759

2✔
760
/**
 * Drop privileges to the configured group and user (group first), then
 * mark the server ready.
 */
Server.listening = () => {
    const { group, user } = Server.cfg.main

    if (group) {
        Server.lognotice(`Switching from current gid: ${process.getgid()}`)
        process.setgid(group)
        Server.lognotice(`New gid: ${process.getgid()}`)
    }

    if (user) {
        Server.lognotice(`Switching from current uid: ${process.getuid()}`)
        process.setuid(user)
        Server.lognotice(`New uid: ${process.getuid()}`)
    }

    Server.ready = 1
}
7✔
777

2✔
778
/**
 * Completion handler for the init_http hook: attach a WebSocket server
 * (from the optional `ws` package) to Server.http.server, then run the
 * init_wss hook. Logs and returns when `ws` is unavailable.
 */
Server.init_http_respond = () => {
    Server.loginfo('init_http_respond')

    let WebSocketServer
    try {
        const ws_module = require('ws')
        WebSocketServer = ws_module.Server
    } catch (e) {
        Server.logerror(`unable to load ws.\n  did you: npm install -g ws?`)
        return
    }

    if (!WebSocketServer) {
        Server.logerror('ws failed to load')
        return
    }

    const wss_options = { server: Server.http.server }
    Server.http.wss = new WebSocketServer(wss_options)
    Server.loginfo('Server.http.wss loaded')

    Server.plugins.run_hooks('init_wss', Server)
}
1✔
799

2✔
800
/** Completion handler for the init_wss hook; only logs that it ran. */
Server.init_wss_respond = () => {
    const hook_name = 'init_wss_respond'
    Server.loginfo(hook_name)
}
×
803

2✔
804
/**
 * Return the HTTP document root. When http.ini sets none, it defaults to
 * `<HARAKA env dir or this module's dir>/http/html`; the default is
 * stored back on Server.http.cfg.docroot and logged.
 *
 * @returns {string} the docroot path
 */
Server.get_http_docroot = () => {
    const http_cfg = Server.http.cfg

    if (!http_cfg.docroot) {
        const base_dir = process.env.HARAKA || __dirname
        http_cfg.docroot = path.join(base_dir, 'http', 'html')
        Server.loginfo(`using html docroot: ${http_cfg.docroot}`)
    }

    return http_cfg.docroot
}
4✔
811

2✔
812
/**
 * Final HTTP handler: answer 404 in the best format the client accepts,
 * trying the 404.html page from the docroot, then JSON, then plain text.
 */
Server.handle404 = (req, res) => {
    // abandon all hope, serve up a 404
    const docroot = Server.get_http_docroot()

    if (req.accepts('html')) {
        // respond with html page
        res.status(404).sendFile('404.html', { root: docroot })
    } else if (req.accepts('json')) {
        // respond with json
        res.status(404).send({ err: 'Not found' })
    } else {
        res.status(404).send('Not found!')
    }
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc