• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 14423898557

12 Apr 2025 10:23PM UTC coverage: 94.937% (-0.07%) from 95.002%
14423898557

push

albertito
aliases: Implement "via" aliases

This patch implements "via" aliases, which let us explicitly select a
server to use for delivery.

This feature is useful in different scenarios, such as a secondary MX
server that forwards all incoming email to a primary.

For now, it is experimental and the syntax and semantics are subject to
change.

116 of 126 new or added lines in 5 files covered. (92.06%)

51 existing lines in 4 files now uncovered.

5869 of 6182 relevant lines covered (94.94%)

64270.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.99
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/set"
29
        "blitiri.com.ar/go/chasquid/internal/trace"
30
        "blitiri.com.ar/go/log"
31

32
        "golang.org/x/net/idna"
33
)
34

35
const (
36
        // Maximum size of the queue; we reject emails when we hit this.
37
        maxQueueSize = 200
38

39
        // Give up sending attempts after this duration.
40
        giveUpAfter = 20 * time.Hour
41

42
        // Prefix for item file names.
43
        // This is for convenience, versioning, and to be able to tell them apart
44
        // temporary files and other cruft.
45
        // It's important that it's outside the base64 space so it doesn't get
46
        // generated accidentally.
47
        itemFilePrefix = "m:"
48
)
49

50
var (
51
        errQueueFull = fmt.Errorf("Queue size too big, try again later")
52
)
53

54
// Exported variables.
55
var (
56
        putCount = expvarom.NewInt("chasquid/queue/putCount",
57
                "count of envelopes attempted to be put in the queue")
58
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
59
                "count of items the queue wrote to disk")
60
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
61
                "count of DSNs that we generated (queued)")
62
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
63
                "recipient_type", "attempts to deliver mail, by recipient type")
64
)
65

66
// Channel used to get random IDs for items in the queue.
67
var newID chan string
68

69
func generateNewIDs() {
44✔
70
        // The IDs are only used internally, we are ok with using a PRNG.
44✔
71
        // IDs are base64(8 random bytes), but the code doesn't care.
44✔
72
        buf := make([]byte, 8)
44✔
73
        for {
568✔
74
                binary.NativeEndian.PutUint64(buf, rand.Uint64())
524✔
75
                newID <- base64.RawURLEncoding.EncodeToString(buf)
524✔
76
        }
524✔
77
}
78

79
func init() {
44✔
80
        newID = make(chan string, 4)
44✔
81
        go generateNewIDs()
44✔
82
}
44✔
83

84
// Queue that keeps mail waiting for delivery.
85
type Queue struct {
86
        // Items in the queue. Map of id -> Item.
87
        q map[string]*Item
88

89
        // Mutex protecting q.
90
        mu sync.RWMutex
91

92
        // Couriers to use to deliver mail.
93
        localC  courier.Courier
94
        remoteC courier.Courier
95

96
        // Domains we consider local.
97
        localDomains *set.String
98

99
        // Path where we store the queue.
100
        path string
101

102
        // Aliases resolver.
103
        aliases *aliases.Resolver
104
}
105

106
// New creates a new Queue instance.
107
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
108
        localC, remoteC courier.Courier) (*Queue, error) {
39✔
109

39✔
110
        err := os.MkdirAll(path, 0700)
39✔
111
        q := &Queue{
39✔
112
                q:            map[string]*Item{},
39✔
113
                localC:       localC,
39✔
114
                remoteC:      remoteC,
39✔
115
                localDomains: localDomains,
39✔
116
                path:         path,
39✔
117
                aliases:      aliases,
39✔
118
        }
39✔
119
        return q, err
39✔
120
}
39✔
121

122
// Load the queue and launch the sending loops on startup.
123
func (q *Queue) Load() error {
32✔
124
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
32✔
125
        if err != nil {
32✔
126
                return err
×
127
        }
×
128

129
        for _, fname := range files {
34✔
130
                item, err := ItemFromFile(fname)
2✔
131
                if err != nil {
2✔
132
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
133
                        continue
×
134
                }
135

136
                q.mu.Lock()
2✔
137
                q.q[item.ID] = item
2✔
138
                q.mu.Unlock()
2✔
139

2✔
140
                go item.SendLoop(q)
2✔
141
        }
142

143
        return nil
32✔
144
}
145

146
// Len returns the number of elements in the queue.
147
func (q *Queue) Len() int {
94✔
148
        q.mu.RLock()
94✔
149
        defer q.mu.RUnlock()
94✔
150
        return len(q.q)
94✔
151
}
94✔
152

153
// Put an envelope in the queue.
154
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
91✔
155
        tr = tr.NewChild("Queue.Put", from)
91✔
156
        defer tr.Finish()
91✔
157

91✔
158
        if q.Len() >= maxQueueSize {
92✔
159
                tr.Errorf("queue full")
1✔
160
                return "", errQueueFull
1✔
161
        }
1✔
162
        putCount.Add(1)
90✔
163

90✔
164
        item := &Item{
90✔
165
                Message: Message{
90✔
166
                        ID:   <-newID,
90✔
167
                        From: from,
90✔
168
                        Data: data,
90✔
169
                },
90✔
170
                CreatedAt: time.Now(),
90✔
171
        }
90✔
172

90✔
173
        for _, t := range to {
184✔
174
                item.To = append(item.To, t)
94✔
175

94✔
176
                rcpts, err := q.aliases.Resolve(tr, t)
94✔
177
                if err != nil {
94✔
178
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
179
                }
×
180

181
                // Add the recipients (after resolving aliases); this conversion is
182
                // not very pretty but at least it's self contained.
183
                for _, aliasRcpt := range rcpts {
196✔
184
                        r := &Recipient{
102✔
185
                                Address:         aliasRcpt.Addr,
102✔
186
                                Status:          Recipient_PENDING,
102✔
187
                                OriginalAddress: t,
102✔
188
                        }
102✔
189
                        switch aliasRcpt.Type {
102✔
190
                        case aliases.EMAIL:
93✔
191
                                r.Type = Recipient_EMAIL
93✔
192
                        case aliases.PIPE:
3✔
193
                                r.Type = Recipient_PIPE
3✔
194
                        case aliases.FORWARD:
6✔
195
                                r.Type = Recipient_FORWARD
6✔
196
                                r.Via = aliasRcpt.Via
6✔
UNCOV
197
                        default:
×
UNCOV
198
                                log.Errorf("unknown alias type %v when resolving %q",
×
UNCOV
199
                                        aliasRcpt.Type, t)
×
200
                                return "", tr.Errorf("internal error - unknown alias type")
×
201
                        }
202
                        item.Rcpt = append(item.Rcpt, r)
102✔
203
                        tr.Debugf("recipient: %v", r.Address)
102✔
204
                }
205
        }
206

207
        err := item.WriteTo(q.path)
90✔
208
        if err != nil {
90✔
UNCOV
209
                return "", tr.Errorf("failed to write item: %v", err)
×
UNCOV
210
        }
×
211

212
        q.mu.Lock()
90✔
213
        q.q[item.ID] = item
90✔
214
        q.mu.Unlock()
90✔
215

90✔
216
        // Begin to send it right away.
90✔
217
        go item.SendLoop(q)
90✔
218

90✔
219
        tr.Debugf("queued")
90✔
220
        return item.ID, nil
90✔
221
}
222

223
// Remove an item from the queue.
224
func (q *Queue) Remove(id string) {
92✔
225
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
92✔
226
        err := os.Remove(path)
92✔
227
        if err != nil {
92✔
UNCOV
228
                log.Errorf("failed to remove queue file %q: %v", path, err)
×
UNCOV
229
        }
×
230

231
        q.mu.Lock()
92✔
232
        delete(q.q, id)
92✔
233
        q.mu.Unlock()
92✔
234
}
235

236
// DumpString returns a human-readable string with the current queue.
237
// Useful for debugging purposes.
238
func (q *Queue) DumpString() string {
2✔
239
        q.mu.RLock()
2✔
240
        defer q.mu.RUnlock()
2✔
241
        s := "# Queue status\n\n"
2✔
242
        s += fmt.Sprintf("date: %v\n", time.Now())
2✔
243
        s += fmt.Sprintf("length: %d\n\n", len(q.q))
2✔
244

2✔
245
        for id, item := range q.q {
3✔
246
                s += fmt.Sprintf("## Item %s\n", id)
1✔
247
                item.Lock()
1✔
248
                s += fmt.Sprintf("created at: %s\n", item.CreatedAt)
1✔
249
                s += fmt.Sprintf("from: %s\n", item.From)
1✔
250
                s += fmt.Sprintf("to: %s\n", item.To)
1✔
251
                for _, rcpt := range item.Rcpt {
2✔
252
                        s += fmt.Sprintf("%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
1✔
253
                        s += fmt.Sprintf("  original address: %s\n", rcpt.OriginalAddress)
1✔
254
                        s += fmt.Sprintf("  last failure: %q\n", rcpt.LastFailureMessage)
1✔
255
                }
1✔
256
                item.Unlock()
1✔
257
                s += "\n"
1✔
258
        }
259

260
        return s
2✔
261
}
262

263
// An Item in the queue.
264
type Item struct {
265
        // Base the item on the protobuf message.
266
        // We will use this for serialization, so any fields below are NOT
267
        // serialized.
268
        Message
269

270
        // Protect the entire item.
271
        sync.Mutex
272

273
        // Go-friendly version of Message.CreatedAtTs.
274
        CreatedAt time.Time
275
}
276

277
// ItemFromFile loads an item from the given file.
278
func ItemFromFile(fname string) (*Item, error) {
2✔
279
        item := &Item{}
2✔
280
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
281
        if err != nil {
2✔
UNCOV
282
                return nil, err
×
UNCOV
283
        }
×
284

285
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
286
        return item, nil
2✔
287
}
288

289
// WriteTo saves an item to the given directory.
290
func (item *Item) WriteTo(dir string) error {
195✔
291
        item.Lock()
195✔
292
        defer item.Unlock()
195✔
293
        itemsWritten.Add(1)
195✔
294

195✔
295
        item.CreatedAtTs = timeToProto(item.CreatedAt)
195✔
296

195✔
297
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
195✔
298

195✔
299
        return protoio.WriteTextMessage(path, &item.Message, 0600)
195✔
300
}
195✔
301

302
// SendLoop repeatedly attempts to send the item.
303
func (item *Item) SendLoop(q *Queue) {
93✔
304
        tr := trace.New("Queue.SendLoop", item.ID)
93✔
305
        defer tr.Finish()
93✔
306
        tr.Printf("from %s", item.From)
93✔
307

93✔
308
        for time.Since(item.CreatedAt) < giveUpAfter {
185✔
309
                // Send to all recipients that are still pending.
92✔
310
                var wg sync.WaitGroup
92✔
311
                for _, rcpt := range item.Rcpt {
196✔
312
                        if rcpt.Status != Recipient_PENDING {
104✔
UNCOV
313
                                continue
×
314
                        }
315

316
                        wg.Add(1)
104✔
317
                        go item.sendOneRcpt(&wg, tr, q, rcpt)
104✔
318
                }
319
                wg.Wait()
92✔
320

92✔
321
                // If they're all done, no need to wait.
92✔
322
                if item.countRcpt(Recipient_PENDING) == 0 {
181✔
323
                        break
89✔
324
                }
325

326
                // TODO: Consider sending a non-final notification after 30m or so,
327
                // that some of the messages have been delayed.
328

UNCOV
329
                delay := nextDelay(item.CreatedAt)
×
UNCOV
330
                tr.Printf("waiting for %v", delay)
×
UNCOV
331
                maillog.QueueLoop(item.ID, item.From, delay)
×
332
                time.Sleep(delay)
×
333
        }
334

335
        // Completed to all recipients (some may not have succeeded).
336
        if item.countRcpt(Recipient_FAILED, Recipient_PENDING) > 0 && item.From != "<>" {
94✔
337
                sendDSN(tr, q, item)
4✔
338
        }
4✔
339

340
        tr.Printf("all done")
90✔
341
        maillog.QueueLoop(item.ID, item.From, 0)
90✔
342
        q.Remove(item.ID)
90✔
343
}
344

345
// sendOneRcpt, and update it with the results.
346
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
104✔
347
        defer wg.Done()
104✔
348
        to := rcpt.Address
104✔
349
        tr.Debugf("%s sending", to)
104✔
350

104✔
351
        err, permanent := item.deliver(q, rcpt)
104✔
352

104✔
353
        item.Lock()
104✔
354
        if err != nil {
108✔
355
                rcpt.LastFailureMessage = err.Error()
4✔
356
                if permanent {
8✔
357
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
358
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
359
                        rcpt.Status = Recipient_FAILED
4✔
360
                } else {
4✔
UNCOV
361
                        tr.Printf("%s temporary error: %v", to, err)
×
UNCOV
362
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
UNCOV
363
                }
×
364
        } else {
99✔
365
                tr.Printf("%s sent", to)
99✔
366
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
99✔
367
                rcpt.Status = Recipient_SENT
99✔
368
        }
99✔
369
        item.Unlock()
102✔
370

102✔
371
        err = item.WriteTo(q.path)
102✔
372
        if err != nil {
102✔
UNCOV
373
                tr.Errorf("failed to write: %v", err)
×
UNCOV
374
        }
×
375
}
376

377
// deliver the item to the given recipient, using the couriers from the queue.
378
// Return an error (if any), and whether it is permanent or not.
379
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
105✔
380
        if rcpt.Type == Recipient_PIPE {
109✔
381
                deliverAttempts.Add("pipe", 1)
4✔
382
                c := strings.Fields(rcpt.Address)
4✔
383
                if len(c) == 0 {
4✔
UNCOV
384
                        return fmt.Errorf("empty pipe"), true
×
UNCOV
385
                }
×
386
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
387
                defer cancel()
4✔
388
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
389
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
390
                return cmd.Run(), true
4✔
391
        }
392

393
        // Recipient type is FORWARD: we always use the remote courier, and pass
394
        // the list of servers that was given to us.
395
        if rcpt.Type == Recipient_FORWARD {
107✔
396
                deliverAttempts.Add("forward", 1)
6✔
397

6✔
398
                // When forwarding with an explicit list of servers, we use SRS if
6✔
399
                // we're sending from a non-local domain (regardless of the
6✔
400
                // destination).
6✔
401
                from := item.From
6✔
402
                if !envelope.DomainIn(item.From, q.localDomains) {
9✔
403
                        from = rewriteSender(item.From, rcpt.OriginalAddress)
3✔
404
                }
3✔
405
                return q.remoteC.Forward(from, rcpt.Address, item.Data, rcpt.Via)
6✔
406
        }
407

408
        // Recipient type is EMAIL.
409
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
164✔
410
                deliverAttempts.Add("email:local", 1)
69✔
411
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
69✔
412
        }
69✔
413

414
        deliverAttempts.Add("email:remote", 1)
26✔
415
        from := item.From
26✔
416
        if !envelope.DomainIn(item.From, q.localDomains) {
31✔
417
                // We're sending from a non-local to a non-local, need to do SRS.
5✔
418
                from = rewriteSender(item.From, rcpt.OriginalAddress)
5✔
419
        }
5✔
420
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
26✔
421
}
422

423
func rewriteSender(from, originalAddr string) string {
8✔
424
        // Apply a send-only Sender Rewriting Scheme (SRS).
8✔
425
        // This is used when we are sending from a (potentially) non-local domain,
8✔
426
        // to a non-local domain.
8✔
427
        // This should happen only when there's an alias to forward email to a
8✔
428
        // non-local domain (either a normal "email" alias with a remote
8✔
429
        // destination, or a "forward" alias with a list of servers).
8✔
430
        // In this case, using the original From is problematic, as we may not be
8✔
431
        // an authorized sender for this.
8✔
432
        // To do this, we use a sender rewriting scheme, similar to what other
8✔
433
        // MTAs do (e.g. gmail or postfix).
8✔
434
        // Note this assumes "+" is an alias suffix separator.
8✔
435
        // We use the IDNA version of the domain if possible, because
8✔
436
        // we can't know if the other side will support SMTPUTF8.
8✔
437
        return fmt.Sprintf("%s+fwd_from=%s@%s",
8✔
438
                envelope.UserOf(originalAddr),
8✔
439
                strings.Replace(from, "@", "=", -1),
8✔
440
                mustIDNAToASCII(envelope.DomainOf(originalAddr)))
8✔
441
}
8✔
442

443
// countRcpt counts how many recipients are in the given status.
444
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
179✔
445
        c := 0
179✔
446
        for _, rcpt := range item.Rcpt {
382✔
447
                for _, status := range statuses {
504✔
448
                        if rcpt.Status == status {
306✔
449
                                c++
5✔
450
                                break
5✔
451
                        }
452
                }
453
        }
454
        return c
179✔
455
}
456

457
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
458
        tr.Debugf("sending DSN")
4✔
459

4✔
460
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
461
        // as otherwise we're relaying.
4✔
462
        domain := "unknown"
4✔
463
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
464
                domain = envelope.DomainOf(item.From)
3✔
465
        } else {
4✔
466
                for _, rcpt := range item.Rcpt {
2✔
467
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
468
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
469
                                break
1✔
470
                        }
471
                }
472
        }
473

474
        msg, err := deliveryStatusNotification(domain, item)
4✔
475
        if err != nil {
4✔
UNCOV
476
                tr.Errorf("failed to build DSN: %v", err)
×
UNCOV
477
                return
×
UNCOV
478
        }
×
479

480
        // TODO: DKIM signing.
481

482
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
483
        if err != nil {
4✔
UNCOV
484
                tr.Errorf("failed to queue DSN: %v", err)
×
UNCOV
485
                return
×
UNCOV
486
        }
×
487

488
        tr.Printf("queued DSN: %s", id)
4✔
489
        dsnQueued.Add(1)
4✔
490
}
491

492
func nextDelay(createdAt time.Time) time.Duration {
50✔
493
        var delay time.Duration
50✔
494

50✔
495
        since := time.Since(createdAt)
50✔
496
        switch {
50✔
497
        case since < 1*time.Minute:
10✔
498
                delay = 1 * time.Minute
10✔
499
        case since < 5*time.Minute:
10✔
500
                delay = 5 * time.Minute
10✔
501
        case since < 10*time.Minute:
10✔
502
                delay = 10 * time.Minute
10✔
503
        default:
20✔
504
                delay = 20 * time.Minute
20✔
505
        }
506

507
        // Perturb the delay, to avoid all queued emails to be retried at the
508
        // exact same time after a restart.
509
        delay += rand.N(60 * time.Second)
50✔
510
        return delay
50✔
511
}
512

513
func mustIDNAToASCII(s string) string {
8✔
514
        a, err := idna.ToASCII(s)
8✔
515
        if err != nil {
8✔
UNCOV
516
                return a
×
UNCOV
517
        }
×
518
        return s
8✔
519
}
520

521
func timeFromProto(ts *Timestamp) time.Time {
2✔
522
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
523
}
2✔
524

525
func timeToProto(t time.Time) *Timestamp {
195✔
526
        return &Timestamp{
195✔
527
                Seconds: t.Unix(),
195✔
528
                Nanos:   int32(t.Nanosecond()),
195✔
529
        }
195✔
530
}
195✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc