• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13513815732

24 Feb 2025 07:33PM UTC coverage: 68.025% (-17.4%) from 85.444%
13513815732

push

github

web-flow
[FIXED] Preserve max delivered messages with Interest retention (#6575)

Resolves https://github.com/nats-io/nats-server/issues/6538

If a consumer reached max deliveries for a message, it should preserve
the redelivered state and allow inspecting its content. However, if a
new consumer would be created and consume this message as well, it would
still be removed under Interest retention.

This PR fixes that by using the redelivered state to keep marking
there's interest.

Only downside is that the redelivered state gets cleaned up after a
restart (this PR does not change/fix that). So if the consumer that had
a max delivery message keeps acknowledging messages and its
acknowledgement floor moves up, it would clean up the redelivered state
below this ack floor.

Honestly I feel like keeping messages around if max delivery is reached
makes the code very complex. It would be a lot cleaner if we'd only have
the acknowledgement floor, starting sequence, and pending messages
in-between, not also redelivered state that can be below ack floor. It's
not something we can change now I suppose, but I'd be in favor of having
messages automatically be removed once max delivery is reached and all
consumers have consumed the message. DLQ-style behavior would then be
more explicitly (and reliably) handled by the client, for example by
publishing into another stream and then TERM the message, instead of
relying on advisories that could be missed.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

55031 of 80898 relevant lines covered (68.03%)

310123.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.87
/server/msgtrace.go
1
// Copyright 2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "encoding/json"
19
        "fmt"
20
        "math/rand"
21
        "strconv"
22
        "strings"
23
        "sync/atomic"
24
        "time"
25
)
26

27
const (
28
        MsgTraceDest          = "Nats-Trace-Dest"
29
        MsgTraceHop           = "Nats-Trace-Hop"
30
        MsgTraceOriginAccount = "Nats-Trace-Origin-Account"
31
        MsgTraceOnly          = "Nats-Trace-Only"
32

33
        // External trace header. Note that this header is normally in lower
34
        // case (https://www.w3.org/TR/trace-context/#header-name). Vendors
35
        // MUST expect the header in any case (upper, lower, mixed), and
36
        // SHOULD send the header name in lowercase.
37
        traceParentHdr = "traceparent"
38
)
39

40
type MsgTraceType string
41

42
// Type of message trace events in the MsgTraceEvents list.
43
// This is needed to unmarshal the list.
44
const (
45
        MsgTraceIngressType        = "in"
46
        MsgTraceSubjectMappingType = "sm"
47
        MsgTraceStreamExportType   = "se"
48
        MsgTraceServiceImportType  = "si"
49
        MsgTraceJetStreamType      = "js"
50
        MsgTraceEgressType         = "eg"
51
)
52

53
type MsgTraceEvent struct {
54
        Server  ServerInfo      `json:"server"`
55
        Request MsgTraceRequest `json:"request"`
56
        Hops    int             `json:"hops,omitempty"`
57
        Events  MsgTraceEvents  `json:"events"`
58
}
59

60
type MsgTraceRequest struct {
61
        // We are not making this an http.Header so that header name case is preserved.
62
        Header  map[string][]string `json:"header,omitempty"`
63
        MsgSize int                 `json:"msgsize,omitempty"`
64
}
65

66
type MsgTraceEvents []MsgTrace
67

68
type MsgTrace interface {
69
        new() MsgTrace
70
        typ() MsgTraceType
71
}
72

73
type MsgTraceBase struct {
74
        Type      MsgTraceType `json:"type"`
75
        Timestamp time.Time    `json:"ts"`
76
}
77

78
type MsgTraceIngress struct {
79
        MsgTraceBase
80
        Kind    int    `json:"kind"`
81
        CID     uint64 `json:"cid"`
82
        Name    string `json:"name,omitempty"`
83
        Account string `json:"acc"`
84
        Subject string `json:"subj"`
85
        Error   string `json:"error,omitempty"`
86
}
87

88
type MsgTraceSubjectMapping struct {
89
        MsgTraceBase
90
        MappedTo string `json:"to"`
91
}
92

93
type MsgTraceStreamExport struct {
94
        MsgTraceBase
95
        Account string `json:"acc"`
96
        To      string `json:"to"`
97
}
98

99
type MsgTraceServiceImport struct {
100
        MsgTraceBase
101
        Account string `json:"acc"`
102
        From    string `json:"from"`
103
        To      string `json:"to"`
104
}
105

106
type MsgTraceJetStream struct {
107
        MsgTraceBase
108
        Stream     string `json:"stream"`
109
        Subject    string `json:"subject,omitempty"`
110
        NoInterest bool   `json:"nointerest,omitempty"`
111
        Error      string `json:"error,omitempty"`
112
}
113

114
type MsgTraceEgress struct {
115
        MsgTraceBase
116
        Kind         int    `json:"kind"`
117
        CID          uint64 `json:"cid"`
118
        Name         string `json:"name,omitempty"`
119
        Hop          string `json:"hop,omitempty"`
120
        Account      string `json:"acc,omitempty"`
121
        Subscription string `json:"sub,omitempty"`
122
        Queue        string `json:"queue,omitempty"`
123
        Error        string `json:"error,omitempty"`
124

125
        // This is for applications that unmarshal the trace events
126
        // and want to link an egress to route/leaf/gateway with
127
        // the MsgTraceEvent from that server.
128
        Link *MsgTraceEvent `json:"-"`
129
}
130

131
// -------------------------------------------------------------
132

133
func (t MsgTraceBase) typ() MsgTraceType     { return t.Type }
×
134
func (MsgTraceIngress) new() MsgTrace        { return &MsgTraceIngress{} }
×
135
func (MsgTraceSubjectMapping) new() MsgTrace { return &MsgTraceSubjectMapping{} }
×
136
func (MsgTraceStreamExport) new() MsgTrace   { return &MsgTraceStreamExport{} }
×
137
func (MsgTraceServiceImport) new() MsgTrace  { return &MsgTraceServiceImport{} }
×
138
func (MsgTraceJetStream) new() MsgTrace      { return &MsgTraceJetStream{} }
×
139
func (MsgTraceEgress) new() MsgTrace         { return &MsgTraceEgress{} }
×
140

141
var msgTraceInterfaces = map[MsgTraceType]MsgTrace{
142
        MsgTraceIngressType:        MsgTraceIngress{},
143
        MsgTraceSubjectMappingType: MsgTraceSubjectMapping{},
144
        MsgTraceStreamExportType:   MsgTraceStreamExport{},
145
        MsgTraceServiceImportType:  MsgTraceServiceImport{},
146
        MsgTraceJetStreamType:      MsgTraceJetStream{},
147
        MsgTraceEgressType:         MsgTraceEgress{},
148
}
149

150
func (t *MsgTraceEvents) UnmarshalJSON(data []byte) error {
×
151
        var raw []json.RawMessage
×
152
        err := json.Unmarshal(data, &raw)
×
153
        if err != nil {
×
154
                return err
×
155
        }
×
156
        *t = make(MsgTraceEvents, len(raw))
×
157
        var tt MsgTraceBase
×
158
        for i, r := range raw {
×
159
                if err = json.Unmarshal(r, &tt); err != nil {
×
160
                        return err
×
161
                }
×
162
                tr, ok := msgTraceInterfaces[tt.Type]
×
163
                if !ok {
×
164
                        return fmt.Errorf("unknown trace type %v", tt.Type)
×
165
                }
×
166
                te := tr.new()
×
167
                if err := json.Unmarshal(r, te); err != nil {
×
168
                        return err
×
169
                }
×
170
                (*t)[i] = te
×
171
        }
172
        return nil
×
173
}
174

175
func getTraceAs[T MsgTrace](e any) *T {
×
176
        v, ok := e.(*T)
×
177
        if ok {
×
178
                return v
×
179
        }
×
180
        return nil
×
181
}
182

183
func (t *MsgTraceEvent) Ingress() *MsgTraceIngress {
×
184
        if len(t.Events) < 1 {
×
185
                return nil
×
186
        }
×
187
        return getTraceAs[MsgTraceIngress](t.Events[0])
×
188
}
189

190
func (t *MsgTraceEvent) SubjectMapping() *MsgTraceSubjectMapping {
×
191
        for _, e := range t.Events {
×
192
                if e.typ() == MsgTraceSubjectMappingType {
×
193
                        return getTraceAs[MsgTraceSubjectMapping](e)
×
194
                }
×
195
        }
196
        return nil
×
197
}
198

199
func (t *MsgTraceEvent) StreamExports() []*MsgTraceStreamExport {
×
200
        var se []*MsgTraceStreamExport
×
201
        for _, e := range t.Events {
×
202
                if e.typ() == MsgTraceStreamExportType {
×
203
                        se = append(se, getTraceAs[MsgTraceStreamExport](e))
×
204
                }
×
205
        }
206
        return se
×
207
}
208

209
func (t *MsgTraceEvent) ServiceImports() []*MsgTraceServiceImport {
×
210
        var si []*MsgTraceServiceImport
×
211
        for _, e := range t.Events {
×
212
                if e.typ() == MsgTraceServiceImportType {
×
213
                        si = append(si, getTraceAs[MsgTraceServiceImport](e))
×
214
                }
×
215
        }
216
        return si
×
217
}
218

219
func (t *MsgTraceEvent) JetStream() *MsgTraceJetStream {
×
220
        for _, e := range t.Events {
×
221
                if e.typ() == MsgTraceJetStreamType {
×
222
                        return getTraceAs[MsgTraceJetStream](e)
×
223
                }
×
224
        }
225
        return nil
×
226
}
227

228
func (t *MsgTraceEvent) Egresses() []*MsgTraceEgress {
×
229
        var eg []*MsgTraceEgress
×
230
        for _, e := range t.Events {
×
231
                if e.typ() == MsgTraceEgressType {
×
232
                        eg = append(eg, getTraceAs[MsgTraceEgress](e))
×
233
                }
×
234
        }
235
        return eg
×
236
}
237

238
const (
239
        errMsgTraceOnlyNoSupport   = "Not delivered because remote does not support message tracing"
240
        errMsgTraceNoSupport       = "Message delivered but remote does not support message tracing so no trace event generated from there"
241
        errMsgTraceNoEcho          = "Not delivered because of no echo"
242
        errMsgTracePubViolation    = "Not delivered because publish denied for this subject"
243
        errMsgTraceSubDeny         = "Not delivered because subscription denies this subject"
244
        errMsgTraceSubClosed       = "Not delivered because subscription is closed"
245
        errMsgTraceClientClosed    = "Not delivered because client is closed"
246
        errMsgTraceAutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"
247
        errMsgTraceFastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"
248
)
249

250
type msgTrace struct {
251
        ready int32
252
        srv   *Server
253
        acc   *Account
254
        // Origin account name, set only if acc is nil when acc lookup failed.
255
        oan   string
256
        dest  string
257
        event *MsgTraceEvent
258
        js    *MsgTraceJetStream
259
        hop   string
260
        nhop  string
261
        tonly bool // Will only trace the message, not do delivery.
262
        ct    compressionType
263
}
264

265
// This will be false outside of the tests, so when building the server binary,
266
// any code where you see `if msgTraceRunInTests` statement will be compiled
267
// out, so this will have no performance penalty.
268
var (
269
        msgTraceRunInTests   bool
270
        msgTraceCheckSupport bool
271
)
272

273
// Returns the message trace object, if message is being traced,
274
// and `true` if we want to only trace, not actually deliver the message.
275
func (c *client) isMsgTraceEnabled() (*msgTrace, bool) {
35,231,003✔
276
        t := c.pa.trace
35,231,003✔
277
        if t == nil {
70,462,006✔
278
                return nil, false
35,231,003✔
279
        }
35,231,003✔
280
        return t, t.tonly
×
281
}
282

283
// For LEAF/ROUTER/GATEWAY, return false if the remote does not support
284
// message tracing (important if the tracing requests trace-only).
285
func (c *client) msgTraceSupport() bool {
×
286
        // Exclude client connection from the protocol check.
×
287
        return c.kind == CLIENT || c.opts.Protocol >= MsgTraceProto
×
288
}
×
289

290
func getConnName(c *client) string {
×
291
        switch c.kind {
×
292
        case ROUTER:
×
293
                if n := c.route.remoteName; n != _EMPTY_ {
×
294
                        return n
×
295
                }
×
296
        case GATEWAY:
×
297
                if n := c.gw.remoteName; n != _EMPTY_ {
×
298
                        return n
×
299
                }
×
300
        case LEAF:
×
301
                if n := c.leaf.remoteServer; n != _EMPTY_ {
×
302
                        return n
×
303
                }
×
304
        }
305
        return c.opts.Name
×
306
}
307

308
func getCompressionType(cts string) compressionType {
9✔
309
        if cts == _EMPTY_ {
18✔
310
                return noCompression
9✔
311
        }
9✔
312
        cts = strings.ToLower(cts)
×
313
        if strings.Contains(cts, "snappy") || strings.Contains(cts, "s2") {
×
314
                return snappyCompression
×
315
        }
×
316
        if strings.Contains(cts, "gzip") {
×
317
                return gzipCompression
×
318
        }
×
319
        return unsupportedCompression
×
320
}
321

322
func (c *client) initMsgTrace() *msgTrace {
616,944✔
323
        // The code in the "if" statement is only running in test mode.
616,944✔
324
        if msgTraceRunInTests {
1,232,026✔
325
                // Check the type of client that tries to initialize a trace struct.
615,082✔
326
                if !(c.kind == CLIENT || c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
615,082✔
327
                        panic(fmt.Sprintf("Unexpected client type %q trying to initialize msgTrace", c.kindString()))
×
328
                }
329
                // In some tests, we want to make a server behave like an old server
330
                // and so even if a trace header is received, we want the server to
331
                // simply ignore it.
332
                if msgTraceCheckSupport {
615,082✔
333
                        if c.srv == nil || c.srv.getServerProto() < MsgTraceProto {
×
334
                                return nil
×
335
                        }
×
336
                }
337
        }
338
        if c.pa.hdr <= 0 {
616,944✔
339
                return nil
×
340
        }
×
341
        hdr := c.msgBuf[:c.pa.hdr]
616,944✔
342
        headers, external := genHeaderMapIfTraceHeadersPresent(hdr)
616,944✔
343
        if len(headers) == 0 {
1,233,879✔
344
                return nil
616,935✔
345
        }
616,935✔
346
        // Little helper to give us the first value of a given header, or _EMPTY_
347
        // if key is not present.
348
        getHdrVal := func(key string) string {
18✔
349
                vv, ok := headers[key]
9✔
350
                if !ok {
18✔
351
                        return _EMPTY_
9✔
352
                }
9✔
353
                return vv[0]
×
354
        }
355
        ct := getCompressionType(getHdrVal(acceptEncodingHeader))
9✔
356
        var (
9✔
357
                dest      string
9✔
358
                traceOnly bool
9✔
359
        )
9✔
360
        // Check for traceOnly only if not external.
9✔
361
        if !external {
9✔
362
                if to := getHdrVal(MsgTraceOnly); to != _EMPTY_ {
×
363
                        tos := strings.ToLower(to)
×
364
                        switch tos {
×
365
                        case "1", "true", "on":
×
366
                                traceOnly = true
×
367
                        }
368
                }
369
                dest = getHdrVal(MsgTraceDest)
×
370
                // Check the destination to see if this is a valid public subject.
×
371
                if !IsValidPublishSubject(dest) {
×
372
                        // We still have to return a msgTrace object (if traceOnly is set)
×
373
                        // because if we don't, the message will end-up being delivered to
×
374
                        // applications, which may break them. We report the error in any case.
×
375
                        c.Errorf("Destination %q is not valid, won't be able to trace events", dest)
×
376
                        if !traceOnly {
×
377
                                // We can bail, tracing will be disabled for this message.
×
378
                                return nil
×
379
                        }
×
380
                }
381
        }
382
        var (
9✔
383
                // Account to use when sending the trace event
9✔
384
                acc *Account
9✔
385
                // Ingress' account name
9✔
386
                ian string
9✔
387
                // Origin account name
9✔
388
                oan string
9✔
389
                // The hop "id", taken from headers only when not from CLIENT
9✔
390
                hop string
9✔
391
        )
9✔
392
        if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
9✔
393
                // The ingress account name will always be c.pa.account, but `acc` may
×
394
                // be different if we have an origin account header.
×
395
                if c.kind == LEAF {
×
396
                        ian = c.acc.GetName()
×
397
                } else {
×
398
                        ian = string(c.pa.account)
×
399
                }
×
400
                // The remote will have set the origin account header only if the
401
                // message changed account (think of service imports).
402
                oan = getHdrVal(MsgTraceOriginAccount)
×
403
                if oan == _EMPTY_ {
×
404
                        // For LEAF or ROUTER with pinned-account, we can use the c.acc.
×
405
                        if c.kind == LEAF || (c.kind == ROUTER && len(c.route.accName) > 0) {
×
406
                                acc = c.acc
×
407
                        } else {
×
408
                                // We will lookup account with c.pa.account (or ian).
×
409
                                oan = ian
×
410
                        }
×
411
                }
412
                // Unless we already got the account, we need to look it up.
413
                if acc == nil {
×
414
                        // We don't want to do account resolving here.
×
415
                        if acci, ok := c.srv.accounts.Load(oan); ok {
×
416
                                acc = acci.(*Account)
×
417
                                // Since we have looked-up the account, we don't need oan, so
×
418
                                // clear it in case it was set.
×
419
                                oan = _EMPTY_
×
420
                        } else {
×
421
                                // We still have to return a msgTrace object (if traceOnly is set)
×
422
                                // because if we don't, the message will end-up being delivered to
×
423
                                // applications, which may break them. We report the error in any case.
×
424
                                c.Errorf("Account %q was not found, won't be able to trace events", oan)
×
425
                                if !traceOnly {
×
426
                                        // We can bail, tracing will be disabled for this message.
×
427
                                        return nil
×
428
                                }
×
429
                        }
430
                }
431
                // Check the hop header
432
                hop = getHdrVal(MsgTraceHop)
×
433
        } else {
9✔
434
                acc = c.acc
9✔
435
                ian = acc.GetName()
9✔
436
        }
9✔
437
        // If external, we need to have the account's trace destination set,
438
        // otherwise, we are not enabling tracing.
439
        if external {
18✔
440
                var sampling int
9✔
441
                if acc != nil {
18✔
442
                        dest, sampling = acc.getTraceDestAndSampling()
9✔
443
                }
9✔
444
                if dest == _EMPTY_ {
18✔
445
                        // No account destination, no tracing for external trace headers.
9✔
446
                        return nil
9✔
447
                }
9✔
448
                // Check sampling, but only from origin server.
449
                if c.kind == CLIENT && !sample(sampling) {
×
450
                        // Need to desactivate the traceParentHdr so that if the message
×
451
                        // is routed, it does possibly trigger a trace there.
×
452
                        disableTraceHeaders(c, hdr)
×
453
                        return nil
×
454
                }
×
455
        }
456
        c.pa.trace = &msgTrace{
×
457
                srv:  c.srv,
×
458
                acc:  acc,
×
459
                oan:  oan,
×
460
                dest: dest,
×
461
                ct:   ct,
×
462
                hop:  hop,
×
463
                event: &MsgTraceEvent{
×
464
                        Request: MsgTraceRequest{
×
465
                                Header:  headers,
×
466
                                MsgSize: c.pa.size,
×
467
                        },
×
468
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
469
                                MsgTraceBase: MsgTraceBase{
×
470
                                        Type:      MsgTraceIngressType,
×
471
                                        Timestamp: time.Now(),
×
472
                                },
×
473
                                Kind:    c.kind,
×
474
                                CID:     c.cid,
×
475
                                Name:    getConnName(c),
×
476
                                Account: ian,
×
477
                                Subject: string(c.pa.subject),
×
478
                        }),
×
479
                },
×
480
                tonly: traceOnly,
×
481
        }
×
482
        return c.pa.trace
×
483
}
484

485
func sample(sampling int) bool {
×
486
        // Option parsing should ensure that sampling is [1..100], but consider
×
487
        // any value outside of this range to be 100%.
×
488
        if sampling <= 0 || sampling >= 100 {
×
489
                return true
×
490
        }
×
491
        return rand.Int31n(100) <= int32(sampling)
×
492
}
493

494
// This function will return the header as a map (instead of http.Header because
495
// we want to preserve the header names' case) and a boolean that indicates if
496
// the headers have been lifted due to the presence of the external trace header
497
// only.
498
// Note that because of the traceParentHdr, the search is done in a case
499
// insensitive way, but if the header is found, it is rewritten in lower case
500
// as suggested by the spec, but also to make it easier to disable the header
501
// when needed.
502
func genHeaderMapIfTraceHeadersPresent(hdr []byte) (map[string][]string, bool) {
616,944✔
503

616,944✔
504
        var (
616,944✔
505
                _keys               = [64][]byte{}
616,944✔
506
                _vals               = [64][]byte{}
616,944✔
507
                m                   map[string][]string
616,944✔
508
                traceDestHdrFound   bool
616,944✔
509
                traceParentHdrFound bool
616,944✔
510
        )
616,944✔
511
        // Skip the hdrLine
616,944✔
512
        if !bytes.HasPrefix(hdr, stringToBytes(hdrLine)) {
628,174✔
513
                return nil, false
11,230✔
514
        }
11,230✔
515

516
        traceDestHdrAsBytes := stringToBytes(MsgTraceDest)
605,714✔
517
        traceParentHdrAsBytes := stringToBytes(traceParentHdr)
605,714✔
518
        crLFAsBytes := stringToBytes(CR_LF)
605,714✔
519
        dashAsBytes := stringToBytes("-")
605,714✔
520

605,714✔
521
        keys := _keys[:0]
605,714✔
522
        vals := _vals[:0]
605,714✔
523

605,714✔
524
        for i := len(hdrLine); i < len(hdr); {
1,817,595✔
525
                // Search for key/val delimiter
1,211,881✔
526
                del := bytes.IndexByte(hdr[i:], ':')
1,211,881✔
527
                if del < 0 {
1,817,595✔
528
                        break
605,714✔
529
                }
530
                keyStart := i
606,167✔
531
                key := hdr[keyStart : keyStart+del]
606,167✔
532
                i += del + 1
606,167✔
533
                valStart := i
606,167✔
534
                nl := bytes.Index(hdr[valStart:], crLFAsBytes)
606,167✔
535
                if nl < 0 {
606,167✔
536
                        break
×
537
                }
538
                if len(key) > 0 {
1,212,334✔
539
                        val := bytes.Trim(hdr[valStart:valStart+nl], " \t")
606,167✔
540
                        vals = append(vals, val)
606,167✔
541

606,167✔
542
                        // Check for the external trace header.
606,167✔
543
                        if bytes.EqualFold(key, traceParentHdrAsBytes) {
606,176✔
544
                                // Rewrite the header using lower case if needed.
9✔
545
                                if !bytes.Equal(key, traceParentHdrAsBytes) {
18✔
546
                                        copy(hdr[keyStart:], traceParentHdrAsBytes)
9✔
547
                                }
9✔
548
                                // We will now check if the value has sampling or not.
549
                                // TODO(ik): Not sure if this header can have multiple values
550
                                // or not, and if so, what would be the rule to check for
551
                                // sampling. What is done here is to check them all until we
552
                                // found one with sampling.
553
                                if !traceParentHdrFound {
18✔
554
                                        tk := bytes.Split(val, dashAsBytes)
9✔
555
                                        if len(tk) == 4 && len([]byte(tk[3])) == 2 {
18✔
556
                                                if hexVal, err := strconv.ParseInt(bytesToString(tk[3]), 16, 8); err == nil {
18✔
557
                                                        if hexVal&0x1 == 0x1 {
18✔
558
                                                                traceParentHdrFound = true
9✔
559
                                                        }
9✔
560
                                                }
561
                                        }
562
                                }
563
                                // Add to the keys with the external trace header in lower case.
564
                                keys = append(keys, traceParentHdrAsBytes)
9✔
565
                        } else {
606,158✔
566
                                // Is the key the Nats-Trace-Dest header?
606,158✔
567
                                if bytes.EqualFold(key, traceDestHdrAsBytes) {
606,158✔
568
                                        traceDestHdrFound = true
×
569
                                }
×
570
                                // Add to the keys and preserve the key's case
571
                                keys = append(keys, key)
606,158✔
572
                        }
573
                }
574
                i += nl + 2
606,167✔
575
        }
576
        if !traceDestHdrFound && !traceParentHdrFound {
1,211,419✔
577
                return nil, false
605,705✔
578
        }
605,705✔
579
        m = make(map[string][]string, len(keys))
9✔
580
        for i, k := range keys {
30✔
581
                hname := string(k)
21✔
582
                m[hname] = append(m[hname], string(vals[i]))
21✔
583
        }
21✔
584
        return m, !traceDestHdrFound && traceParentHdrFound
9✔
585
}
586

587
// Special case where we create a trace event before parsing the message.
588
// This is for cases where the connection will be closed when detecting
589
// an error during early message processing (for instance max payload).
590
func (c *client) initAndSendIngressErrEvent(hdr []byte, dest string, ingressError error) {
×
591
        if ingressError == nil {
×
592
                return
×
593
        }
×
594
        ct := getAcceptEncoding(hdr)
×
595
        t := &msgTrace{
×
596
                srv:  c.srv,
×
597
                acc:  c.acc,
×
598
                dest: dest,
×
599
                ct:   ct,
×
600
                event: &MsgTraceEvent{
×
601
                        Request: MsgTraceRequest{MsgSize: c.pa.size},
×
602
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
603
                                MsgTraceBase: MsgTraceBase{
×
604
                                        Type:      MsgTraceIngressType,
×
605
                                        Timestamp: time.Now(),
×
606
                                },
×
607
                                Kind:  c.kind,
×
608
                                CID:   c.cid,
×
609
                                Name:  getConnName(c),
×
610
                                Error: ingressError.Error(),
×
611
                        }),
×
612
                },
×
613
        }
×
614
        t.sendEvent()
×
615
}
616

617
// Returns `true` if message tracing is enabled and we are tracing only,
618
// that is, we are not going to deliver the inbound message, returns
619
// `false` otherwise (no tracing, or tracing and message delivery).
620
func (t *msgTrace) traceOnly() bool {
3,443,815✔
621
        return t != nil && t.tonly
3,443,815✔
622
}
3,443,815✔
623

624
func (t *msgTrace) setOriginAccountHeaderIfNeeded(c *client, acc *Account, msg []byte) []byte {
×
625
        var oan string
×
626
        // If t.acc is set, only check that, not t.oan.
×
627
        if t.acc != nil {
×
628
                if t.acc != acc {
×
629
                        oan = t.acc.GetName()
×
630
                }
×
631
        } else if t.oan != acc.GetName() {
×
632
                oan = t.oan
×
633
        }
×
634
        if oan != _EMPTY_ {
×
635
                msg = c.setHeader(MsgTraceOriginAccount, oan, msg)
×
636
        }
×
637
        return msg
×
638
}
639

640
func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
×
641
        e := t.event
×
642
        e.Hops++
×
643
        if len(t.hop) > 0 {
×
644
                t.nhop = fmt.Sprintf("%s.%d", t.hop, e.Hops)
×
645
        } else {
×
646
                t.nhop = fmt.Sprintf("%d", e.Hops)
×
647
        }
×
648
        return c.setHeader(MsgTraceHop, t.nhop, msg)
×
649
}
650

651
// Will look for the MsgTraceSendTo and traceParentHdr headers and change the first
652
// character to an 'X' so that if this message is sent to a remote, the remote
653
// will not initialize tracing since it won't find the actual trace headers.
654
// The function returns the position of the headers so it can efficiently be
655
// re-enabled by calling enableTraceHeaders.
656
// Note that if `msg` can be either the header alone or the full message
657
// (header and payload). This function will use c.pa.hdr to limit the
658
// search to the header section alone.
659
func disableTraceHeaders(c *client, msg []byte) []int {
×
660
        // Code largely copied from getHeader(), except that we don't need the value
×
661
        if c.pa.hdr <= 0 {
×
662
                return []int{-1, -1}
×
663
        }
×
664
        hdr := msg[:c.pa.hdr]
×
665
        headers := [2]string{MsgTraceDest, traceParentHdr}
×
666
        positions := [2]int{-1, -1}
×
667
        for i := 0; i < 2; i++ {
×
668
                key := stringToBytes(headers[i])
×
669
                pos := bytes.Index(hdr, key)
×
670
                if pos < 0 {
×
671
                        continue
×
672
                }
673
                // Make sure this key does not have additional prefix.
674
                if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' {
×
675
                        continue
×
676
                }
677
                index := pos + len(key)
×
678
                if index >= len(hdr) {
×
679
                        continue
×
680
                }
681
                if hdr[index] != ':' {
×
682
                        continue
×
683
                }
684
                // Disable the trace by altering the first character of the header
685
                hdr[pos] = 'X'
×
686
                positions[i] = pos
×
687
        }
688
        // Return the positions of those characters so we can re-enable the headers.
689
        return positions[:2]
×
690
}
691

692
// Changes back the character at the given position `pos` in the `msg`
693
// byte slice to the first character of the MsgTraceSendTo header.
694
func enableTraceHeaders(msg []byte, positions []int) {
×
695
        firstChar := [2]byte{MsgTraceDest[0], traceParentHdr[0]}
×
696
        for i, pos := range positions {
×
697
                if pos == -1 {
×
698
                        continue
×
699
                }
700
                msg[pos] = firstChar[i]
×
701
        }
702
}
703

704
func (t *msgTrace) setIngressError(err string) {
×
705
        if i := t.event.Ingress(); i != nil {
×
706
                i.Error = err
×
707
        }
×
708
}
709

710
func (t *msgTrace) addSubjectMappingEvent(subj []byte) {
10,292✔
711
        if t == nil {
20,584✔
712
                return
10,292✔
713
        }
10,292✔
714
        t.event.Events = append(t.event.Events, &MsgTraceSubjectMapping{
×
715
                MsgTraceBase: MsgTraceBase{
×
716
                        Type:      MsgTraceSubjectMappingType,
×
717
                        Timestamp: time.Now(),
×
718
                },
×
719
                MappedTo: string(subj),
×
720
        })
×
721
}
722

723
func (t *msgTrace) addEgressEvent(dc *client, sub *subscription, err string) {
1,307,613✔
724
        if t == nil {
2,615,226✔
725
                return
1,307,613✔
726
        }
1,307,613✔
727
        e := &MsgTraceEgress{
×
728
                MsgTraceBase: MsgTraceBase{
×
729
                        Type:      MsgTraceEgressType,
×
730
                        Timestamp: time.Now(),
×
731
                },
×
732
                Kind:  dc.kind,
×
733
                CID:   dc.cid,
×
734
                Name:  getConnName(dc),
×
735
                Hop:   t.nhop,
×
736
                Error: err,
×
737
        }
×
738
        t.nhop = _EMPTY_
×
739
        // Specific to CLIENT connections...
×
740
        if dc.kind == CLIENT {
×
741
                // Set the subscription's subject and possibly queue name.
×
742
                e.Subscription = string(sub.subject)
×
743
                if len(sub.queue) > 0 {
×
744
                        e.Queue = string(sub.queue)
×
745
                }
×
746
        }
747
        if dc.kind == CLIENT || dc.kind == LEAF {
×
748
                if i := t.event.Ingress(); i != nil {
×
749
                        // If the Ingress' account is different from the destination's
×
750
                        // account, add the account name into the Egress trace event.
×
751
                        // This would happen with service imports.
×
752
                        if dcAccName := dc.acc.GetName(); dcAccName != i.Account {
×
753
                                e.Account = dcAccName
×
754
                        }
×
755
                }
756
        }
757
        t.event.Events = append(t.event.Events, e)
×
758
}
759

760
func (t *msgTrace) addStreamExportEvent(dc *client, to []byte) {
×
761
        if t == nil {
×
762
                return
×
763
        }
×
764
        dc.mu.Lock()
×
765
        accName := dc.acc.GetName()
×
766
        dc.mu.Unlock()
×
767
        t.event.Events = append(t.event.Events, &MsgTraceStreamExport{
×
768
                MsgTraceBase: MsgTraceBase{
×
769
                        Type:      MsgTraceStreamExportType,
×
770
                        Timestamp: time.Now(),
×
771
                },
×
772
                Account: accName,
×
773
                To:      string(to),
×
774
        })
×
775
}
776

777
func (t *msgTrace) addServiceImportEvent(accName, from, to string) {
×
778
        if t == nil {
×
779
                return
×
780
        }
×
781
        t.event.Events = append(t.event.Events, &MsgTraceServiceImport{
×
782
                MsgTraceBase: MsgTraceBase{
×
783
                        Type:      MsgTraceServiceImportType,
×
784
                        Timestamp: time.Now(),
×
785
                },
×
786
                Account: accName,
×
787
                From:    from,
×
788
                To:      to,
×
789
        })
×
790
}
791

792
func (t *msgTrace) addJetStreamEvent(streamName string) {
×
793
        if t == nil {
×
794
                return
×
795
        }
×
796
        t.js = &MsgTraceJetStream{
×
797
                MsgTraceBase: MsgTraceBase{
×
798
                        Type:      MsgTraceJetStreamType,
×
799
                        Timestamp: time.Now(),
×
800
                },
×
801
                Stream: streamName,
×
802
        }
×
803
        t.event.Events = append(t.event.Events, t.js)
×
804
}
805

806
func (t *msgTrace) updateJetStreamEvent(subject string, noInterest bool) {
2,528,802✔
807
        if t == nil {
5,057,604✔
808
                return
2,528,802✔
809
        }
2,528,802✔
810
        // JetStream event should have been created in addJetStreamEvent
811
        if t.js == nil {
×
812
                return
×
813
        }
×
814
        t.js.Subject = subject
×
815
        t.js.NoInterest = noInterest
×
816
        // Update the timestamp since this is more accurate than when it
×
817
        // was first added in addJetStreamEvent().
×
818
        t.js.Timestamp = time.Now()
×
819
}
820

821
func (t *msgTrace) sendEventFromJetStream(err error) {
×
822
        if t == nil {
×
823
                return
×
824
        }
×
825
        // JetStream event should have been created in addJetStreamEvent
826
        if t.js == nil {
×
827
                return
×
828
        }
×
829
        if err != nil {
×
830
                t.js.Error = err.Error()
×
831
        }
×
832
        t.sendEvent()
×
833
}
834

835
func (t *msgTrace) sendEvent() {
16,421,897✔
836
        if t == nil {
32,843,794✔
837
                return
16,421,897✔
838
        }
16,421,897✔
839
        if t.js != nil {
×
840
                ready := atomic.AddInt32(&t.ready, 1) == 2
×
841
                if !ready {
×
842
                        return
×
843
                }
×
844
        }
845
        t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
×
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc