• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 18614511724

12 Oct 2025 10:49AM UTC coverage: 95.001% (-0.03%) from 95.033%
18614511724

push

albertito
fail2ban: Use more targeted regexp to catch errors

The current fail2ban regexp catches all SMTP connection errors.

This works fine, but it also catches connection errors, which can be caused
by transient external problems, and so it can accidentally delay email delivery.

This patch changes the regexp to be more targeted towards specific SMTP
errors that are likely to be caused by deliberate actions.

The expression was cross-checked against a few months of errors to confirm
that it should not have false positives, and that it correctly leaves
connection errors alone.

Thanks to pepperbob@github for reporting this in
https://github.com/albertito/chasquid/issues/77.

5910 of 6221 relevant lines covered (95.0%)

63867.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.91
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/set"
29
        "blitiri.com.ar/go/chasquid/internal/trace"
30
        "blitiri.com.ar/go/log"
31

32
        "golang.org/x/net/idna"
33
)
34

35
// itemFilePrefix is prepended to the file name of every queue item.
// It is there for convenience and versioning, and so that item files can be
// told apart from temporary files and other cruft in the queue directory.
// It deliberately uses a character (":") outside the URL-safe base64
// alphabet used for IDs, so a generated ID can never produce it by accident.
const itemFilePrefix = "m:"
43

44
var (
45
        errQueueFull = fmt.Errorf("Queue size too big, try again later")
46
)
47

48
// Exported variables.
49
var (
50
        putCount = expvarom.NewInt("chasquid/queue/putCount",
51
                "count of envelopes attempted to be put in the queue")
52
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
53
                "count of items the queue wrote to disk")
54
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
55
                "count of DSNs that we generated (queued)")
56
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
57
                "recipient_type", "attempts to deliver mail, by recipient type")
58
)
59

60
// newID hands out fresh random IDs for queue items. init creates it (with a
// small buffer) and the generateNewIDs goroutine keeps it supplied.
var (
	newID chan string
)
62

63
// generateNewIDs runs forever, pushing random item IDs into newID.
// Each ID is 8 bytes from math/rand/v2, encoded as unpadded URL-safe base64.
func generateNewIDs() {
	// The IDs are internal only, so a non-cryptographic PRNG is acceptable;
	// the rest of the code treats them as opaque strings.
	var raw [8]byte
	for {
		binary.NativeEndian.PutUint64(raw[:], rand.Uint64())
		newID <- base64.RawURLEncoding.EncodeToString(raw[:])
	}
}
72

73
func init() {
44✔
74
        newID = make(chan string, 4)
44✔
75
        go generateNewIDs()
44✔
76
}
44✔
77

78
// Queue that keeps mail waiting for delivery.
79
type Queue struct {
80
        // Couriers to use to deliver mail.
81
        localC  courier.Courier
82
        remoteC courier.Courier
83

84
        // Domains we consider local.
85
        localDomains *set.String
86

87
        // Path where we store the queue.
88
        path string
89

90
        // Aliases resolver.
91
        aliases *aliases.Resolver
92

93
        // The maximum number of items in the queue.
94
        MaxItems int
95

96
        // Give up sending attempts after this long.
97
        GiveUpAfter time.Duration
98

99
        // Mutex protecting q.
100
        mu sync.RWMutex
101

102
        // Items in the queue. Map of id -> Item.
103
        q map[string]*Item
104
}
105

106
// New creates a new Queue instance.
107
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
108
        localC, remoteC courier.Courier) (*Queue, error) {
39✔
109

39✔
110
        err := os.MkdirAll(path, 0700)
39✔
111
        q := &Queue{
39✔
112
                q:            map[string]*Item{},
39✔
113
                localC:       localC,
39✔
114
                remoteC:      remoteC,
39✔
115
                localDomains: localDomains,
39✔
116
                path:         path,
39✔
117
                aliases:      aliases,
39✔
118

39✔
119
                // We reject emails when we hit this.
39✔
120
                // Note the actual default used in the daemon is set in the config. We
39✔
121
                // put a non-zero value here just to be safe.
39✔
122
                MaxItems: 100,
39✔
123

39✔
124
                // We give up sending (and return a DSN) after this long.
39✔
125
                // Note the actual default used in the daemon is set in the config. We
39✔
126
                // put a non-zero value here just to be safe.
39✔
127
                GiveUpAfter: 20 * time.Hour,
39✔
128
        }
39✔
129
        return q, err
39✔
130
}
39✔
131

132
// Load the queue and launch the sending loops on startup.
133
func (q *Queue) Load() error {
32✔
134
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
32✔
135
        if err != nil {
32✔
136
                return err
×
137
        }
×
138

139
        for _, fname := range files {
34✔
140
                item, err := ItemFromFile(fname)
2✔
141
                if err != nil {
2✔
142
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
143
                        continue
×
144
                }
145

146
                q.mu.Lock()
2✔
147
                q.q[item.ID] = item
2✔
148
                q.mu.Unlock()
2✔
149

2✔
150
                go item.SendLoop(q)
2✔
151
        }
152

153
        return nil
32✔
154
}
155

156
// Len returns the number of elements in the queue.
157
func (q *Queue) Len() int {
94✔
158
        q.mu.RLock()
94✔
159
        defer q.mu.RUnlock()
94✔
160
        return len(q.q)
94✔
161
}
94✔
162

163
// Put an envelope in the queue.
164
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
91✔
165
        tr = tr.NewChild("Queue.Put", from)
91✔
166
        defer tr.Finish()
91✔
167

91✔
168
        if nItems := q.Len(); nItems >= q.MaxItems {
92✔
169
                tr.Errorf("queue full (%d items)", nItems)
1✔
170
                return "", errQueueFull
1✔
171
        }
1✔
172
        putCount.Add(1)
90✔
173

90✔
174
        item := &Item{
90✔
175
                Message: Message{
90✔
176
                        ID:   <-newID,
90✔
177
                        From: from,
90✔
178
                        Data: data,
90✔
179
                },
90✔
180
                CreatedAt: time.Now(),
90✔
181
        }
90✔
182

90✔
183
        for _, t := range to {
184✔
184
                item.To = append(item.To, t)
94✔
185

94✔
186
                rcpts, err := q.aliases.Resolve(tr, t)
94✔
187
                if err != nil {
94✔
188
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
189
                }
×
190

191
                // Add the recipients (after resolving aliases); this conversion is
192
                // not very pretty but at least it's self contained.
193
                for _, aliasRcpt := range rcpts {
196✔
194
                        r := &Recipient{
102✔
195
                                Address:         aliasRcpt.Addr,
102✔
196
                                Status:          Recipient_PENDING,
102✔
197
                                OriginalAddress: t,
102✔
198
                        }
102✔
199
                        switch aliasRcpt.Type {
102✔
200
                        case aliases.EMAIL:
93✔
201
                                r.Type = Recipient_EMAIL
93✔
202
                        case aliases.PIPE:
3✔
203
                                r.Type = Recipient_PIPE
3✔
204
                        case aliases.FORWARD:
6✔
205
                                r.Type = Recipient_FORWARD
6✔
206
                                r.Via = aliasRcpt.Via
6✔
207
                        default:
×
208
                                log.Errorf("unknown alias type %v when resolving %q",
×
209
                                        aliasRcpt.Type, t)
×
210
                                return "", tr.Errorf("internal error - unknown alias type")
×
211
                        }
212
                        item.Rcpt = append(item.Rcpt, r)
102✔
213
                        tr.Debugf("recipient: %v", r.Address)
102✔
214
                }
215
        }
216

217
        err := item.WriteTo(q.path)
90✔
218
        if err != nil {
90✔
219
                return "", tr.Errorf("failed to write item: %v", err)
×
220
        }
×
221

222
        q.mu.Lock()
90✔
223
        q.q[item.ID] = item
90✔
224
        q.mu.Unlock()
90✔
225

90✔
226
        // Begin to send it right away.
90✔
227
        go item.SendLoop(q)
90✔
228

90✔
229
        tr.Debugf("queued")
90✔
230
        return item.ID, nil
90✔
231
}
232

233
// Remove an item from the queue.
234
func (q *Queue) Remove(id string) {
93✔
235
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
93✔
236
        err := os.Remove(path)
93✔
237
        if err != nil {
94✔
238
                log.Errorf("failed to remove queue file %q: %v", path, err)
1✔
239
        }
1✔
240

241
        q.mu.Lock()
93✔
242
        delete(q.q, id)
93✔
243
        q.mu.Unlock()
93✔
244
}
245

246
// DumpString returns a human-readable string with the current queue.
// Useful for debugging purposes.
//
// The queue is read-locked for the whole dump, and each item is locked while
// its details are printed. Items are listed in map iteration order, so their
// order is not deterministic.
//
// The output is accumulated in a strings.Builder, rather than by repeated
// string concatenation, which would re-copy the whole dump for every line
// (quadratic in the size of the queue).
func (q *Queue) DumpString() string {
	q.mu.RLock()
	defer q.mu.RUnlock()

	var b strings.Builder
	b.WriteString("# Queue status\n\n")
	fmt.Fprintf(&b, "date: %v\n", time.Now())
	fmt.Fprintf(&b, "length: %d\n\n", len(q.q))

	for id, item := range q.q {
		fmt.Fprintf(&b, "## Item %s\n", id)
		item.Lock()
		fmt.Fprintf(&b, "created at: %s\n", item.CreatedAt)
		fmt.Fprintf(&b, "from: %s\n", item.From)
		fmt.Fprintf(&b, "to: %s\n", item.To)
		for _, rcpt := range item.Rcpt {
			fmt.Fprintf(&b, "%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
			fmt.Fprintf(&b, "  original address: %s\n", rcpt.OriginalAddress)
			fmt.Fprintf(&b, "  last failure: %q\n", rcpt.LastFailureMessage)
		}
		item.Unlock()
		b.WriteString("\n")
	}

	return b.String()
}
272

273
// An Item in the queue.
274
type Item struct {
275
        // Base the item on the protobuf message.
276
        // We will use this for serialization, so any fields below are NOT
277
        // serialized.
278
        Message
279

280
        // Protect the entire item.
281
        sync.Mutex
282

283
        // Go-friendly version of Message.CreatedAtTs.
284
        CreatedAt time.Time
285
}
286

287
// ItemFromFile loads an item from the given file.
288
func ItemFromFile(fname string) (*Item, error) {
2✔
289
        item := &Item{}
2✔
290
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
291
        if err != nil {
2✔
292
                return nil, err
×
293
        }
×
294

295
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
296
        return item, nil
2✔
297
}
298

299
// WriteTo saves an item to the given directory.
300
func (item *Item) WriteTo(dir string) error {
196✔
301
        item.Lock()
196✔
302
        defer item.Unlock()
196✔
303
        itemsWritten.Add(1)
196✔
304

196✔
305
        item.CreatedAtTs = timeToProto(item.CreatedAt)
196✔
306

196✔
307
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
196✔
308

196✔
309
        return protoio.WriteTextMessage(path, &item.Message, 0600)
196✔
310
}
196✔
311

312
// SendLoop repeatedly attempts to send the item.
//
// Each round tries every still-pending recipient in parallel. Rounds repeat,
// sleeping nextDelay(item.CreatedAt) in between, until no recipient is
// pending or q.GiveUpAfter has elapsed since the item was created.
//
// Afterwards, if any recipient FAILED or is still PENDING, a delivery status
// notification is queued for the sender. This is skipped when the sender is
// the null sender "<>", which is what sendDSN uses. Finally the item is
// removed from the queue.
func (item *Item) SendLoop(q *Queue) {
	tr := trace.New("Queue.SendLoop", item.ID)
	defer tr.Finish()
	tr.Printf("from %s", item.From)

	for {
		if time.Since(item.CreatedAt) >= q.GiveUpAfter {
			break
		}

		// One round: attempt all pending recipients concurrently, and wait
		// for every attempt to finish.
		var round sync.WaitGroup
		for _, rcpt := range item.Rcpt {
			if rcpt.Status == Recipient_PENDING {
				round.Add(1)
				go item.sendOneRcpt(&round, tr, q, rcpt)
			}
		}
		round.Wait()

		// Nobody left pending: no point in waiting for another round.
		if item.countRcpt(Recipient_PENDING) == 0 {
			break
		}

		// TODO: Consider sending a non-final notification after 30m or so,
		// that some of the messages have been delayed.

		delay := nextDelay(item.CreatedAt)
		tr.Printf("waiting for %v", delay)
		maillog.QueueLoop(item.ID, item.From, delay)
		time.Sleep(delay)
	}

	// We are done with every recipient (not all of them necessarily
	// succeeded), or we gave up on the remaining ones.
	unfinished := item.countRcpt(Recipient_FAILED, Recipient_PENDING)
	if unfinished > 0 && item.From != "<>" {
		sendDSN(tr, q, item)
	}

	tr.Printf("all done")
	maillog.QueueLoop(item.ID, item.From, 0)
	q.Remove(item.ID)
}
354

355
// sendOneRcpt, and update it with the results.
356
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
104✔
357
        defer wg.Done()
104✔
358
        to := rcpt.Address
104✔
359
        tr.Debugf("%s sending", to)
104✔
360

104✔
361
        err, permanent := item.deliver(q, rcpt)
104✔
362

104✔
363
        item.Lock()
104✔
364
        if err != nil {
108✔
365
                rcpt.LastFailureMessage = err.Error()
4✔
366
                if permanent {
8✔
367
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
368
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
369
                        rcpt.Status = Recipient_FAILED
4✔
370
                } else {
4✔
371
                        tr.Printf("%s temporary error: %v", to, err)
×
372
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
373
                }
×
374
        } else {
99✔
375
                tr.Printf("%s sent", to)
99✔
376
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
99✔
377
                rcpt.Status = Recipient_SENT
99✔
378
        }
99✔
379
        item.Unlock()
103✔
380

103✔
381
        err = item.WriteTo(q.path)
103✔
382
        if err != nil {
103✔
383
                tr.Errorf("failed to write: %v", err)
×
384
        }
×
385
}
386

387
// deliver the item to the given recipient, using the couriers from the queue.
388
// Return an error (if any), and whether it is permanent or not.
389
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
105✔
390
        if rcpt.Type == Recipient_PIPE {
109✔
391
                deliverAttempts.Add("pipe", 1)
4✔
392
                c := strings.Fields(rcpt.Address)
4✔
393
                if len(c) == 0 {
4✔
394
                        return fmt.Errorf("empty pipe"), true
×
395
                }
×
396
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
397
                defer cancel()
4✔
398
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
399
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
400
                return cmd.Run(), true
4✔
401
        }
402

403
        // Recipient type is FORWARD: we always use the remote courier, and pass
404
        // the list of servers that was given to us.
405
        if rcpt.Type == Recipient_FORWARD {
107✔
406
                deliverAttempts.Add("forward", 1)
6✔
407

6✔
408
                // When forwarding with an explicit list of servers, we use SRS if
6✔
409
                // we're sending from a non-local domain (regardless of the
6✔
410
                // destination).
6✔
411
                from := item.From
6✔
412
                if !envelope.DomainIn(item.From, q.localDomains) {
9✔
413
                        from = rewriteSender(item.From, rcpt.OriginalAddress)
3✔
414
                }
3✔
415
                return q.remoteC.Forward(from, rcpt.Address, item.Data, rcpt.Via)
6✔
416
        }
417

418
        // Recipient type is EMAIL.
419
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
164✔
420
                deliverAttempts.Add("email:local", 1)
69✔
421
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
69✔
422
        }
69✔
423

424
        deliverAttempts.Add("email:remote", 1)
26✔
425
        from := item.From
26✔
426
        if !envelope.DomainIn(item.From, q.localDomains) {
31✔
427
                // We're sending from a non-local to a non-local, need to do SRS.
5✔
428
                from = rewriteSender(item.From, rcpt.OriginalAddress)
5✔
429
        }
5✔
430
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
26✔
431
}
432

433
func rewriteSender(from, originalAddr string) string {
8✔
434
        // Apply a send-only Sender Rewriting Scheme (SRS).
8✔
435
        // This is used when we are sending from a (potentially) non-local domain,
8✔
436
        // to a non-local domain.
8✔
437
        // This should happen only when there's an alias to forward email to a
8✔
438
        // non-local domain (either a normal "email" alias with a remote
8✔
439
        // destination, or a "forward" alias with a list of servers).
8✔
440
        // In this case, using the original From is problematic, as we may not be
8✔
441
        // an authorized sender for this.
8✔
442
        // To do this, we use a sender rewriting scheme, similar to what other
8✔
443
        // MTAs do (e.g. gmail or postfix).
8✔
444
        // Note this assumes "+" is an alias suffix separator.
8✔
445
        // We use the IDNA version of the domain if possible, because
8✔
446
        // we can't know if the other side will support SMTPUTF8.
8✔
447
        return fmt.Sprintf("%s+fwd_from=%s@%s",
8✔
448
                envelope.UserOf(originalAddr),
8✔
449
                strings.Replace(from, "@", "=", -1),
8✔
450
                mustIDNAToASCII(envelope.DomainOf(originalAddr)))
8✔
451
}
8✔
452

453
// countRcpt counts how many recipients are in the given status.
454
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
181✔
455
        c := 0
181✔
456
        for _, rcpt := range item.Rcpt {
386✔
457
                for _, status := range statuses {
509✔
458
                        if rcpt.Status == status {
309✔
459
                                c++
5✔
460
                                break
5✔
461
                        }
462
                }
463
        }
464
        return c
181✔
465
}
466

467
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
468
        tr.Debugf("sending DSN")
4✔
469

4✔
470
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
471
        // as otherwise we're relaying.
4✔
472
        domain := "unknown"
4✔
473
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
474
                domain = envelope.DomainOf(item.From)
3✔
475
        } else {
4✔
476
                for _, rcpt := range item.Rcpt {
2✔
477
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
478
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
479
                                break
1✔
480
                        }
481
                }
482
        }
483

484
        msg, err := deliveryStatusNotification(domain, item)
4✔
485
        if err != nil {
4✔
486
                tr.Errorf("failed to build DSN: %v", err)
×
487
                return
×
488
        }
×
489

490
        // TODO: DKIM signing.
491

492
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
493
        if err != nil {
4✔
494
                tr.Errorf("failed to queue DSN: %v", err)
×
495
                return
×
496
        }
×
497

498
        tr.Printf("queued DSN: %s", id)
4✔
499
        dsnQueued.Add(1)
4✔
500
}
501

502
func nextDelay(createdAt time.Time) time.Duration {
50✔
503
        var delay time.Duration
50✔
504

50✔
505
        since := time.Since(createdAt)
50✔
506
        switch {
50✔
507
        case since < 1*time.Minute:
10✔
508
                delay = 1 * time.Minute
10✔
509
        case since < 5*time.Minute:
10✔
510
                delay = 5 * time.Minute
10✔
511
        case since < 10*time.Minute:
10✔
512
                delay = 10 * time.Minute
10✔
513
        default:
20✔
514
                delay = 20 * time.Minute
20✔
515
        }
516

517
        // Perturb the delay, to avoid all queued emails to be retried at the
518
        // exact same time after a restart.
519
        delay += rand.N(60 * time.Second)
50✔
520
        return delay
50✔
521
}
522

523
func mustIDNAToASCII(s string) string {
8✔
524
        a, err := idna.ToASCII(s)
8✔
525
        if err != nil {
8✔
526
                return a
×
527
        }
×
528
        return s
8✔
529
}
530

531
func timeFromProto(ts *Timestamp) time.Time {
2✔
532
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
533
}
2✔
534

535
func timeToProto(t time.Time) *Timestamp {
196✔
536
        return &Timestamp{
196✔
537
                Seconds: t.Unix(),
196✔
538
                Nanos:   int32(t.Nanosecond()),
196✔
539
        }
196✔
540
}
196✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc