• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 14159752796

30 Mar 2025 08:35PM UTC coverage: 94.926% (-0.07%) from 94.993%
14159752796

push

albertito
github: Docker public image build doesn't depend on coverage

Today, the step to build the Docker public image depends on the coverage
run. This dependency isn't necessary, as the coverage could be failing
for a variety of reasons (e.g. codecov being down) and doesn't signal
any problem with chasquid itself.

So this patch fixes that: if the integration tests pass, then that is
good enough for building the public image.

5744 of 6051 relevant lines covered (94.93%)

65624.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/set"
29
        "blitiri.com.ar/go/chasquid/internal/trace"
30
        "blitiri.com.ar/go/log"
31

32
        "golang.org/x/net/idna"
33
)
34

35
const (
	// Upper bound on the number of items in the queue; once it is reached,
	// new emails are rejected.
	maxQueueSize = 200

	// How long we keep attempting to send an item before giving up on it.
	giveUpAfter = 20 * time.Hour

	// Prefix for item file names.
	// It is there for convenience, for versioning, and to be able to tell
	// item files apart from temporary files and other cruft.
	// It must contain a character outside the base64 alphabet, so an item ID
	// can never generate it accidentally.
	itemFilePrefix = "m:"
)
49

50
var (
51
        errQueueFull = fmt.Errorf("Queue size too big, try again later")
52
)
53

54
// Exported variables.
55
var (
56
        putCount = expvarom.NewInt("chasquid/queue/putCount",
57
                "count of envelopes attempted to be put in the queue")
58
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
59
                "count of items the queue wrote to disk")
60
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
61
                "count of DSNs that we generated (queued)")
62
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
63
                "recipient_type", "attempts to deliver mail, by recipient type")
64
)
65

66
// Channel used to get random IDs for items in the queue.
67
var newID chan string
68

69
func generateNewIDs() {
41✔
70
        // The IDs are only used internally, we are ok with using a PRNG.
41✔
71
        // IDs are base64(8 random bytes), but the code doesn't care.
41✔
72
        buf := make([]byte, 8)
41✔
73
        for {
540✔
74
                binary.NativeEndian.PutUint64(buf, rand.Uint64())
499✔
75
                newID <- base64.RawURLEncoding.EncodeToString(buf)
499✔
76
        }
499✔
77
}
78

79
func init() {
41✔
80
        newID = make(chan string, 4)
41✔
81
        go generateNewIDs()
41✔
82
}
41✔
83

84
// Queue that keeps mail waiting for delivery.
85
type Queue struct {
86
        // Items in the queue. Map of id -> Item.
87
        q map[string]*Item
88

89
        // Mutex protecting q.
90
        mu sync.RWMutex
91

92
        // Couriers to use to deliver mail.
93
        localC  courier.Courier
94
        remoteC courier.Courier
95

96
        // Domains we consider local.
97
        localDomains *set.String
98

99
        // Path where we store the queue.
100
        path string
101

102
        // Aliases resolver.
103
        aliases *aliases.Resolver
104
}
105

106
// New creates a new Queue instance.
107
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
108
        localC, remoteC courier.Courier) (*Queue, error) {
36✔
109

36✔
110
        err := os.MkdirAll(path, 0700)
36✔
111
        q := &Queue{
36✔
112
                q:            map[string]*Item{},
36✔
113
                localC:       localC,
36✔
114
                remoteC:      remoteC,
36✔
115
                localDomains: localDomains,
36✔
116
                path:         path,
36✔
117
                aliases:      aliases,
36✔
118
        }
36✔
119
        return q, err
36✔
120
}
36✔
121

122
// Load the queue and launch the sending loops on startup.
123
func (q *Queue) Load() error {
29✔
124
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
29✔
125
        if err != nil {
29✔
126
                return err
×
127
        }
×
128

129
        for _, fname := range files {
31✔
130
                item, err := ItemFromFile(fname)
2✔
131
                if err != nil {
2✔
132
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
133
                        continue
×
134
                }
135

136
                q.mu.Lock()
2✔
137
                q.q[item.ID] = item
2✔
138
                q.mu.Unlock()
2✔
139

2✔
140
                go item.SendLoop(q)
2✔
141
        }
142

143
        return nil
29✔
144
}
145

146
// Len returns the number of elements in the queue.
147
func (q *Queue) Len() int {
84✔
148
        q.mu.RLock()
84✔
149
        defer q.mu.RUnlock()
84✔
150
        return len(q.q)
84✔
151
}
84✔
152

153
// Put an envelope in the queue.
154
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
81✔
155
        tr = tr.NewChild("Queue.Put", from)
81✔
156
        defer tr.Finish()
81✔
157

81✔
158
        if q.Len() >= maxQueueSize {
82✔
159
                tr.Errorf("queue full")
1✔
160
                return "", errQueueFull
1✔
161
        }
1✔
162
        putCount.Add(1)
80✔
163

80✔
164
        item := &Item{
80✔
165
                Message: Message{
80✔
166
                        ID:   <-newID,
80✔
167
                        From: from,
80✔
168
                        Data: data,
80✔
169
                },
80✔
170
                CreatedAt: time.Now(),
80✔
171
        }
80✔
172

80✔
173
        for _, t := range to {
163✔
174
                item.To = append(item.To, t)
83✔
175

83✔
176
                rcpts, err := q.aliases.Resolve(tr, t)
83✔
177
                if err != nil {
83✔
178
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
179
                }
×
180

181
                // Add the recipients (after resolving aliases); this conversion is
182
                // not very pretty but at least it's self contained.
183
                for _, aliasRcpt := range rcpts {
174✔
184
                        r := &Recipient{
91✔
185
                                Address:         aliasRcpt.Addr,
91✔
186
                                Status:          Recipient_PENDING,
91✔
187
                                OriginalAddress: t,
91✔
188
                        }
91✔
189
                        switch aliasRcpt.Type {
91✔
190
                        case aliases.EMAIL:
88✔
191
                                r.Type = Recipient_EMAIL
88✔
192
                        case aliases.PIPE:
3✔
193
                                r.Type = Recipient_PIPE
3✔
194
                        default:
×
195
                                log.Errorf("unknown alias type %v when resolving %q",
×
196
                                        aliasRcpt.Type, t)
×
197
                                return "", tr.Errorf("internal error - unknown alias type")
×
198
                        }
199
                        item.Rcpt = append(item.Rcpt, r)
91✔
200
                        tr.Debugf("recipient: %v", r.Address)
91✔
201
                }
202
        }
203

204
        err := item.WriteTo(q.path)
80✔
205
        if err != nil {
80✔
206
                return "", tr.Errorf("failed to write item: %v", err)
×
207
        }
×
208

209
        q.mu.Lock()
80✔
210
        q.q[item.ID] = item
80✔
211
        q.mu.Unlock()
80✔
212

80✔
213
        // Begin to send it right away.
80✔
214
        go item.SendLoop(q)
80✔
215

80✔
216
        tr.Debugf("queued")
80✔
217
        return item.ID, nil
80✔
218
}
219

220
// Remove an item from the queue.
221
func (q *Queue) Remove(id string) {
83✔
222
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
83✔
223
        err := os.Remove(path)
83✔
224
        if err != nil {
83✔
225
                log.Errorf("failed to remove queue file %q: %v", path, err)
×
226
        }
×
227

228
        q.mu.Lock()
83✔
229
        delete(q.q, id)
83✔
230
        q.mu.Unlock()
83✔
231
}
232

233
// DumpString returns a human-readable string with the current queue.
234
// Useful for debugging purposes.
235
func (q *Queue) DumpString() string {
2✔
236
        q.mu.RLock()
2✔
237
        defer q.mu.RUnlock()
2✔
238
        s := "# Queue status\n\n"
2✔
239
        s += fmt.Sprintf("date: %v\n", time.Now())
2✔
240
        s += fmt.Sprintf("length: %d\n\n", len(q.q))
2✔
241

2✔
242
        for id, item := range q.q {
3✔
243
                s += fmt.Sprintf("## Item %s\n", id)
1✔
244
                item.Lock()
1✔
245
                s += fmt.Sprintf("created at: %s\n", item.CreatedAt)
1✔
246
                s += fmt.Sprintf("from: %s\n", item.From)
1✔
247
                s += fmt.Sprintf("to: %s\n", item.To)
1✔
248
                for _, rcpt := range item.Rcpt {
2✔
249
                        s += fmt.Sprintf("%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
1✔
250
                        s += fmt.Sprintf("  original address: %s\n", rcpt.OriginalAddress)
1✔
251
                        s += fmt.Sprintf("  last failure: %q\n", rcpt.LastFailureMessage)
1✔
252
                }
1✔
253
                item.Unlock()
1✔
254
                s += "\n"
1✔
255
        }
256

257
        return s
2✔
258
}
259

260
// An Item in the queue.
261
type Item struct {
262
        // Base the item on the protobuf message.
263
        // We will use this for serialization, so any fields below are NOT
264
        // serialized.
265
        Message
266

267
        // Protect the entire item.
268
        sync.Mutex
269

270
        // Go-friendly version of Message.CreatedAtTs.
271
        CreatedAt time.Time
272
}
273

274
// ItemFromFile loads an item from the given file.
275
func ItemFromFile(fname string) (*Item, error) {
2✔
276
        item := &Item{}
2✔
277
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
278
        if err != nil {
2✔
279
                return nil, err
×
280
        }
×
281

282
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
283
        return item, nil
2✔
284
}
285

286
// WriteTo saves an item to the given directory.
287
func (item *Item) WriteTo(dir string) error {
175✔
288
        item.Lock()
175✔
289
        defer item.Unlock()
175✔
290
        itemsWritten.Add(1)
175✔
291

175✔
292
        item.CreatedAtTs = timeToProto(item.CreatedAt)
175✔
293

175✔
294
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
175✔
295

175✔
296
        return protoio.WriteTextMessage(path, &item.Message, 0600)
175✔
297
}
175✔
298

299
// SendLoop repeatedly attempts to send the item.
300
func (item *Item) SendLoop(q *Queue) {
83✔
301
        tr := trace.New("Queue.SendLoop", item.ID)
83✔
302
        defer tr.Finish()
83✔
303
        tr.Printf("from %s", item.From)
83✔
304

83✔
305
        for time.Since(item.CreatedAt) < giveUpAfter {
165✔
306
                // Send to all recipients that are still pending.
82✔
307
                var wg sync.WaitGroup
82✔
308
                for _, rcpt := range item.Rcpt {
175✔
309
                        if rcpt.Status != Recipient_PENDING {
93✔
310
                                continue
×
311
                        }
312

313
                        wg.Add(1)
93✔
314
                        go item.sendOneRcpt(&wg, tr, q, rcpt)
93✔
315
                }
316
                wg.Wait()
82✔
317

82✔
318
                // If they're all done, no need to wait.
82✔
319
                if item.countRcpt(Recipient_PENDING) == 0 {
162✔
320
                        break
80✔
321
                }
322

323
                // TODO: Consider sending a non-final notification after 30m or so,
324
                // that some of the messages have been delayed.
325

326
                delay := nextDelay(item.CreatedAt)
×
327
                tr.Printf("waiting for %v", delay)
×
328
                maillog.QueueLoop(item.ID, item.From, delay)
×
329
                time.Sleep(delay)
×
330
        }
331

332
        // Completed to all recipients (some may not have succeeded).
333
        if item.countRcpt(Recipient_FAILED, Recipient_PENDING) > 0 && item.From != "<>" {
85✔
334
                sendDSN(tr, q, item)
4✔
335
        }
4✔
336

337
        tr.Printf("all done")
81✔
338
        maillog.QueueLoop(item.ID, item.From, 0)
81✔
339
        q.Remove(item.ID)
81✔
340
}
341

342
// sendOneRcpt, and update it with the results.
343
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
93✔
344
        defer wg.Done()
93✔
345
        to := rcpt.Address
93✔
346
        tr.Debugf("%s sending", to)
93✔
347

93✔
348
        err, permanent := item.deliver(q, rcpt)
93✔
349

93✔
350
        item.Lock()
93✔
351
        if err != nil {
97✔
352
                rcpt.LastFailureMessage = err.Error()
4✔
353
                if permanent {
8✔
354
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
355
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
356
                        rcpt.Status = Recipient_FAILED
4✔
357
                } else {
4✔
358
                        tr.Printf("%s temporary error: %v", to, err)
×
359
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
360
                }
×
361
        } else {
88✔
362
                tr.Printf("%s sent", to)
88✔
363
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
88✔
364
                rcpt.Status = Recipient_SENT
88✔
365
        }
88✔
366
        item.Unlock()
92✔
367

92✔
368
        err = item.WriteTo(q.path)
92✔
369
        if err != nil {
92✔
370
                tr.Errorf("failed to write: %v", err)
×
371
        }
×
372
}
373

374
// deliver the item to the given recipient, using the couriers from the queue.
375
// Return an error (if any), and whether it is permanent or not.
376
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
94✔
377
        if rcpt.Type == Recipient_PIPE {
98✔
378
                deliverAttempts.Add("pipe", 1)
4✔
379
                c := strings.Fields(rcpt.Address)
4✔
380
                if len(c) == 0 {
4✔
381
                        return fmt.Errorf("empty pipe"), true
×
382
                }
×
383
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
384
                defer cancel()
4✔
385
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
386
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
387
                return cmd.Run(), true
4✔
388
        }
389

390
        // Recipient type is EMAIL.
391
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
156✔
392
                deliverAttempts.Add("email:local", 1)
66✔
393
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
66✔
394
        }
66✔
395

396
        deliverAttempts.Add("email:remote", 1)
24✔
397
        from := item.From
24✔
398
        if !envelope.DomainIn(item.From, q.localDomains) {
29✔
399
                // We're sending from a non-local to a non-local. This should
5✔
400
                // happen only when there's an alias to forward email to a
5✔
401
                // non-local domain.  In this case, using the original From is
5✔
402
                // problematic, as we may not be an authorized sender for this.
5✔
403
                // Some MTAs (like Exim) will do it anyway, others (like
5✔
404
                // gmail) will construct a special address based on the
5✔
405
                // original address.  We go with the latter.
5✔
406
                // Note this assumes "+" is an alias suffix separator.
5✔
407
                // We use the IDNA version of the domain if possible, because
5✔
408
                // we can't know if the other side will support SMTPUTF8.
5✔
409
                from = fmt.Sprintf("%s+fwd_from=%s@%s",
5✔
410
                        envelope.UserOf(rcpt.OriginalAddress),
5✔
411
                        strings.Replace(from, "@", "=", -1),
5✔
412
                        mustIDNAToASCII(envelope.DomainOf(rcpt.OriginalAddress)))
5✔
413
        }
5✔
414
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
24✔
415
}
416

417
// countRcpt counts how many recipients are in the given status.
418
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
161✔
419
        c := 0
161✔
420
        for _, rcpt := range item.Rcpt {
344✔
421
                for _, status := range statuses {
454✔
422
                        if rcpt.Status == status {
276✔
423
                                c++
5✔
424
                                break
5✔
425
                        }
426
                }
427
        }
428
        return c
161✔
429
}
430

431
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
432
        tr.Debugf("sending DSN")
4✔
433

4✔
434
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
435
        // as otherwise we're relaying.
4✔
436
        domain := "unknown"
4✔
437
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
438
                domain = envelope.DomainOf(item.From)
3✔
439
        } else {
4✔
440
                for _, rcpt := range item.Rcpt {
2✔
441
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
442
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
443
                                break
1✔
444
                        }
445
                }
446
        }
447

448
        msg, err := deliveryStatusNotification(domain, item)
4✔
449
        if err != nil {
4✔
450
                tr.Errorf("failed to build DSN: %v", err)
×
451
                return
×
452
        }
×
453

454
        // TODO: DKIM signing.
455

456
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
457
        if err != nil {
4✔
458
                tr.Errorf("failed to queue DSN: %v", err)
×
459
                return
×
460
        }
×
461

462
        tr.Printf("queued DSN: %s", id)
4✔
463
        dsnQueued.Add(1)
4✔
464
}
465

466
func nextDelay(createdAt time.Time) time.Duration {
50✔
467
        var delay time.Duration
50✔
468

50✔
469
        since := time.Since(createdAt)
50✔
470
        switch {
50✔
471
        case since < 1*time.Minute:
10✔
472
                delay = 1 * time.Minute
10✔
473
        case since < 5*time.Minute:
10✔
474
                delay = 5 * time.Minute
10✔
475
        case since < 10*time.Minute:
10✔
476
                delay = 10 * time.Minute
10✔
477
        default:
20✔
478
                delay = 20 * time.Minute
20✔
479
        }
480

481
        // Perturb the delay, to avoid all queued emails to be retried at the
482
        // exact same time after a restart.
483
        delay += rand.N(60 * time.Second)
50✔
484
        return delay
50✔
485
}
486

487
func mustIDNAToASCII(s string) string {
5✔
488
        a, err := idna.ToASCII(s)
5✔
489
        if err != nil {
5✔
490
                return a
×
491
        }
×
492
        return s
5✔
493
}
494

495
func timeFromProto(ts *Timestamp) time.Time {
2✔
496
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
497
}
2✔
498

499
func timeToProto(t time.Time) *Timestamp {
175✔
500
        return &Timestamp{
175✔
501
                Seconds: t.Unix(),
175✔
502
                Nanos:   int32(t.Nanosecond()),
175✔
503
        }
175✔
504
}
175✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc