• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

albertito / chasquid / 19617441740

24 Oct 2025 11:43AM UTC coverage: 95.043% (+0.01%) from 95.033%
19617441740

push

albertito
internal/dkim: Apply gofmt -s

A run of `gofmt -s` found two instances where a struct definition could
be simplified. This patch applies those suggestions.

5925 of 6234 relevant lines covered (95.04%)

63735.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.59
/internal/queue/queue.go
1
// Package queue implements our email queue.
2
// Accepted envelopes get put in the queue, and processed asynchronously.
3
package queue
4

5
// Command to generate queue.pb.go from queue.proto.
6
//go:generate protoc --go_out=. --go_opt=paths=source_relative -I=${GOPATH}/src -I. queue.proto
7

8
import (
9
        "bytes"
10
        "context"
11
        "encoding/base64"
12
        "encoding/binary"
13
        "fmt"
14
        "math/rand/v2"
15
        "os"
16
        "os/exec"
17
        "path/filepath"
18
        "strings"
19
        "sync"
20
        "time"
21

22
        "blitiri.com.ar/go/chasquid/internal/aliases"
23
        "blitiri.com.ar/go/chasquid/internal/courier"
24
        "blitiri.com.ar/go/chasquid/internal/envelope"
25
        "blitiri.com.ar/go/chasquid/internal/expvarom"
26
        "blitiri.com.ar/go/chasquid/internal/maillog"
27
        "blitiri.com.ar/go/chasquid/internal/protoio"
28
        "blitiri.com.ar/go/chasquid/internal/safeio"
29
        "blitiri.com.ar/go/chasquid/internal/set"
30
        "blitiri.com.ar/go/chasquid/internal/trace"
31
        "blitiri.com.ar/go/log"
32

33
        "golang.org/x/net/idna"
34
)
35

36
const (
37
        // Prefix for item file names.
38
        // This is for convenience, versioning, and to be able to tell them apart
39
        // temporary files and other cruft.
40
        // It's important that it's outside the base64 space so it doesn't get
41
        // generated accidentally.
42
        itemFilePrefix = "m:"
43
)
44

45
var (
46
        errQueueFull = fmt.Errorf("Queue size too big, try again later")
47
)
48

49
// Exported variables.
50
var (
51
        putCount = expvarom.NewInt("chasquid/queue/putCount",
52
                "count of envelopes attempted to be put in the queue")
53
        itemsWritten = expvarom.NewInt("chasquid/queue/itemsWritten",
54
                "count of items the queue wrote to disk")
55
        dsnQueued = expvarom.NewInt("chasquid/queue/dsnQueued",
56
                "count of DSNs that we generated (queued)")
57
        deliverAttempts = expvarom.NewMap("chasquid/queue/deliverAttempts",
58
                "recipient_type", "attempts to deliver mail, by recipient type")
59
)
60

61
// Channel used to get random IDs for items in the queue.
62
var newID chan string
63

64
func generateNewIDs() {
44✔
65
        // The IDs are only used internally, we are ok with using a PRNG.
44✔
66
        // IDs are base64(8 random bytes), but the code doesn't care.
44✔
67
        buf := make([]byte, 8)
44✔
68
        for {
468✔
69
                binary.NativeEndian.PutUint64(buf, rand.Uint64())
424✔
70
                newID <- base64.RawURLEncoding.EncodeToString(buf)
424✔
71
        }
424✔
72
}
73

74
func init() {
44✔
75
        newID = make(chan string, 4)
44✔
76
        go generateNewIDs()
44✔
77
}
44✔
78

79
// Queue that keeps mail waiting for delivery.
80
type Queue struct {
81
        // Couriers to use to deliver mail.
82
        localC  courier.Courier
83
        remoteC courier.Courier
84

85
        // Domains we consider local.
86
        localDomains *set.String
87

88
        // Path where we store the queue.
89
        path string
90

91
        // Aliases resolver.
92
        aliases *aliases.Resolver
93

94
        // The maximum number of items in the queue.
95
        MaxItems int
96

97
        // Give up sending attempts after this long.
98
        GiveUpAfter time.Duration
99

100
        // Mutex protecting q.
101
        mu sync.RWMutex
102

103
        // Items in the queue. Map of id -> Item.
104
        q map[string]*Item
105
}
106

107
// New creates a new Queue instance.
108
func New(path string, localDomains *set.String, aliases *aliases.Resolver,
109
        localC, remoteC courier.Courier) (*Queue, error) {
39✔
110

39✔
111
        err := os.MkdirAll(path, 0700)
39✔
112
        q := &Queue{
39✔
113
                q:            map[string]*Item{},
39✔
114
                localC:       localC,
39✔
115
                remoteC:      remoteC,
39✔
116
                localDomains: localDomains,
39✔
117
                path:         path,
39✔
118
                aliases:      aliases,
39✔
119

39✔
120
                // We reject emails when we hit this.
39✔
121
                // Note the actual default used in the daemon is set in the config. We
39✔
122
                // put a non-zero value here just to be safe.
39✔
123
                MaxItems: 100,
39✔
124

39✔
125
                // We give up sending (and return a DSN) after this long.
39✔
126
                // Note the actual default used in the daemon is set in the config. We
39✔
127
                // put a non-zero value here just to be safe.
39✔
128
                GiveUpAfter: 20 * time.Hour,
39✔
129
        }
39✔
130
        return q, err
39✔
131
}
39✔
132

133
// Load the queue and launch the sending loops on startup.
134
func (q *Queue) Load() error {
32✔
135
        files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
32✔
136
        if err != nil {
32✔
137
                return err
×
138
        }
×
139

140
        for _, fname := range files {
34✔
141
                item, err := ItemFromFile(fname)
2✔
142
                if err != nil {
2✔
143
                        log.Errorf("error loading queue item from %q: %v", fname, err)
×
144
                        continue
×
145
                }
146

147
                q.mu.Lock()
2✔
148
                q.q[item.ID] = item
2✔
149
                q.mu.Unlock()
2✔
150

2✔
151
                go item.SendLoop(q)
2✔
152
        }
153

154
        return nil
32✔
155
}
156

157
// Len returns the number of elements in the queue.
158
func (q *Queue) Len() int {
94✔
159
        q.mu.RLock()
94✔
160
        defer q.mu.RUnlock()
94✔
161
        return len(q.q)
94✔
162
}
94✔
163

164
// Put an envelope in the queue.
165
func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (string, error) {
91✔
166
        tr = tr.NewChild("Queue.Put", from)
91✔
167
        defer tr.Finish()
91✔
168

91✔
169
        if nItems := q.Len(); nItems >= q.MaxItems {
92✔
170
                tr.Errorf("queue full (%d items)", nItems)
1✔
171
                return "", errQueueFull
1✔
172
        }
1✔
173
        putCount.Add(1)
90✔
174

90✔
175
        item := &Item{
90✔
176
                Message: Message{
90✔
177
                        ID:   <-newID,
90✔
178
                        From: from,
90✔
179
                        Data: data,
90✔
180
                },
90✔
181
                CreatedAt: time.Now(),
90✔
182
        }
90✔
183

90✔
184
        for _, t := range to {
184✔
185
                item.To = append(item.To, t)
94✔
186

94✔
187
                rcpts, err := q.aliases.Resolve(tr, t)
94✔
188
                if err != nil {
94✔
189
                        return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
×
190
                }
×
191

192
                // Add the recipients (after resolving aliases); this conversion is
193
                // not very pretty but at least it's self contained.
194
                for _, aliasRcpt := range rcpts {
196✔
195
                        r := &Recipient{
102✔
196
                                Address:         aliasRcpt.Addr,
102✔
197
                                Status:          Recipient_PENDING,
102✔
198
                                OriginalAddress: t,
102✔
199
                        }
102✔
200
                        switch aliasRcpt.Type {
102✔
201
                        case aliases.EMAIL:
93✔
202
                                r.Type = Recipient_EMAIL
93✔
203
                        case aliases.PIPE:
3✔
204
                                r.Type = Recipient_PIPE
3✔
205
                        case aliases.FORWARD:
6✔
206
                                r.Type = Recipient_FORWARD
6✔
207
                                r.Via = aliasRcpt.Via
6✔
208
                        default:
×
209
                                log.Errorf("unknown alias type %v when resolving %q",
×
210
                                        aliasRcpt.Type, t)
×
211
                                return "", tr.Errorf("internal error - unknown alias type")
×
212
                        }
213
                        item.Rcpt = append(item.Rcpt, r)
102✔
214
                        tr.Debugf("recipient: %v", r.Address)
102✔
215
                }
216
        }
217

218
        err := item.WriteTo(q.path)
90✔
219
        if err != nil {
90✔
220
                return "", tr.Errorf("failed to write item: %v", err)
×
221
        }
×
222

223
        q.mu.Lock()
90✔
224
        q.q[item.ID] = item
90✔
225
        q.mu.Unlock()
90✔
226

90✔
227
        // Begin to send it right away.
90✔
228
        go item.SendLoop(q)
90✔
229

90✔
230
        tr.Debugf("queued")
90✔
231
        return item.ID, nil
90✔
232
}
233

234
// Remove an item from the queue.
235
func (q *Queue) Remove(id string) {
92✔
236
        path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
92✔
237
        err := os.Remove(path)
92✔
238
        if err != nil {
96✔
239
                log.Errorf("failed to remove queue file %q: %v", path, err)
4✔
240
        }
4✔
241

242
        q.mu.Lock()
92✔
243
        delete(q.q, id)
92✔
244
        q.mu.Unlock()
92✔
245
}
246

247
// DumpString returns a human-readable string with the current queue.
248
// Useful for debugging purposes.
249
func (q *Queue) DumpString() string {
2✔
250
        q.mu.RLock()
2✔
251
        defer q.mu.RUnlock()
2✔
252
        s := "# Queue status\n\n"
2✔
253
        s += fmt.Sprintf("date: %v\n", time.Now())
2✔
254
        s += fmt.Sprintf("length: %d\n\n", len(q.q))
2✔
255

2✔
256
        for id, item := range q.q {
3✔
257
                s += fmt.Sprintf("## Item %s\n", id)
1✔
258
                item.Lock()
1✔
259
                s += fmt.Sprintf("created at: %s\n", item.CreatedAt)
1✔
260
                s += fmt.Sprintf("from: %s\n", item.From)
1✔
261
                s += fmt.Sprintf("to: %s\n", item.To)
1✔
262
                for _, rcpt := range item.Rcpt {
2✔
263
                        s += fmt.Sprintf("%s %s (%s)\n", rcpt.Status, rcpt.Address, rcpt.Type)
1✔
264
                        s += fmt.Sprintf("  original address: %s\n", rcpt.OriginalAddress)
1✔
265
                        s += fmt.Sprintf("  last failure: %q\n", rcpt.LastFailureMessage)
1✔
266
                }
1✔
267
                item.Unlock()
1✔
268
                s += "\n"
1✔
269
        }
270

271
        return s
2✔
272
}
273

274
// An Item in the queue.
275
type Item struct {
276
        // Base the item on the protobuf message.
277
        // We will use this for serialization, so any fields below are NOT
278
        // serialized.
279
        Message
280

281
        // Protect the entire item.
282
        sync.Mutex
283

284
        // Go-friendly version of Message.CreatedAtTs.
285
        CreatedAt time.Time
286
}
287

288
// ItemFromFile loads an item from the given file.
289
func ItemFromFile(fname string) (*Item, error) {
2✔
290
        item := &Item{}
2✔
291
        err := protoio.ReadTextMessage(fname, &item.Message)
2✔
292
        if err != nil {
2✔
293
                return nil, err
×
294
        }
×
295

296
        item.CreatedAt = timeFromProto(item.CreatedAtTs)
2✔
297
        return item, nil
2✔
298
}
299

300
// WriteTo saves an item to the given directory.
301
func (item *Item) WriteTo(dir string) error {
196✔
302
        item.Lock()
196✔
303
        defer item.Unlock()
196✔
304
        itemsWritten.Add(1)
196✔
305

196✔
306
        item.CreatedAtTs = timeToProto(item.CreatedAt)
196✔
307

196✔
308
        path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
196✔
309

196✔
310
        // Write the file in text format for ease of debugging, and use fsync to
196✔
311
        // improve durability.
196✔
312
        return protoio.WriteTextMessage(
196✔
313
                path, &item.Message, 0600, safeio.FsyncFileOp)
196✔
314
}
196✔
315

316
// SendLoop repeatedly attempts to send the item.
317
func (item *Item) SendLoop(q *Queue) {
93✔
318
        tr := trace.New("Queue.SendLoop", item.ID)
93✔
319
        defer tr.Finish()
93✔
320
        tr.Printf("from %s", item.From)
93✔
321

93✔
322
        for time.Since(item.CreatedAt) < q.GiveUpAfter {
185✔
323
                // Send to all recipients that are still pending.
92✔
324
                var wg sync.WaitGroup
92✔
325
                for _, rcpt := range item.Rcpt {
196✔
326
                        if rcpt.Status != Recipient_PENDING {
104✔
327
                                continue
×
328
                        }
329

330
                        wg.Add(1)
104✔
331
                        go item.sendOneRcpt(&wg, tr, q, rcpt)
104✔
332
                }
333
                wg.Wait()
92✔
334

92✔
335
                // If they're all done, no need to wait.
92✔
336
                if item.countRcpt(Recipient_PENDING) == 0 {
181✔
337
                        break
89✔
338
                }
339

340
                // TODO: Consider sending a non-final notification after 30m or so,
341
                // that some of the messages have been delayed.
342

343
                delay := nextDelay(item.CreatedAt)
×
344
                tr.Printf("waiting for %v", delay)
×
345
                maillog.QueueLoop(item.ID, item.From, delay)
×
346
                time.Sleep(delay)
×
347
        }
348

349
        // Completed to all recipients (some may not have succeeded).
350
        if item.countRcpt(Recipient_FAILED, Recipient_PENDING) > 0 && item.From != "<>" {
94✔
351
                sendDSN(tr, q, item)
4✔
352
        }
4✔
353

354
        tr.Printf("all done")
90✔
355
        maillog.QueueLoop(item.ID, item.From, 0)
90✔
356
        q.Remove(item.ID)
90✔
357
}
358

359
// sendOneRcpt, and update it with the results.
360
func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
104✔
361
        defer wg.Done()
104✔
362
        to := rcpt.Address
104✔
363
        tr.Debugf("%s sending", to)
104✔
364

104✔
365
        err, permanent := item.deliver(q, rcpt)
104✔
366

104✔
367
        item.Lock()
104✔
368
        if err != nil {
108✔
369
                rcpt.LastFailureMessage = err.Error()
4✔
370
                if permanent {
8✔
371
                        tr.Errorf("%s permanent error: %v", to, err)
4✔
372
                        maillog.SendAttempt(item.ID, item.From, to, err, true)
4✔
373
                        rcpt.Status = Recipient_FAILED
4✔
374
                } else {
4✔
375
                        tr.Printf("%s temporary error: %v", to, err)
×
376
                        maillog.SendAttempt(item.ID, item.From, to, err, false)
×
377
                }
×
378
        } else {
100✔
379
                tr.Printf("%s sent", to)
100✔
380
                maillog.SendAttempt(item.ID, item.From, to, nil, false)
100✔
381
                rcpt.Status = Recipient_SENT
100✔
382
        }
100✔
383
        item.Unlock()
104✔
384

104✔
385
        err = item.WriteTo(q.path)
104✔
386
        if err != nil {
110✔
387
                tr.Errorf("failed to write: %v", err)
6✔
388
        }
6✔
389
}
390

391
// deliver the item to the given recipient, using the couriers from the queue.
392
// Return an error (if any), and whether it is permanent or not.
393
func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) {
105✔
394
        if rcpt.Type == Recipient_PIPE {
109✔
395
                deliverAttempts.Add("pipe", 1)
4✔
396
                c := strings.Fields(rcpt.Address)
4✔
397
                if len(c) == 0 {
4✔
398
                        return fmt.Errorf("empty pipe"), true
×
399
                }
×
400
                ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4✔
401
                defer cancel()
4✔
402
                cmd := exec.CommandContext(ctx, c[0], c[1:]...)
4✔
403
                cmd.Stdin = bytes.NewReader(item.Data)
4✔
404
                return cmd.Run(), true
4✔
405
        }
406

407
        // Recipient type is FORWARD: we always use the remote courier, and pass
408
        // the list of servers that was given to us.
409
        if rcpt.Type == Recipient_FORWARD {
107✔
410
                deliverAttempts.Add("forward", 1)
6✔
411

6✔
412
                // When forwarding with an explicit list of servers, we use SRS if
6✔
413
                // we're sending from a non-local domain (regardless of the
6✔
414
                // destination).
6✔
415
                from := item.From
6✔
416
                if !envelope.DomainIn(item.From, q.localDomains) {
9✔
417
                        from = rewriteSender(item.From, rcpt.OriginalAddress)
3✔
418
                }
3✔
419
                return q.remoteC.Forward(from, rcpt.Address, item.Data, rcpt.Via)
6✔
420
        }
421

422
        // Recipient type is EMAIL.
423
        if envelope.DomainIn(rcpt.Address, q.localDomains) {
164✔
424
                deliverAttempts.Add("email:local", 1)
69✔
425
                return q.localC.Deliver(item.From, rcpt.Address, item.Data)
69✔
426
        }
69✔
427

428
        deliverAttempts.Add("email:remote", 1)
26✔
429
        from := item.From
26✔
430
        if !envelope.DomainIn(item.From, q.localDomains) {
31✔
431
                // We're sending from a non-local to a non-local, need to do SRS.
5✔
432
                from = rewriteSender(item.From, rcpt.OriginalAddress)
5✔
433
        }
5✔
434
        return q.remoteC.Deliver(from, rcpt.Address, item.Data)
26✔
435
}
436

437
func rewriteSender(from, originalAddr string) string {
8✔
438
        // Apply a send-only Sender Rewriting Scheme (SRS).
8✔
439
        // This is used when we are sending from a (potentially) non-local domain,
8✔
440
        // to a non-local domain.
8✔
441
        // This should happen only when there's an alias to forward email to a
8✔
442
        // non-local domain (either a normal "email" alias with a remote
8✔
443
        // destination, or a "forward" alias with a list of servers).
8✔
444
        // In this case, using the original From is problematic, as we may not be
8✔
445
        // an authorized sender for this.
8✔
446
        // To do this, we use a sender rewriting scheme, similar to what other
8✔
447
        // MTAs do (e.g. gmail or postfix).
8✔
448
        // Note this assumes "+" is an alias suffix separator.
8✔
449
        // We use the IDNA version of the domain if possible, because
8✔
450
        // we can't know if the other side will support SMTPUTF8.
8✔
451
        return fmt.Sprintf("%s+fwd_from=%s@%s",
8✔
452
                envelope.UserOf(originalAddr),
8✔
453
                strings.Replace(from, "@", "=", -1),
8✔
454
                mustIDNAToASCII(envelope.DomainOf(originalAddr)))
8✔
455
}
8✔
456

457
// countRcpt counts how many recipients are in the given status.
458
func (item *Item) countRcpt(statuses ...Recipient_Status) int {
179✔
459
        c := 0
179✔
460
        for _, rcpt := range item.Rcpt {
382✔
461
                for _, status := range statuses {
504✔
462
                        if rcpt.Status == status {
306✔
463
                                c++
5✔
464
                                break
5✔
465
                        }
466
                }
467
        }
468
        return c
179✔
469
}
470

471
func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
4✔
472
        tr.Debugf("sending DSN")
4✔
473

4✔
474
        // Pick a (local) domain to send the DSN from. We should always find one,
4✔
475
        // as otherwise we're relaying.
4✔
476
        domain := "unknown"
4✔
477
        if item.From != "<>" && envelope.DomainIn(item.From, q.localDomains) {
7✔
478
                domain = envelope.DomainOf(item.From)
3✔
479
        } else {
4✔
480
                for _, rcpt := range item.Rcpt {
2✔
481
                        if envelope.DomainIn(rcpt.OriginalAddress, q.localDomains) {
2✔
482
                                domain = envelope.DomainOf(rcpt.OriginalAddress)
1✔
483
                                break
1✔
484
                        }
485
                }
486
        }
487

488
        msg, err := deliveryStatusNotification(domain, item)
4✔
489
        if err != nil {
4✔
490
                tr.Errorf("failed to build DSN: %v", err)
×
491
                return
×
492
        }
×
493

494
        // TODO: DKIM signing.
495

496
        id, err := q.Put(tr, "<>", []string{item.From}, msg)
4✔
497
        if err != nil {
4✔
498
                tr.Errorf("failed to queue DSN: %v", err)
×
499
                return
×
500
        }
×
501

502
        tr.Printf("queued DSN: %s", id)
4✔
503
        dsnQueued.Add(1)
4✔
504
}
505

506
func nextDelay(createdAt time.Time) time.Duration {
50✔
507
        var delay time.Duration
50✔
508

50✔
509
        since := time.Since(createdAt)
50✔
510
        switch {
50✔
511
        case since < 1*time.Minute:
10✔
512
                delay = 1 * time.Minute
10✔
513
        case since < 5*time.Minute:
10✔
514
                delay = 5 * time.Minute
10✔
515
        case since < 10*time.Minute:
10✔
516
                delay = 10 * time.Minute
10✔
517
        default:
20✔
518
                delay = 20 * time.Minute
20✔
519
        }
520

521
        // Perturb the delay, to avoid all queued emails to be retried at the
522
        // exact same time after a restart.
523
        delay += rand.N(60 * time.Second)
50✔
524
        return delay
50✔
525
}
526

527
func mustIDNAToASCII(s string) string {
8✔
528
        a, err := idna.ToASCII(s)
8✔
529
        if err != nil {
8✔
530
                return a
×
531
        }
×
532
        return s
8✔
533
}
534

535
func timeFromProto(ts *Timestamp) time.Time {
2✔
536
        return time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
2✔
537
}
2✔
538

539
func timeToProto(t time.Time) *Timestamp {
196✔
540
        return &Timestamp{
196✔
541
                Seconds: t.Unix(),
196✔
542
                Nanos:   int32(t.Nanosecond()),
196✔
543
        }
196✔
544
}
196✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc