• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 13358541199

16 Feb 2025 08:25PM UTC coverage: 94.971% (-0.007%) from 94.978%
13358541199

push

albertito
modules: Update Go dependencies

5722 of 6025 relevant lines covered (94.97%)

65907.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.54
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/set"
29
        "blitiri.com.ar/go/chasquid/internal/trace"
30
        "blitiri.com.ar/go/log"
31

32
        "golang.org/x/net/idna"
33
)
34

35
const (
	// maxQueueSize is the maximum number of items the queue may hold; Put
	// rejects new emails once this many are queued.
	maxQueueSize = 200

	// giveUpAfter is how long, counted from the item's creation, we keep
	// attempting to send it before giving up.
	giveUpAfter = 20 * time.Hour

	// itemFilePrefix is the prefix for item file names.
	// It is there for convenience, versioning, and to be able to tell item
	// files apart from temporary files and other cruft.
	// It's important that it stays outside the base64 space, so it doesn't
	// get generated accidentally.
	itemFilePrefix = "m:"
)
49

50
// errQueueFull is the error Put returns when the queue has reached
// maxQueueSize items.
var errQueueFull = fmt.Errorf("Queue size too big, try again later")
53

54
// Exported variables.
55
var (
56
        putCount = expvarom.NewInt("chasquid/queue/putCount",
57
                "count of envelopes attempted to be put in the queue")
58
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
59
                "count of items the queue wrote to disk")
60
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
61
                "count of DSNs that we generated (queued)")
62
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
63
                "recipient_type", "attempts to deliver mail, by recipient type")
64
)
65

66
// Channel used to get random IDs for items in the queue.
67
var newID chan string
68

69
func generateNewIDs() {
41✔
70
        // The IDs are only used internally, we are ok with using a PRNG.
41✔
71
        // IDs are base64(8 random bytes), but the code doesn't care.
41✔
72
        buf := make([]byte, 8)
41✔
73
        for {
540✔
74
                binary.NativeEndian.PutUint64(buf, rand.Uint64())
499✔
75
                newID <- base64.RawURLEncoding.EncodeToString(buf)
499✔
76
        }
499✔
77
}
78

79
func init() {
41✔
80
        newID = make(chan string, 4)
41✔
81
        go generateNewIDs()
41✔
82
}
41✔
83

84
// Queue that keeps mail waiting for delivery.
85
type Queue struct {
86
        // Items in the queue. Map of id -> Item.
87
        q map[string]*Item
88

89
        // Mutex protecting q.
90
        mu sync.RWMutex
91

92
        // Couriers to use to deliver mail.
93
        localC  courier.Courier
94
        remoteC courier.Courier
95

96
        // Domains we consider local.
97
        localDomains *set.String
98

99
        // Path where we store the queue.
100
        path string
101

102
        // Aliases resolver.
103
        aliases *aliases.Resolver
104
}
105

106
// New creates a new Queue instance.
107
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
108
        localC, remoteC courier.Courier) (*Queue, error) {
36✔
109

36✔
110
        err := os.MkdirAll(path, 0700)
36✔
111
        q := &Queue{
36✔
112
                q:            map[string]*Item{},
36✔
113
                localC:       localC,
36✔
114
                remoteC:      remoteC,
36✔
115
                localDomains: localDomains,
36✔
116
                path:         path,
36✔
117
                aliases:      aliases,
36✔
118
        }
36✔
119
        return q, err
36✔
120
}
36✔
121

122
// Load the queue and launch the sending loops on startup.
123
func (q *Queue) Load() error {
29✔
124
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
29✔
125
        if err != nil {
29✔
126
                return err
×
127
        }
×
128

129
        for _, fname := range files {
31✔
130
                item, err := ItemFromFile(fname)
2✔
131
                if err != nil {
2✔
132
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
133
                        continue
×
134
                }
135

136
                q.mu.Lock()
2✔
137
                q.q[item.ID] = item
2✔
138
                q.mu.Unlock()
2✔
139

2✔
140
                go item.SendLoop(q)
2✔
141
        }
142

143
        return nil
29✔
144
}
145

146
// Len returns the number of elements in the queue.
147
func (q *Queue) Len() int {
84✔
148
        q.mu.RLock()
84✔
149
        defer q.mu.RUnlock()
84✔
150
        return len(q.q)
84✔
151
}
84✔
152

153
// Put an envelope in the queue.
154
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
81✔
155
        tr = tr.NewChild("Queue.Put", from)
81✔
156
        defer tr.Finish()
81✔
157

81✔
158
        if q.Len() >= maxQueueSize {
82✔
159
                tr.Errorf("queue full")
1✔
160
                return "", errQueueFull
1✔
161
        }
1✔
162
        putCount.Add(1)
80✔
163

80✔
164
        item := &Item{
80✔
165
                Message: Message{
80✔
166
                        ID:   <-newID,
80✔
167
                        From: from,
80✔
168
                        Data: data,
80✔
169
                },
80✔
170
                CreatedAt: time.Now(),
80✔
171
        }
80✔
172

80✔
173
        for _, t := range to {
163✔
174
                item.To = append(item.To, t)
83✔
175

83✔
176
                rcpts, err := q.aliases.Resolve(tr, t)
83✔
177
                if err != nil {
83✔
178
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
179
                }
×
180

181
                // Add the recipients (after resolving aliases); this conversion is
182
                // not very pretty but at least it's self contained.
183
                for _, aliasRcpt := range rcpts {
174✔
184
                        r := &Recipient{
91✔
185
                                Address:         aliasRcpt.Addr,
91✔
186
                                Status:          Recipient_PENDING,
91✔
187
                                OriginalAddress: t,
91✔
188
                        }
91✔
189
                        switch aliasRcpt.Type {
91✔
190
                        case aliases.EMAIL:
88✔
191
                                r.Type = Recipient_EMAIL
88✔
192
                        case aliases.PIPE:
3✔
193
                                r.Type = Recipient_PIPE
3✔
194
                        default:
×
195
                                log.Errorf("unknown alias type %v when resolving %q",
×
196
                                        aliasRcpt.Type, t)
×
197
                                return "", tr.Errorf("internal error - unknown alias type")
×
198
                        }
199
                        item.Rcpt = append(item.Rcpt, r)
91✔
200
                        tr.Debugf("recipient: %v", r.Address)
91✔
201
                }
202
        }
203

204
        err := item.WriteTo(q.path)
80✔
205
        if err != nil {
80✔
206
                return "", tr.Errorf("failed to write item: %v", err)
×
207
        }
×
208

209
        q.mu.Lock()
80✔
210
        q.q[item.ID] = item
80✔
211
        q.mu.Unlock()
80✔
212

80✔
213
        // Begin to send it right away.
80✔
214
        go item.SendLoop(q)
80✔
215

80✔
216
        tr.Debugf("queued")
80✔
217
        return item.ID, nil
80✔
218
}
219

220
// Remove an item from the queue.
221
func (q *Queue) Remove(id string) {
82✔
222
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
82✔
223
        err := os.Remove(path)
82✔
224
        if err != nil {
84✔
225
                log.Errorf("failed to remove queue file %q: %v", path, err)
2✔
226
        }
2✔
227

228
        q.mu.Lock()
82✔
229
        delete(q.q, id)
82✔
230
        q.mu.Unlock()
82✔
231
}
232

233
// DumpString returns a human-readable string with the current queue.
234
// Useful for debugging purposes.
235
func (q *Queue) DumpString() string {
2✔
236
        q.mu.RLock()
2✔
237
        defer q.mu.RUnlock()
2✔
238
        s := "# Queue status\n\n"
2✔
239
        s += fmt.Sprintf("date: %v\n", time.Now())
2✔
240
        s += fmt.Sprintf("length: %d\n\n", len(q.q))
2✔
241

2✔
242
        for id, item := range q.q {
3✔
243
                s += fmt.Sprintf("## Item %s\n", id)
1✔
244
                item.Lock()
1✔
245
                s += fmt.Sprintf("created at: %s\n", item.CreatedAt)
1✔
246
                s += fmt.Sprintf("from: %s\n", item.From)
1✔
247
                s += fmt.Sprintf("to: %s\n", item.To)
1✔
248
                for _, rcpt := range item.Rcpt {
2✔
249
                        s += fmt.Sprintf("%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
1✔
250
                        s += fmt.Sprintf("  original address: %s\n", rcpt.OriginalAddress)
1✔
251
                        s += fmt.Sprintf("  last failure: %q\n", rcpt.LastFailureMessage)
1✔
252
                }
1✔
253
                item.Unlock()
1✔
254
                s += "\n"
1✔
255
        }
256

257
        return s
2✔
258
}
259

260
// An Item in the queue.
261
type Item struct {
262
        // Base the item on the protobuf message.
263
        // We will use this for serialization, so any fields below are NOT
264
        // serialized.
265
        Message
266

267
        // Protect the entire item.
268
        sync.Mutex
269

270
        // Go-friendly version of Message.CreatedAtTs.
271
        CreatedAt time.Time
272
}
273

274
// ItemFromFile loads an item from the given file.
275
func ItemFromFile(fname string) (*Item, error) {
2✔
276
        item := &Item{}
2✔
277
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
278
        if err != nil {
2✔
279
                return nil, err
×
280
        }
×
281

282
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
283
        return item, nil
2✔
284
}
285

286
// WriteTo saves an item to the given directory.
287
func (item *Item) WriteTo(dir string) error {
175✔
288
        item.Lock()
175✔
289
        defer item.Unlock()
175✔
290
        itemsWritten.Add(1)
175✔
291

175✔
292
        item.CreatedAtTs = timeToProto(item.CreatedAt)
175✔
293

175✔
294
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
175✔
295

175✔
296
        return protoio.WriteTextMessage(path, &item.Message, 0600)
175✔
297
}
175✔
298

299
// SendLoop repeatedly attempts to send the item.
300
func (item *Item) SendLoop(q *Queue) {
83✔
301
        tr := trace.New("Queue.SendLoop", item.ID)
83✔
302
        defer tr.Finish()
83✔
303
        tr.Printf("from %s", item.From)
83✔
304

83✔
305
        for time.Since(item.CreatedAt) < giveUpAfter {
165✔
306
                // Send to all recipients that are still pending.
82✔
307
                var wg sync.WaitGroup
82✔
308
                for _, rcpt := range item.Rcpt {
175✔
309
                        if rcpt.Status != Recipient_PENDING {
93✔
310
                                continue
×
311
                        }
312

313
                        wg.Add(1)
93✔
314
                        go item.sendOneRcpt(&wg, tr, q, rcpt)
93✔
315
                }
316
                wg.Wait()
82✔
317

82✔
318
                // If they're all done, no need to wait.
82✔
319
                if item.countRcpt(Recipient_PENDING) == 0 {
161✔
320
                        break
79✔
321
                }
322

323
                // TODO: Consider sending a non-final notification after 30m or so,
324
                // that some of the messages have been delayed.
325

326
                delay := nextDelay(item.CreatedAt)
×
327
                tr.Printf("waiting for %v", delay)
×
328
                maillog.QueueLoop(item.ID, item.From, delay)
×
329
                time.Sleep(delay)
×
330
        }
331

332
        // Completed to all recipients (some may not have succeeded).
333
        if item.countRcpt(Recipient_FAILED, Recipient_PENDING) > 0 && item.From != "<>" {
84✔
334
                sendDSN(tr, q, item)
4✔
335
        }
4✔
336

337
        tr.Printf("all done")
80✔
338
        maillog.QueueLoop(item.ID, item.From, 0)
80✔
339
        q.Remove(item.ID)
80✔
340
}
341

342
// sendOneRcpt, and update it with the results.
343
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
93✔
344
        defer wg.Done()
93✔
345
        to := rcpt.Address
93✔
346
        tr.Debugf("%s sending", to)
93✔
347

93✔
348
        err, permanent := item.deliver(q, rcpt)
93✔
349

93✔
350
        item.Lock()
93✔
351
        if err != nil {
97✔
352
                rcpt.LastFailureMessage = err.Error()
4✔
353
                if permanent {
8✔
354
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
355
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
356
                        rcpt.Status = Recipient_FAILED
4✔
357
                } else {
4✔
358
                        tr.Printf("%s temporary error: %v", to, err)
×
359
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
360
                }
×
361
        } else {
89✔
362
                tr.Printf("%s sent", to)
89✔
363
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
89✔
364
                rcpt.Status = Recipient_SENT
89✔
365
        }
89✔
366
        item.Unlock()
93✔
367

93✔
368
        err = item.WriteTo(q.path)
93✔
369
        if err != nil {
95✔
370
                tr.Errorf("failed to write: %v", err)
2✔
371
        }
2✔
372
}
373

374
// deliver the item to the given recipient, using the couriers from the queue.
375
// Return an error (if any), and whether it is permanent or not.
376
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
94✔
377
        if rcpt.Type == Recipient_PIPE {
98✔
378
                deliverAttempts.Add("pipe", 1)
4✔
379
                c := strings.Fields(rcpt.Address)
4✔
380
                if len(c) == 0 {
4✔
381
                        return fmt.Errorf("empty pipe"), true
×
382
                }
×
383
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
384
                defer cancel()
4✔
385
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
386
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
387
                return cmd.Run(), true
4✔
388
        }
389

390
        // Recipient type is EMAIL.
391
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
156✔
392
                deliverAttempts.Add("email:local", 1)
66✔
393
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
66✔
394
        }
66✔
395

396
        deliverAttempts.Add("email:remote", 1)
24✔
397
        from := item.From
24✔
398
        if !envelope.DomainIn(item.From, q.localDomains) {
29✔
399
                // We're sending from a non-local to a non-local. This should
5✔
400
                // happen only when there's an alias to forward email to a
5✔
401
                // non-local domain.  In this case, using the original From is
5✔
402
                // problematic, as we may not be an authorized sender for this.
5✔
403
                // Some MTAs (like Exim) will do it anyway, others (like
5✔
404
                // gmail) will construct a special address based on the
5✔
405
                // original address.  We go with the latter.
5✔
406
                // Note this assumes "+" is an alias suffix separator.
5✔
407
                // We use the IDNA version of the domain if possible, because
5✔
408
                // we can't know if the other side will support SMTPUTF8.
5✔
409
                from = fmt.Sprintf("%s+fwd_from=%s@%s",
5✔
410
                        envelope.UserOf(rcpt.OriginalAddress),
5✔
411
                        strings.Replace(from, "@", "=", -1),
5✔
412
                        mustIDNAToASCII(envelope.DomainOf(rcpt.OriginalAddress)))
5✔
413
        }
5✔
414
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
24✔
415
}
416

417
// countRcpt counts how many recipients are in the given status.
418
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
159✔
419
        c := 0
159✔
420
        for _, rcpt := range item.Rcpt {
340✔
421
                for _, status := range statuses {
449✔
422
                        if rcpt.Status == status {
273✔
423
                                c++
5✔
424
                                break
5✔
425
                        }
426
                }
427
        }
428
        return c
159✔
429
}
430

431
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
432
        tr.Debugf("sending DSN")
4✔
433

4✔
434
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
435
        // as otherwise we're relaying.
4✔
436
        domain := "unknown"
4✔
437
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
438
                domain = envelope.DomainOf(item.From)
3✔
439
        } else {
4✔
440
                for _, rcpt := range item.Rcpt {
2✔
441
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
442
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
443
                                break
1✔
444
                        }
445
                }
446
        }
447

448
        msg, err := deliveryStatusNotification(domain, item)
4✔
449
        if err != nil {
4✔
450
                tr.Errorf("failed to build DSN: %v", err)
×
451
                return
×
452
        }
×
453

454
        // TODO: DKIM signing.
455

456
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
457
        if err != nil {
4✔
458
                tr.Errorf("failed to queue DSN: %v", err)
×
459
                return
×
460
        }
×
461

462
        tr.Printf("queued DSN: %s", id)
4✔
463
        dsnQueued.Add(1)
4✔
464
}
465

466
func nextDelay(createdAt time.Time) time.Duration {
50✔
467
        var delay time.Duration
50✔
468

50✔
469
        since := time.Since(createdAt)
50✔
470
        switch {
50✔
471
        case since < 1*time.Minute:
10✔
472
                delay = 1 * time.Minute
10✔
473
        case since < 5*time.Minute:
10✔
474
                delay = 5 * time.Minute
10✔
475
        case since < 10*time.Minute:
10✔
476
                delay = 10 * time.Minute
10✔
477
        default:
20✔
478
                delay = 20 * time.Minute
20✔
479
        }
480

481
        // Perturb the delay, to avoid all queued emails to be retried at the
482
        // exact same time after a restart.
483
        delay += rand.N(60 * time.Second)
50✔
484
        return delay
50✔
485
}
486

487
func mustIDNAToASCII(s string) string {
5✔
488
        a, err := idna.ToASCII(s)
5✔
489
        if err != nil {
5✔
490
                return a
×
491
        }
×
492
        return s
5✔
493
}
494

495
func timeFromProto(ts *Timestamp) time.Time {
2✔
496
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
497
}
2✔
498

499
func timeToProto(t time.Time) *Timestamp {
175✔
500
        return &Timestamp{
175✔
501
                Seconds: t.Unix(),
175✔
502
                Nanos:   int32(t.Nanosecond()),
175✔
503
        }
175✔
504
}
175✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc