• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 18434984096

13 Aug 2025 10:10PM UTC coverage: 95.002% (-0.03%) from 95.034%
18434984096

push

albertito
courier: Don't hardcode path to `sleep` binary in the tests

On NixOS, `/bin` is basically empty and this causes the courier tests
(which invoke `/bin/sleep`) to fail.

This patch fixes the tests by removing the hardcoded path.

https://github.com/albertito/chasquid/pull/73

Amended-by: Alberto Bertogli <albertito@blitiri.com.ar>
  Adjusted commit message.

5911 of 6222 relevant lines covered (95.0%)

63857.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.91
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/set"
29
        "blitiri.com.ar/go/chasquid/internal/trace"
30
        "blitiri.com.ar/go/log"
31

32
        "golang.org/x/net/idna"
33
)
34

35
const (
36
        // Prefix for item file names.
37
        // This is for convenience, versioning, and to be able to tell them apart
38
        // temporary files and other cruft.
39
        // It's important that it's outside the base64 space so it doesn't get
40
        // generated accidentally.
41
        itemFilePrefix = "m:"
42
)
43

44
var (
45
        errQueueFull = fmt.Errorf("Queue size too big, try again later")
46
)
47

48
// Exported variables.
49
var (
50
        putCount = expvarom.NewInt("chasquid/queue/putCount",
51
                "count of envelopes attempted to be put in the queue")
52
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
53
                "count of items the queue wrote to disk")
54
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
55
                "count of DSNs that we generated (queued)")
56
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
57
                "recipient_type", "attempts to deliver mail, by recipient type")
58
)
59

60
// Channel used to get random IDs for items in the queue.
61
var newID chan string
62

63
func generateNewIDs() {
44✔
64
        // The IDs are only used internally, we are ok with using a PRNG.
44✔
65
        // IDs are base64(8 random bytes), but the code doesn't care.
44✔
66
        buf := make([]byte, 8)
44✔
67
        for {
468✔
68
                binary.NativeEndian.PutUint64(buf, rand.Uint64())
424✔
69
                newID <- base64.RawURLEncoding.EncodeToString(buf)
424✔
70
        }
424✔
71
}
72

73
func init() {
44✔
74
        newID = make(chan string, 4)
44✔
75
        go generateNewIDs()
44✔
76
}
44✔
77

78
// Queue that keeps mail waiting for delivery.
79
type Queue struct {
80
        // Couriers to use to deliver mail.
81
        localC  courier.Courier
82
        remoteC courier.Courier
83

84
        // Domains we consider local.
85
        localDomains *set.String
86

87
        // Path where we store the queue.
88
        path string
89

90
        // Aliases resolver.
91
        aliases *aliases.Resolver
92

93
        // The maximum number of items in the queue.
94
        MaxItems int
95

96
        // Give up sending attempts after this long.
97
        GiveUpAfter time.Duration
98

99
        // Mutex protecting q.
100
        mu sync.RWMutex
101

102
        // Items in the queue. Map of id -> Item.
103
        q map[string]*Item
104
}
105

106
// New creates a new Queue instance.
107
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
108
        localC, remoteC courier.Courier) (*Queue, error) {
39✔
109

39✔
110
        err := os.MkdirAll(path, 0700)
39✔
111
        q := &Queue{
39✔
112
                q:            map[string]*Item{},
39✔
113
                localC:       localC,
39✔
114
                remoteC:      remoteC,
39✔
115
                localDomains: localDomains,
39✔
116
                path:         path,
39✔
117
                aliases:      aliases,
39✔
118

39✔
119
                // We reject emails when we hit this.
39✔
120
                // Note the actual default used in the daemon is set in the config. We
39✔
121
                // put a non-zero value here just to be safe.
39✔
122
                MaxItems: 100,
39✔
123

39✔
124
                // We give up sending (and return a DSN) after this long.
39✔
125
                // Note the actual default used in the daemon is set in the config. We
39✔
126
                // put a non-zero value here just to be safe.
39✔
127
                GiveUpAfter: 20 * time.Hour,
39✔
128
        }
39✔
129
        return q, err
39✔
130
}
39✔
131

132
// Load the queue and launch the sending loops on startup.
133
func (q *Queue) Load() error {
32✔
134
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
32✔
135
        if err != nil {
32✔
136
                return err
×
137
        }
×
138

139
        for _, fname := range files {
34✔
140
                item, err := ItemFromFile(fname)
2✔
141
                if err != nil {
2✔
142
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
143
                        continue
×
144
                }
145

146
                q.mu.Lock()
2✔
147
                q.q[item.ID] = item
2✔
148
                q.mu.Unlock()
2✔
149

2✔
150
                go item.SendLoop(q)
2✔
151
        }
152

153
        return nil
32✔
154
}
155

156
// Len returns the number of elements in the queue.
157
func (q *Queue) Len() int {
94✔
158
        q.mu.RLock()
94✔
159
        defer q.mu.RUnlock()
94✔
160
        return len(q.q)
94✔
161
}
94✔
162

163
// Put an envelope in the queue.
164
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
91✔
165
        tr = tr.NewChild("Queue.Put", from)
91✔
166
        defer tr.Finish()
91✔
167

91✔
168
        if nItems := q.Len(); nItems >= q.MaxItems {
92✔
169
                tr.Errorf("queue full (%d items)", nItems)
1✔
170
                return "", errQueueFull
1✔
171
        }
1✔
172
        putCount.Add(1)
90✔
173

90✔
174
        item := &Item{
90✔
175
                Message: Message{
90✔
176
                        ID:   <-newID,
90✔
177
                        From: from,
90✔
178
                        Data: data,
90✔
179
                },
90✔
180
                CreatedAt: time.Now(),
90✔
181
        }
90✔
182

90✔
183
        for _, t := range to {
184✔
184
                item.To = append(item.To, t)
94✔
185

94✔
186
                rcpts, err := q.aliases.Resolve(tr, t)
94✔
187
                if err != nil {
94✔
188
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
189
                }
×
190

191
                // Add the recipients (after resolving aliases); this conversion is
192
                // not very pretty but at least it's self contained.
193
                for _, aliasRcpt := range rcpts {
196✔
194
                        r := &Recipient{
102✔
195
                                Address:         aliasRcpt.Addr,
102✔
196
                                Status:          Recipient_PENDING,
102✔
197
                                OriginalAddress: t,
102✔
198
                        }
102✔
199
                        switch aliasRcpt.Type {
102✔
200
                        case aliases.EMAIL:
93✔
201
                                r.Type = Recipient_EMAIL
93✔
202
                        case aliases.PIPE:
3✔
203
                                r.Type = Recipient_PIPE
3✔
204
                        case aliases.FORWARD:
6✔
205
                                r.Type = Recipient_FORWARD
6✔
206
                                r.Via = aliasRcpt.Via
6✔
207
                        default:
×
208
                                log.Errorf("unknown alias type %v when resolving %q",
×
209
                                        aliasRcpt.Type, t)
×
210
                                return "", tr.Errorf("internal error - unknown alias type")
×
211
                        }
212
                        item.Rcpt = append(item.Rcpt, r)
102✔
213
                        tr.Debugf("recipient: %v", r.Address)
102✔
214
                }
215
        }
216

217
        err := item.WriteTo(q.path)
90✔
218
        if err != nil {
90✔
219
                return "", tr.Errorf("failed to write item: %v", err)
×
220
        }
×
221

222
        q.mu.Lock()
90✔
223
        q.q[item.ID] = item
90✔
224
        q.mu.Unlock()
90✔
225

90✔
226
        // Begin to send it right away.
90✔
227
        go item.SendLoop(q)
90✔
228

90✔
229
        tr.Debugf("queued")
90✔
230
        return item.ID, nil
90✔
231
}
232

233
// Remove an item from the queue.
234
func (q *Queue) Remove(id string) {
93✔
235
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
93✔
236
        err := os.Remove(path)
93✔
237
        if err != nil {
94✔
238
                log.Errorf("failed to remove queue file %q: %v", path, err)
1✔
239
        }
1✔
240

241
        q.mu.Lock()
93✔
242
        delete(q.q, id)
93✔
243
        q.mu.Unlock()
93✔
244
}
245

246
// DumpString returns a human-readable string with the current queue.
247
// Useful for debugging purposes.
248
func (q *Queue) DumpString() string {
2✔
249
        q.mu.RLock()
2✔
250
        defer q.mu.RUnlock()
2✔
251
        s := "# Queue status\n\n"
2✔
252
        s += fmt.Sprintf("date: %v\n", time.Now())
2✔
253
        s += fmt.Sprintf("length: %d\n\n", len(q.q))
2✔
254

2✔
255
        for id, item := range q.q {
3✔
256
                s += fmt.Sprintf("## Item %s\n", id)
1✔
257
                item.Lock()
1✔
258
                s += fmt.Sprintf("created at: %s\n", item.CreatedAt)
1✔
259
                s += fmt.Sprintf("from: %s\n", item.From)
1✔
260
                s += fmt.Sprintf("to: %s\n", item.To)
1✔
261
                for _, rcpt := range item.Rcpt {
2✔
262
                        s += fmt.Sprintf("%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
1✔
263
                        s += fmt.Sprintf("  original address: %s\n", rcpt.OriginalAddress)
1✔
264
                        s += fmt.Sprintf("  last failure: %q\n", rcpt.LastFailureMessage)
1✔
265
                }
1✔
266
                item.Unlock()
1✔
267
                s += "\n"
1✔
268
        }
269

270
        return s
2✔
271
}
272

273
// An Item in the queue.
274
type Item struct {
275
        // Base the item on the protobuf message.
276
        // We will use this for serialization, so any fields below are NOT
277
        // serialized.
278
        Message
279

280
        // Protect the entire item.
281
        sync.Mutex
282

283
        // Go-friendly version of Message.CreatedAtTs.
284
        CreatedAt time.Time
285
}
286

287
// ItemFromFile loads an item from the given file.
288
func ItemFromFile(fname string) (*Item, error) {
2✔
289
        item := &Item{}
2✔
290
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
291
        if err != nil {
2✔
292
                return nil, err
×
293
        }
×
294

295
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
296
        return item, nil
2✔
297
}
298

299
// WriteTo saves an item to the given directory.
300
func (item *Item) WriteTo(dir string) error {
196✔
301
        item.Lock()
196✔
302
        defer item.Unlock()
196✔
303
        itemsWritten.Add(1)
196✔
304

196✔
305
        item.CreatedAtTs = timeToProto(item.CreatedAt)
196✔
306

196✔
307
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
196✔
308

196✔
309
        return protoio.WriteTextMessage(path, &item.Message, 0600)
196✔
310
}
196✔
311

312
// SendLoop repeatedly attempts to send the item.
313
func (item *Item) SendLoop(q *Queue) {
93✔
314
        tr := trace.New("Queue.SendLoop", item.ID)
93✔
315
        defer tr.Finish()
93✔
316
        tr.Printf("from %s", item.From)
93✔
317

93✔
318
        for time.Since(item.CreatedAt) < q.GiveUpAfter {
185✔
319
                // Send to all recipients that are still pending.
92✔
320
                var wg sync.WaitGroup
92✔
321
                for _, rcpt := range item.Rcpt {
196✔
322
                        if rcpt.Status != Recipient_PENDING {
104✔
323
                                continue
×
324
                        }
325

326
                        wg.Add(1)
104✔
327
                        go item.sendOneRcpt(&wg, tr, q, rcpt)
104✔
328
                }
329
                wg.Wait()
92✔
330

92✔
331
                // If they're all done, no need to wait.
92✔
332
                if item.countRcpt(Recipient_PENDING) == 0 {
182✔
333
                        break
90✔
334
                }
335

336
                // TODO: Consider sending a non-final notification after 30m or so,
337
                // that some of the messages have been delayed.
338

339
                delay := nextDelay(item.CreatedAt)
×
340
                tr.Printf("waiting for %v", delay)
×
341
                maillog.QueueLoop(item.ID, item.From, delay)
×
342
                time.Sleep(delay)
×
343
        }
344

345
        // Completed to all recipients (some may not have succeeded).
346
        if item.countRcpt(Recipient_FAILED, Recipient_PENDING) > 0 && item.From != "<>" {
95✔
347
                sendDSN(tr, q, item)
4✔
348
        }
4✔
349

350
        tr.Printf("all done")
91✔
351
        maillog.QueueLoop(item.ID, item.From, 0)
91✔
352
        q.Remove(item.ID)
91✔
353
}
354

355
// sendOneRcpt, and update it with the results.
356
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
104✔
357
        defer wg.Done()
104✔
358
        to := rcpt.Address
104✔
359
        tr.Debugf("%s sending", to)
104✔
360

104✔
361
        err, permanent := item.deliver(q, rcpt)
104✔
362

104✔
363
        item.Lock()
104✔
364
        if err != nil {
108✔
365
                rcpt.LastFailureMessage = err.Error()
4✔
366
                if permanent {
8✔
367
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
368
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
369
                        rcpt.Status = Recipient_FAILED
4✔
370
                } else {
4✔
371
                        tr.Printf("%s temporary error: %v", to, err)
×
372
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
373
                }
×
374
        } else {
100✔
375
                tr.Printf("%s sent", to)
100✔
376
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
100✔
377
                rcpt.Status = Recipient_SENT
100✔
378
        }
100✔
379
        item.Unlock()
104✔
380

104✔
381
        err = item.WriteTo(q.path)
104✔
382
        if err != nil {
104✔
383
                tr.Errorf("failed to write: %v", err)
×
384
        }
×
385
}
386

387
// deliver the item to the given recipient, using the couriers from the queue.
388
// Return an error (if any), and whether it is permanent or not.
389
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
105✔
390
        if rcpt.Type == Recipient_PIPE {
109✔
391
                deliverAttempts.Add("pipe", 1)
4✔
392
                c := strings.Fields(rcpt.Address)
4✔
393
                if len(c) == 0 {
4✔
394
                        return fmt.Errorf("empty pipe"), true
×
395
                }
×
396
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
397
                defer cancel()
4✔
398
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
399
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
400
                return cmd.Run(), true
4✔
401
        }
402

403
        // Recipient type is FORWARD: we always use the remote courier, and pass
404
        // the list of servers that was given to us.
405
        if rcpt.Type == Recipient_FORWARD {
107✔
406
                deliverAttempts.Add("forward", 1)
6✔
407

6✔
408
                // When forwarding with an explicit list of servers, we use SRS if
6✔
409
                // we're sending from a non-local domain (regardless of the
6✔
410
                // destination).
6✔
411
                from := item.From
6✔
412
                if !envelope.DomainIn(item.From, q.localDomains) {
9✔
413
                        from = rewriteSender(item.From, rcpt.OriginalAddress)
3✔
414
                }
3✔
415
                return q.remoteC.Forward(from, rcpt.Address, item.Data, rcpt.Via)
6✔
416
        }
417

418
        // Recipient type is EMAIL.
419
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
164✔
420
                deliverAttempts.Add("email:local", 1)
69✔
421
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
69✔
422
        }
69✔
423

424
        deliverAttempts.Add("email:remote", 1)
26✔
425
        from := item.From
26✔
426
        if !envelope.DomainIn(item.From, q.localDomains) {
31✔
427
                // We're sending from a non-local to a non-local, need to do SRS.
5✔
428
                from = rewriteSender(item.From, rcpt.OriginalAddress)
5✔
429
        }
5✔
430
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
26✔
431
}
432

433
func rewriteSender(from, originalAddr string) string {
8✔
434
        // Apply a send-only Sender Rewriting Scheme (SRS).
8✔
435
        // This is used when we are sending from a (potentially) non-local domain,
8✔
436
        // to a non-local domain.
8✔
437
        // This should happen only when there's an alias to forward email to a
8✔
438
        // non-local domain (either a normal "email" alias with a remote
8✔
439
        // destination, or a "forward" alias with a list of servers).
8✔
440
        // In this case, using the original From is problematic, as we may not be
8✔
441
        // an authorized sender for this.
8✔
442
        // To do this, we use a sender rewriting scheme, similar to what other
8✔
443
        // MTAs do (e.g. gmail or postfix).
8✔
444
        // Note this assumes "+" is an alias suffix separator.
8✔
445
        // We use the IDNA version of the domain if possible, because
8✔
446
        // we can't know if the other side will support SMTPUTF8.
8✔
447
        return fmt.Sprintf("%s+fwd_from=%s@%s",
8✔
448
                envelope.UserOf(originalAddr),
8✔
449
                strings.Replace(from, "@", "=", -1),
8✔
450
                mustIDNAToASCII(envelope.DomainOf(originalAddr)))
8✔
451
}
8✔
452

453
// countRcpt counts how many recipients are in the given status.
454
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
181✔
455
        c := 0
181✔
456
        for _, rcpt := range item.Rcpt {
386✔
457
                for _, status := range statuses {
509✔
458
                        if rcpt.Status == status {
309✔
459
                                c++
5✔
460
                                break
5✔
461
                        }
462
                }
463
        }
464
        return c
181✔
465
}
466

467
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
468
        tr.Debugf("sending DSN")
4✔
469

4✔
470
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
471
        // as otherwise we're relaying.
4✔
472
        domain := "unknown"
4✔
473
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
474
                domain = envelope.DomainOf(item.From)
3✔
475
        } else {
4✔
476
                for _, rcpt := range item.Rcpt {
2✔
477
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
478
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
479
                                break
1✔
480
                        }
481
                }
482
        }
483

484
        msg, err := deliveryStatusNotification(domain, item)
4✔
485
        if err != nil {
4✔
486
                tr.Errorf("failed to build DSN: %v", err)
×
487
                return
×
488
        }
×
489

490
        // TODO: DKIM signing.
491

492
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
493
        if err != nil {
4✔
494
                tr.Errorf("failed to queue DSN: %v", err)
×
495
                return
×
496
        }
×
497

498
        tr.Printf("queued DSN: %s", id)
4✔
499
        dsnQueued.Add(1)
4✔
500
}
501

502
func nextDelay(createdAt time.Time) time.Duration {
50✔
503
        var delay time.Duration
50✔
504

50✔
505
        since := time.Since(createdAt)
50✔
506
        switch {
50✔
507
        case since < 1*time.Minute:
10✔
508
                delay = 1 * time.Minute
10✔
509
        case since < 5*time.Minute:
10✔
510
                delay = 5 * time.Minute
10✔
511
        case since < 10*time.Minute:
10✔
512
                delay = 10 * time.Minute
10✔
513
        default:
20✔
514
                delay = 20 * time.Minute
20✔
515
        }
516

517
        // Perturb the delay, to avoid all queued emails to be retried at the
518
        // exact same time after a restart.
519
        delay += rand.N(60 * time.Second)
50✔
520
        return delay
50✔
521
}
522

523
func mustIDNAToASCII(s string) string {
8✔
524
        a, err := idna.ToASCII(s)
8✔
525
        if err != nil {
8✔
526
                return a
×
527
        }
×
528
        return s
8✔
529
}
530

531
func timeFromProto(ts *Timestamp) time.Time {
2✔
532
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
533
}
2✔
534

535
func timeToProto(t time.Time) *Timestamp {
196✔
536
        return &Timestamp{
196✔
537
                Seconds: t.Unix(),
196✔
538
                Nanos:   int32(t.Nanosecond()),
196✔
539
        }
196✔
540
}
196✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc