• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 23526005847

24 Mar 2026 12:20PM UTC coverage: 83.09% (+1.3%) from 81.839%
23526005847

push

github

neilalexander
Remove FIXME about auth callout nonce

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

75563 of 90941 relevant lines covered (83.09%)

368595.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.64
/server/msgtrace.go
1
// Copyright 2024-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "encoding/json"
19
        "fmt"
20
        "math/rand"
21
        "strconv"
22
        "strings"
23
        "sync/atomic"
24
        "time"
25
)
26

27
const (
28
        MsgTraceDest          = "Nats-Trace-Dest"
29
        MsgTraceDestDisabled  = "trace disabled" // This must be an invalid NATS subject
30
        MsgTraceHop           = "Nats-Trace-Hop"
31
        MsgTraceOriginAccount = "Nats-Trace-Origin-Account"
32
        MsgTraceOnly          = "Nats-Trace-Only"
33

34
        // External trace header. Note that this header is normally in lower
35
        // case (https://www.w3.org/TR/trace-context/#header-name). Vendors
36
        // MUST expect the header in any case (upper, lower, mixed), and
37
        // SHOULD send the header name in lowercase. We used to change it
38
        // to lower case, but no longer do that in 2.14.
39
        traceParentHdr = "traceparent"
40
)
41

42
var (
43
        traceDestHdrAsBytes      = stringToBytes(MsgTraceDest)
44
        traceDestDisabledAsBytes = stringToBytes(MsgTraceDestDisabled)
45
        traceParentHdrAsBytes    = stringToBytes(traceParentHdr)
46
        crLFAsBytes              = stringToBytes(CR_LF)
47
        dashAsBytes              = stringToBytes("-")
48
)
49

50
type MsgTraceType string
51

52
// Type of message trace events in the MsgTraceEvents list.
53
// This is needed to unmarshal the list.
54
const (
55
        MsgTraceIngressType        = "in"
56
        MsgTraceSubjectMappingType = "sm"
57
        MsgTraceStreamExportType   = "se"
58
        MsgTraceServiceImportType  = "si"
59
        MsgTraceJetStreamType      = "js"
60
        MsgTraceEgressType         = "eg"
61
)
62

63
type MsgTraceEvent struct {
64
        Server  ServerInfo      `json:"server"`
65
        Request MsgTraceRequest `json:"request"`
66
        Hops    int             `json:"hops,omitempty"`
67
        Events  MsgTraceEvents  `json:"events"`
68
}
69

70
type MsgTraceRequest struct {
71
        // We are not making this an http.Header so that header name case is preserved.
72
        Header  map[string][]string `json:"header,omitempty"`
73
        MsgSize int                 `json:"msgsize,omitempty"`
74
}
75

76
type MsgTraceEvents []MsgTrace
77

78
type MsgTrace interface {
79
        new() MsgTrace
80
        typ() MsgTraceType
81
}
82

83
type MsgTraceBase struct {
84
        Type      MsgTraceType `json:"type"`
85
        Timestamp time.Time    `json:"ts"`
86
}
87

88
type MsgTraceIngress struct {
89
        MsgTraceBase
90
        Kind    int    `json:"kind"`
91
        CID     uint64 `json:"cid"`
92
        Name    string `json:"name,omitempty"`
93
        Account string `json:"acc"`
94
        Subject string `json:"subj"`
95
        Error   string `json:"error,omitempty"`
96
}
97

98
type MsgTraceSubjectMapping struct {
99
        MsgTraceBase
100
        MappedTo string `json:"to"`
101
}
102

103
type MsgTraceStreamExport struct {
104
        MsgTraceBase
105
        Account string `json:"acc"`
106
        To      string `json:"to"`
107
}
108

109
type MsgTraceServiceImport struct {
110
        MsgTraceBase
111
        Account string `json:"acc"`
112
        From    string `json:"from"`
113
        To      string `json:"to"`
114
}
115

116
type MsgTraceJetStream struct {
117
        MsgTraceBase
118
        Stream     string `json:"stream"`
119
        Subject    string `json:"subject,omitempty"`
120
        NoInterest bool   `json:"nointerest,omitempty"`
121
        Error      string `json:"error,omitempty"`
122
}
123

124
type MsgTraceEgress struct {
125
        MsgTraceBase
126
        Kind         int    `json:"kind"`
127
        CID          uint64 `json:"cid"`
128
        Name         string `json:"name,omitempty"`
129
        Hop          string `json:"hop,omitempty"`
130
        Account      string `json:"acc,omitempty"`
131
        Subscription string `json:"sub,omitempty"`
132
        Queue        string `json:"queue,omitempty"`
133
        Error        string `json:"error,omitempty"`
134

135
        // This is for applications that unmarshal the trace events
136
        // and want to link an egress to route/leaf/gateway with
137
        // the MsgTraceEvent from that server.
138
        Link *MsgTraceEvent `json:"-"`
139
}
140

141
// -------------------------------------------------------------
142

143
func (t MsgTraceBase) typ() MsgTraceType     { return t.Type }
1,028✔
144
func (MsgTraceIngress) new() MsgTrace        { return &MsgTraceIngress{} }
2,931✔
145
func (MsgTraceSubjectMapping) new() MsgTrace { return &MsgTraceSubjectMapping{} }
14✔
146
func (MsgTraceStreamExport) new() MsgTrace   { return &MsgTraceStreamExport{} }
31✔
147
func (MsgTraceServiceImport) new() MsgTrace  { return &MsgTraceServiceImport{} }
35✔
148
func (MsgTraceJetStream) new() MsgTrace      { return &MsgTraceJetStream{} }
47✔
149
func (MsgTraceEgress) new() MsgTrace         { return &MsgTraceEgress{} }
2,004✔
150

151
var msgTraceInterfaces = map[MsgTraceType]MsgTrace{
152
        MsgTraceIngressType:        MsgTraceIngress{},
153
        MsgTraceSubjectMappingType: MsgTraceSubjectMapping{},
154
        MsgTraceStreamExportType:   MsgTraceStreamExport{},
155
        MsgTraceServiceImportType:  MsgTraceServiceImport{},
156
        MsgTraceJetStreamType:      MsgTraceJetStream{},
157
        MsgTraceEgressType:         MsgTraceEgress{},
158
}
159

160
func (t *MsgTraceEvents) UnmarshalJSON(data []byte) error {
2,931✔
161
        var raw []json.RawMessage
2,931✔
162
        err := json.Unmarshal(data, &raw)
2,931✔
163
        if err != nil {
2,931✔
164
                return err
×
165
        }
×
166
        *t = make(MsgTraceEvents, len(raw))
2,931✔
167
        var tt MsgTraceBase
2,931✔
168
        for i, r := range raw {
7,993✔
169
                if err = json.Unmarshal(r, &tt); err != nil {
5,062✔
170
                        return err
×
171
                }
×
172
                tr, ok := msgTraceInterfaces[tt.Type]
5,062✔
173
                if !ok {
5,062✔
174
                        return fmt.Errorf("unknown trace type %v", tt.Type)
×
175
                }
×
176
                te := tr.new()
5,062✔
177
                if err := json.Unmarshal(r, te); err != nil {
5,062✔
178
                        return err
×
179
                }
×
180
                (*t)[i] = te
5,062✔
181
        }
182
        return nil
2,931✔
183
}
184

185
func getTraceAs[T MsgTrace](e any) *T {
3,373✔
186
        v, ok := e.(*T)
3,373✔
187
        if ok {
6,746✔
188
                return v
3,373✔
189
        }
3,373✔
190
        return nil
×
191
}
192

193
func (t *MsgTraceEvent) Ingress() *MsgTraceIngress {
3,041✔
194
        if len(t.Events) < 1 {
3,041✔
195
                return nil
×
196
        }
×
197
        return getTraceAs[MsgTraceIngress](t.Events[0])
3,041✔
198
}
199

200
func (t *MsgTraceEvent) SubjectMapping() *MsgTraceSubjectMapping {
49✔
201
        for _, e := range t.Events {
176✔
202
                if e.typ() == MsgTraceSubjectMappingType {
141✔
203
                        return getTraceAs[MsgTraceSubjectMapping](e)
14✔
204
                }
14✔
205
        }
206
        return nil
35✔
207
}
208

209
func (t *MsgTraceEvent) StreamExports() []*MsgTraceStreamExport {
30✔
210
        var se []*MsgTraceStreamExport
30✔
211
        for _, e := range t.Events {
120✔
212
                if e.typ() == MsgTraceStreamExportType {
121✔
213
                        se = append(se, getTraceAs[MsgTraceStreamExport](e))
31✔
214
                }
31✔
215
        }
216
        return se
30✔
217
}
218

219
func (t *MsgTraceEvent) ServiceImports() []*MsgTraceServiceImport {
46✔
220
        var si []*MsgTraceServiceImport
46✔
221
        for _, e := range t.Events {
212✔
222
                if e.typ() == MsgTraceServiceImportType {
201✔
223
                        si = append(si, getTraceAs[MsgTraceServiceImport](e))
35✔
224
                }
35✔
225
        }
226
        return si
46✔
227
}
228

229
func (t *MsgTraceEvent) JetStream() *MsgTraceJetStream {
84✔
230
        for _, e := range t.Events {
251✔
231
                if e.typ() == MsgTraceJetStreamType {
250✔
232
                        return getTraceAs[MsgTraceJetStream](e)
83✔
233
                }
83✔
234
        }
235
        return nil
1✔
236
}
237

238
func (t *MsgTraceEvent) Egresses() []*MsgTraceEgress {
182✔
239
        var eg []*MsgTraceEgress
182✔
240
        for _, e := range t.Events {
660✔
241
                if e.typ() == MsgTraceEgressType {
647✔
242
                        eg = append(eg, getTraceAs[MsgTraceEgress](e))
169✔
243
                }
169✔
244
        }
245
        return eg
182✔
246
}
247

248
const (
249
        errMsgTraceOnlyNoSupport   = "Not delivered because remote does not support message tracing"
250
        errMsgTraceNoSupport       = "Message delivered but remote does not support message tracing so no trace event generated from there"
251
        errMsgTraceNoEcho          = "Not delivered because of no echo"
252
        errMsgTracePubViolation    = "Not delivered because publish denied for this subject"
253
        errMsgTraceSubDeny         = "Not delivered because subscription denies this subject"
254
        errMsgTraceSubClosed       = "Not delivered because subscription is closed"
255
        errMsgTraceClientClosed    = "Not delivered because client is closed"
256
        errMsgTraceAutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"
257
        errMsgTraceFastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"
258
)
259

260
type msgTrace struct {
261
        ready int32
262
        srv   *Server
263
        acc   *Account
264
        // Origin account name, set only if acc is nil when acc lookup failed.
265
        oan   string
266
        dest  string
267
        event *MsgTraceEvent
268
        js    *MsgTraceJetStream
269
        hop   string
270
        nhop  string
271
        tonly bool // Will only trace the message, not do delivery.
272
        ct    compressionType
273
}
274

275
// This will be false outside of the tests, so when building the server binary,
276
// any code where you see `if msgTraceRunInTests` statement will be compiled
277
// out, so this will have no performance penalty.
278
var (
279
        msgTraceRunInTests   bool
280
        msgTraceCheckSupport bool
281
)
282

283
// Returns the message trace object, if message is being traced,
284
// and `true` if we want to only trace, not actually deliver the message.
285
func (c *client) isMsgTraceEnabled() (*msgTrace, bool) {
38,839,723✔
286
        t := c.pa.trace
38,839,723✔
287
        if t == nil {
77,675,204✔
288
                return nil, false
38,835,481✔
289
        }
38,835,481✔
290
        return t, t.tonly
4,242✔
291
}
292

293
// For LEAF/ROUTER/GATEWAY, return false if the remote does not support
294
// message tracing (important if the tracing requests trace-only).
295
func (c *client) msgTraceSupport() bool {
1,998✔
296
        // Exclude client connection from the protocol check.
1,998✔
297
        return c.kind == CLIENT || c.opts.Protocol >= MsgTraceProto
1,998✔
298
}
1,998✔
299

300
func getConnName(c *client) string {
4,942✔
301
        switch c.kind {
4,942✔
302
        case ROUTER:
1,878✔
303
                if n := c.route.remoteName; n != _EMPTY_ {
3,755✔
304
                        return n
1,877✔
305
                }
1,877✔
306
        case GATEWAY:
48✔
307
                if n := c.gw.remoteName; n != _EMPTY_ {
85✔
308
                        return n
37✔
309
                }
37✔
310
        case LEAF:
56✔
311
                if n := c.leaf.remoteServer; n != _EMPTY_ {
111✔
312
                        return n
55✔
313
                }
55✔
314
        }
315
        return c.opts.Name
2,973✔
316
}
317

318
func getCompressionType(cts string) compressionType {
2,929✔
319
        if cts == _EMPTY_ {
5,854✔
320
                return noCompression
2,925✔
321
        }
2,925✔
322
        cts = strings.ToLower(cts)
4✔
323
        if strings.Contains(cts, "snappy") || strings.Contains(cts, "s2") {
6✔
324
                return snappyCompression
2✔
325
        }
2✔
326
        if strings.Contains(cts, "gzip") {
3✔
327
                return gzipCompression
1✔
328
        }
1✔
329
        return unsupportedCompression
1✔
330
}
331

332
func (c *client) initMsgTrace() *msgTrace {
724,860✔
333
        // The code in the "if" statement is only running in test mode.
724,860✔
334
        if msgTraceRunInTests {
1,447,807✔
335
                // Check the type of client that tries to initialize a trace struct.
722,947✔
336
                if !(c.kind == CLIENT || c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
722,947✔
337
                        panic(fmt.Sprintf("Unexpected client type %q trying to initialize msgTrace", c.kindString()))
×
338
                }
339
                // In some tests, we want to make a server behave like an old server
340
                // and so even if a trace header is received, we want the server to
341
                // simply ignore it.
342
                if msgTraceCheckSupport {
722,959✔
343
                        if c.srv == nil || c.srv.getServerProto() < MsgTraceProto {
16✔
344
                                return nil
4✔
345
                        }
4✔
346
                }
347
        }
348
        if c.pa.hdr <= 0 {
724,856✔
349
                return nil
×
350
        }
×
351
        hdr := c.msgBuf[:c.pa.hdr]
724,856✔
352
        headers, external := genHeaderMapIfTraceHeadersPresent(hdr)
724,856✔
353
        if len(headers) == 0 {
1,446,169✔
354
                return nil
721,313✔
355
        }
721,313✔
356
        // Little helper to give us the first value of a given header, or _EMPTY_
357
        // if key is not present.
358
        getHdrVal := func(key string) string {
8,890✔
359
                vv, ok := headers[key]
5,347✔
360
                if !ok {
9,343✔
361
                        return _EMPTY_
3,996✔
362
                }
3,996✔
363
                return vv[0]
1,351✔
364
        }
365
        var (
3,543✔
366
                dest      string
3,543✔
367
                traceOnly bool
3,543✔
368
        )
3,543✔
369
        // Check for traceOnly only if not external.
3,543✔
370
        if !external {
3,767✔
371
                if to := getHdrVal(MsgTraceOnly); to != _EMPTY_ {
357✔
372
                        tos := strings.ToLower(to)
133✔
373
                        switch tos {
133✔
374
                        case "1", "true", "on":
133✔
375
                                traceOnly = true
133✔
376
                        }
377
                }
378
                dest = getHdrVal(MsgTraceDest)
224✔
379
                if c.kind == CLIENT {
364✔
380
                        if td, ok := c.allowedMsgTraceDest(hdr, false); !ok {
145✔
381
                                return nil
5✔
382
                        } else if td != _EMPTY_ {
275✔
383
                                dest = td
135✔
384
                        }
135✔
385
                }
386
                // Check the destination to see if this is a valid public subject.
387
                if !IsValidPublishSubject(dest) {
219✔
388
                        // We still have to return a msgTrace object (if traceOnly is set)
×
389
                        // because if we don't, the message will end-up being delivered to
×
390
                        // applications, which may break them. We report the error in any case.
×
391
                        c.Errorf("Destination %q is not valid, won't be able to trace events", dest)
×
392
                        if !traceOnly {
×
393
                                // We can bail, tracing will be disabled for this message.
×
394
                                return nil
×
395
                        }
×
396
                }
397
        }
398
        var (
3,538✔
399
                // Account to use when sending the trace event
3,538✔
400
                acc *Account
3,538✔
401
                // Ingress' account name
3,538✔
402
                ian string
3,538✔
403
                // Origin account name
3,538✔
404
                oan string
3,538✔
405
                // The hop "id", taken from headers only when not from CLIENT
3,538✔
406
                hop string
3,538✔
407
        )
3,538✔
408
        if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
4,523✔
409
                // The ingress account name will always be c.pa.account, but `acc` may
985✔
410
                // be different if we have an origin account header.
985✔
411
                if c.kind == LEAF {
1,010✔
412
                        ian = c.acc.GetName()
25✔
413
                } else {
985✔
414
                        ian = string(c.pa.account)
960✔
415
                }
960✔
416
                // The remote will have set the origin account header only if the
417
                // message changed account (think of service imports).
418
                oan = getHdrVal(MsgTraceOriginAccount)
985✔
419
                if oan == _EMPTY_ {
1,964✔
420
                        // For LEAF or ROUTER with pinned-account, we can use the c.acc.
979✔
421
                        if c.kind == LEAF || (c.kind == ROUTER && len(c.route.accName) > 0) {
1,004✔
422
                                acc = c.acc
25✔
423
                        } else {
979✔
424
                                // We will lookup account with c.pa.account (or ian).
954✔
425
                                oan = ian
954✔
426
                        }
954✔
427
                }
428
                // Unless we already got the account, we need to look it up.
429
                if acc == nil {
1,945✔
430
                        // We don't want to do account resolving here.
960✔
431
                        if acci, ok := c.srv.accounts.Load(oan); ok {
1,920✔
432
                                acc = acci.(*Account)
960✔
433
                                // Since we have looked-up the account, we don't need oan, so
960✔
434
                                // clear it in case it was set.
960✔
435
                                oan = _EMPTY_
960✔
436
                        } else {
960✔
437
                                // We still have to return a msgTrace object (if traceOnly is set)
×
438
                                // because if we don't, the message will end-up being delivered to
×
439
                                // applications, which may break them. We report the error in any case.
×
440
                                c.Errorf("Account %q was not found, won't be able to trace events", oan)
×
441
                                if !traceOnly {
×
442
                                        // We can bail, tracing will be disabled for this message.
×
443
                                        return nil
×
444
                                }
×
445
                        }
446
                }
447
                // Check the hop header
448
                hop = getHdrVal(MsgTraceHop)
985✔
449
        } else {
2,553✔
450
                acc = c.acc
2,553✔
451
                ian = acc.GetName()
2,553✔
452
        }
2,553✔
453
        // If external, we need to have the account's trace destination set,
454
        // otherwise, we are not enabling tracing.
455
        if external {
6,857✔
456
                var sampling int
3,319✔
457
                if acc != nil {
6,638✔
458
                        dest, sampling = acc.getTraceDestAndSampling()
3,319✔
459
                }
3,319✔
460
                if dest == _EMPTY_ {
3,332✔
461
                        // No account destination, no tracing for external trace headers.
13✔
462
                        return nil
13✔
463
                }
13✔
464
                // Check sampling, but only from origin server.
465
                if c.kind == CLIENT && !sample(sampling) {
3,902✔
466
                        // Need to disable tracing so that if the message is routed, it won't
596✔
467
                        // trigger a trace there.
596✔
468
                        c.msgBuf = c.setHeader(MsgTraceDest, MsgTraceDestDisabled, c.msgBuf)
596✔
469
                        return nil
596✔
470
                }
596✔
471
        }
472
        c.pa.trace = &msgTrace{
2,929✔
473
                srv:  c.srv,
2,929✔
474
                acc:  acc,
2,929✔
475
                oan:  oan,
2,929✔
476
                dest: dest,
2,929✔
477
                ct:   getCompressionType(getHdrVal(acceptEncodingHeader)),
2,929✔
478
                hop:  hop,
2,929✔
479
                event: &MsgTraceEvent{
2,929✔
480
                        Request: MsgTraceRequest{
2,929✔
481
                                Header:  headers,
2,929✔
482
                                MsgSize: c.pa.size,
2,929✔
483
                        },
2,929✔
484
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
2,929✔
485
                                MsgTraceBase: MsgTraceBase{
2,929✔
486
                                        Type:      MsgTraceIngressType,
2,929✔
487
                                        Timestamp: time.Now(),
2,929✔
488
                                },
2,929✔
489
                                Kind:    c.kind,
2,929✔
490
                                CID:     c.cid,
2,929✔
491
                                Name:    getConnName(c),
2,929✔
492
                                Account: ian,
2,929✔
493
                                Subject: string(c.pa.subject),
2,929✔
494
                        }),
2,929✔
495
                },
2,929✔
496
                tonly: traceOnly,
2,929✔
497
        }
2,929✔
498
        return c.pa.trace
2,929✔
499
}
500

501
func sample(sampling int) bool {
2,406✔
502
        // Option parsing should ensure that sampling is [1..100], but consider
2,406✔
503
        // any value outside of this range to be 100%.
2,406✔
504
        if sampling <= 0 || sampling >= 100 {
4,012✔
505
                return true
1,606✔
506
        }
1,606✔
507
        return rand.Int31n(100) <= int32(sampling)
800✔
508
}
509

510
// This function will return the header as a map (instead of http.Header because
511
// we want to preserve the header names' case) and a boolean that indicates if
512
// the headers have been lifted due to the presence of the external trace header
513
// only.
514
// Note that because of the traceParentHdr, the search is done in a case
515
// insensitive way. We used to rewrite it in lower case but no longer do since v2.14.
516
func genHeaderMapIfTraceHeadersPresent(hdr []byte) (map[string][]string, bool) {
724,887✔
517

724,887✔
518
        var (
724,887✔
519
                _keys               = [64][]byte{}
724,887✔
520
                _vals               = [64][]byte{}
724,887✔
521
                m                   map[string][]string
724,887✔
522
                traceDestHdrFound   bool
724,887✔
523
                traceParentHdrFound bool
724,887✔
524
        )
724,887✔
525
        // Skip the hdrLine
724,887✔
526
        if !bytes.HasPrefix(hdr, stringToBytes(hdrLine)) {
725,426✔
527
                return nil, false
539✔
528
        }
539✔
529

530
        keys := _keys[:0]
724,348✔
531
        vals := _vals[:0]
724,348✔
532

724,348✔
533
        for i := len(hdrLine); i < len(hdr); {
2,309,262✔
534
                // Search for key/val delimiter
1,584,914✔
535
                del := bytes.IndexByte(hdr[i:], ':')
1,584,914✔
536
                if del < 0 {
2,308,924✔
537
                        break
724,010✔
538
                }
539
                keyStart := i
860,904✔
540
                key := hdr[keyStart : keyStart+del]
860,904✔
541
                i += del + 1
860,904✔
542
                for i < len(hdr) && (hdr[i] == ' ' || hdr[i] == '\t') {
1,721,609✔
543
                        i++
860,705✔
544
                }
860,705✔
545
                valStart := i
860,904✔
546
                nl := bytes.Index(hdr[valStart:], crLFAsBytes)
860,904✔
547
                if nl < 0 {
860,908✔
548
                        break
4✔
549
                }
550
                valEnd := valStart + nl
860,900✔
551
                for valEnd > valStart && (hdr[valEnd-1] == ' ' || hdr[valEnd-1] == '\t') {
860,906✔
552
                        valEnd--
6✔
553
                }
6✔
554
                val := hdr[valStart:valEnd]
860,900✔
555
                if len(key) > 0 && len(val) > 0 {
1,721,796✔
556
                        vals = append(vals, val)
860,896✔
557

860,896✔
558
                        // We search for our special keys only if not already found.
860,896✔
559

860,896✔
560
                        // Check for the external trace header.
860,896✔
561
                        // Search needs to be case insensitive.
860,896✔
562
                        if !traceParentHdrFound && bytes.EqualFold(key, traceParentHdrAsBytes) {
864,621✔
563
                                // We will now check if the value has sampling or not.
3,725✔
564
                                // TODO(ik): Not sure if this header can have multiple values
3,725✔
565
                                // or not, and if so, what would be the rule to check for
3,725✔
566
                                // sampling. What is done here is to check them all until we
3,725✔
567
                                // found one with sampling.
3,725✔
568
                                tk := bytes.Split(val, dashAsBytes)
3,725✔
569
                                if len(tk) == 4 && len([]byte(tk[3])) == 2 {
7,449✔
570
                                        if hexVal, err := strconv.ParseInt(bytesToString(tk[3]), 16, 8); err == nil {
7,448✔
571
                                                if hexVal&0x1 == 0x1 {
7,436✔
572
                                                        traceParentHdrFound = true
3,712✔
573
                                                }
3,712✔
574
                                        }
575
                                }
576
                        } else if !traceDestHdrFound && bytes.Equal(key, traceDestHdrAsBytes) {
857,711✔
577
                                // This is the Nats-Trace-Dest header, check the value to see
540✔
578
                                // if it indicates that the trace was disabled.
540✔
579
                                if bytes.Equal(val, traceDestDisabledAsBytes) {
848✔
580
                                        return nil, false
308✔
581
                                }
308✔
582
                                traceDestHdrFound = true
232✔
583
                        }
584
                        // Add to the keys and preserve the key's case
585
                        keys = append(keys, key)
860,588✔
586
                }
587
                i += nl + 2
860,592✔
588
        }
589
        if !traceDestHdrFound && !traceParentHdrFound {
1,444,523✔
590
                return nil, false
720,483✔
591
        }
720,483✔
592
        m = make(map[string][]string, len(keys))
3,557✔
593
        for i, k := range keys {
8,421✔
594
                hname := string(k)
4,864✔
595
                m[hname] = append(m[hname], string(vals[i]))
4,864✔
596
        }
4,864✔
597
        return m, !traceDestHdrFound && traceParentHdrFound
3,557✔
598
}
599

600
// Special case where we create a trace event before parsing the message.
601
// This is for cases where the connection will be closed when detecting
602
// an error during early message processing (for instance max payload).
603
func (c *client) initAndSendIngressErrEvent(hdr []byte, dest string, ingressError error) {
2✔
604
        if ingressError == nil {
2✔
605
                return
×
606
        }
×
607
        ct := getAcceptEncoding(hdr)
2✔
608
        t := &msgTrace{
2✔
609
                srv:  c.srv,
2✔
610
                acc:  c.acc,
2✔
611
                dest: dest,
2✔
612
                ct:   ct,
2✔
613
                event: &MsgTraceEvent{
2✔
614
                        Request: MsgTraceRequest{MsgSize: c.pa.size},
2✔
615
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
2✔
616
                                MsgTraceBase: MsgTraceBase{
2✔
617
                                        Type:      MsgTraceIngressType,
2✔
618
                                        Timestamp: time.Now(),
2✔
619
                                },
2✔
620
                                Kind:  c.kind,
2✔
621
                                CID:   c.cid,
2✔
622
                                Name:  getConnName(c),
2✔
623
                                Error: ingressError.Error(),
2✔
624
                        }),
2✔
625
                },
2✔
626
        }
2✔
627
        t.sendEvent()
2✔
628
}
629

630
// Returns `true` if message tracing is enabled and we are tracing only,
631
// that is, we are not going to deliver the inbound message, returns
632
// `false` otherwise (no tracing, or tracing and message delivery).
633
func (t *msgTrace) traceOnly() bool {
1,963,439✔
634
        return t != nil && t.tonly
1,963,439✔
635
}
1,963,439✔
636

637
func (t *msgTrace) setOriginAccountHeaderIfNeeded(c *client, acc *Account, msg []byte) []byte {
1,018✔
638
        var oan string
1,018✔
639
        // If t.acc is set, only check that, not t.oan.
1,018✔
640
        if t.acc != nil {
2,036✔
641
                if t.acc != acc {
1,028✔
642
                        oan = t.acc.GetName()
10✔
643
                }
10✔
644
        } else if t.oan != acc.GetName() {
×
645
                oan = t.oan
×
646
        }
×
647
        if oan != _EMPTY_ {
1,028✔
648
                msg = c.setHeader(MsgTraceOriginAccount, oan, msg)
10✔
649
        }
10✔
650
        return msg
1,018✔
651
}
652

653
func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
992✔
654
        e := t.event
992✔
655
        e.Hops++
992✔
656
        if len(t.hop) > 0 {
1,009✔
657
                t.nhop = fmt.Sprintf("%s.%d", t.hop, e.Hops)
17✔
658
        } else {
992✔
659
                t.nhop = fmt.Sprintf("%d", e.Hops)
975✔
660
        }
975✔
661
        return c.setHeader(MsgTraceHop, t.nhop, msg)
992✔
662
}
663

664
func (t *msgTrace) setIngressError(err string) {
6✔
665
        if i := t.event.Ingress(); i != nil {
12✔
666
                i.Error = err
6✔
667
        }
6✔
668
}
669

670
func (t *msgTrace) addSubjectMappingEvent(subj []byte) {
12,591✔
671
        if t == nil {
25,168✔
672
                return
12,577✔
673
        }
12,577✔
674
        t.event.Events = append(t.event.Events, &MsgTraceSubjectMapping{
14✔
675
                MsgTraceBase: MsgTraceBase{
14✔
676
                        Type:      MsgTraceSubjectMappingType,
14✔
677
                        Timestamp: time.Now(),
14✔
678
                },
14✔
679
                MappedTo: string(subj),
14✔
680
        })
14✔
681
}
682

683
func (t *msgTrace) addEgressEvent(dc *client, sub *subscription, err string) {
1,353,399✔
684
        if t == nil {
2,704,794✔
685
                return
1,351,395✔
686
        }
1,351,395✔
687
        e := &MsgTraceEgress{
2,004✔
688
                MsgTraceBase: MsgTraceBase{
2,004✔
689
                        Type:      MsgTraceEgressType,
2,004✔
690
                        Timestamp: time.Now(),
2,004✔
691
                },
2,004✔
692
                Kind:  dc.kind,
2,004✔
693
                CID:   dc.cid,
2,004✔
694
                Name:  getConnName(dc),
2,004✔
695
                Hop:   t.nhop,
2,004✔
696
                Error: err,
2,004✔
697
        }
2,004✔
698
        t.nhop = _EMPTY_
2,004✔
699
        // Specific to CLIENT connections...
2,004✔
700
        if dc.kind == CLIENT {
3,016✔
701
                // Set the subscription's subject and possibly queue name.
1,012✔
702
                e.Subscription = string(sub.subject)
1,012✔
703
                if len(sub.queue) > 0 {
1,050✔
704
                        e.Queue = string(sub.queue)
38✔
705
                }
38✔
706
        }
707
        if dc.kind == CLIENT || dc.kind == LEAF {
3,045✔
708
                if i := t.event.Ingress(); i != nil {
2,082✔
709
                        // If the Ingress' account is different from the destination's
1,041✔
710
                        // account, add the account name into the Egress trace event.
1,041✔
711
                        // This would happen with service imports.
1,041✔
712
                        if dcAccName := dc.acc.GetName(); dcAccName != i.Account {
1,069✔
713
                                e.Account = dcAccName
28✔
714
                        }
28✔
715
                }
716
        }
717
        t.event.Events = append(t.event.Events, e)
2,004✔
718
}
719

720
func (t *msgTrace) addStreamExportEvent(dc *client, to []byte) {
31✔
721
        if t == nil {
31✔
722
                return
×
723
        }
×
724
        dc.mu.Lock()
31✔
725
        accName := dc.acc.GetName()
31✔
726
        dc.mu.Unlock()
31✔
727
        t.event.Events = append(t.event.Events, &MsgTraceStreamExport{
31✔
728
                MsgTraceBase: MsgTraceBase{
31✔
729
                        Type:      MsgTraceStreamExportType,
31✔
730
                        Timestamp: time.Now(),
31✔
731
                },
31✔
732
                Account: accName,
31✔
733
                To:      string(to),
31✔
734
        })
31✔
735
}
736

737
func (t *msgTrace) addServiceImportEvent(accName, from, to string) {
35✔
738
        if t == nil {
35✔
739
                return
×
740
        }
×
741
        t.event.Events = append(t.event.Events, &MsgTraceServiceImport{
35✔
742
                MsgTraceBase: MsgTraceBase{
35✔
743
                        Type:      MsgTraceServiceImportType,
35✔
744
                        Timestamp: time.Now(),
35✔
745
                },
35✔
746
                Account: accName,
35✔
747
                From:    from,
35✔
748
                To:      to,
35✔
749
        })
35✔
750
}
751

752
func (t *msgTrace) addJetStreamEvent(streamName string) {
47✔
753
        if t == nil {
47✔
754
                return
×
755
        }
×
756
        t.js = &MsgTraceJetStream{
47✔
757
                MsgTraceBase: MsgTraceBase{
47✔
758
                        Type:      MsgTraceJetStreamType,
47✔
759
                        Timestamp: time.Now(),
47✔
760
                },
47✔
761
                Stream: streamName,
47✔
762
        }
47✔
763
        t.event.Events = append(t.event.Events, t.js)
47✔
764
}
765

766
func (t *msgTrace) updateJetStreamEvent(subject string, noInterest bool) {
1,500,501✔
767
        if t == nil {
3,000,992✔
768
                return
1,500,491✔
769
        }
1,500,491✔
770
        // JetStream event should have been created in addJetStreamEvent
771
        if t.js == nil {
10✔
772
                return
×
773
        }
×
774
        t.js.Subject = subject
10✔
775
        t.js.NoInterest = noInterest
10✔
776
        // Update the timestamp since this is more accurate than when it
10✔
777
        // was first added in addJetStreamEvent().
10✔
778
        t.js.Timestamp = time.Now()
10✔
779
}
780

781
func (t *msgTrace) sendEventFromJetStream(err error) {
47✔
782
        if t == nil {
47✔
783
                return
×
784
        }
×
785
        // JetStream event should have been created in addJetStreamEvent
786
        if t.js == nil {
47✔
787
                return
×
788
        }
×
789
        if err != nil {
84✔
790
                t.js.Error = err.Error()
37✔
791
        }
37✔
792
        t.sendEvent()
47✔
793
}
794

795
func (t *msgTrace) sendEvent() {
14,555,831✔
796
        if t == nil {
29,108,684✔
797
                return
14,552,853✔
798
        }
14,552,853✔
799
        if t.js != nil {
3,072✔
800
                ready := atomic.AddInt32(&t.ready, 1) == 2
94✔
801
                if !ready {
141✔
802
                        return
47✔
803
                }
47✔
804
        }
805
        t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
2,931✔
806
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc