• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.1
/server/msgtrace.go
1
// Copyright 2024-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "encoding/json"
19
        "fmt"
20
        "math/rand"
21
        "strconv"
22
        "strings"
23
        "sync/atomic"
24
        "time"
25
)
26

27
const (
28
        MsgTraceDest          = "Nats-Trace-Dest"
29
        MsgTraceDestDisabled  = "trace disabled" // This must be an invalid NATS subject
30
        MsgTraceHop           = "Nats-Trace-Hop"
31
        MsgTraceOriginAccount = "Nats-Trace-Origin-Account"
32
        MsgTraceOnly          = "Nats-Trace-Only"
33

34
        // External trace header. Note that this header is normally in lower
35
        // case (https://www.w3.org/TR/trace-context/#header-name). Vendors
36
        // MUST expect the header in any case (upper, lower, mixed), and
37
        // SHOULD send the header name in lowercase. We used to change it
38
        // to lower case, but no longer do that in 2.14.
39
        traceParentHdr = "traceparent"
40
)
41

42
var (
43
        traceDestHdrAsBytes      = stringToBytes(MsgTraceDest)
44
        traceDestDisabledAsBytes = stringToBytes(MsgTraceDestDisabled)
45
        traceParentHdrAsBytes    = stringToBytes(traceParentHdr)
46
        crLFAsBytes              = stringToBytes(CR_LF)
47
        dashAsBytes              = stringToBytes("-")
48
)
49

50
type MsgTraceType string
51

52
// Type of message trace events in the MsgTraceEvents list.
53
// This is needed to unmarshal the list.
54
const (
55
        MsgTraceIngressType        = "in"
56
        MsgTraceSubjectMappingType = "sm"
57
        MsgTraceStreamExportType   = "se"
58
        MsgTraceServiceImportType  = "si"
59
        MsgTraceJetStreamType      = "js"
60
        MsgTraceEgressType         = "eg"
61
)
62

63
type MsgTraceEvent struct {
64
        Server  ServerInfo      `json:"server"`
65
        Request MsgTraceRequest `json:"request"`
66
        Hops    int             `json:"hops,omitempty"`
67
        Events  MsgTraceEvents  `json:"events"`
68
}
69

70
type MsgTraceRequest struct {
71
        // We are not making this an http.Header so that header name case is preserved.
72
        Header  map[string][]string `json:"header,omitempty"`
73
        MsgSize int                 `json:"msgsize,omitempty"`
74
}
75

76
type MsgTraceEvents []MsgTrace
77

78
type MsgTrace interface {
79
        new() MsgTrace
80
        typ() MsgTraceType
81
}
82

83
type MsgTraceBase struct {
84
        Type      MsgTraceType `json:"type"`
85
        Timestamp time.Time    `json:"ts"`
86
}
87

88
type MsgTraceIngress struct {
89
        MsgTraceBase
90
        Kind    int    `json:"kind"`
91
        CID     uint64 `json:"cid"`
92
        Name    string `json:"name,omitempty"`
93
        Account string `json:"acc"`
94
        Subject string `json:"subj"`
95
        Error   string `json:"error,omitempty"`
96
}
97

98
type MsgTraceSubjectMapping struct {
99
        MsgTraceBase
100
        MappedTo string `json:"to"`
101
}
102

103
type MsgTraceStreamExport struct {
104
        MsgTraceBase
105
        Account string `json:"acc"`
106
        To      string `json:"to"`
107
}
108

109
type MsgTraceServiceImport struct {
110
        MsgTraceBase
111
        Account string `json:"acc"`
112
        From    string `json:"from"`
113
        To      string `json:"to"`
114
}
115

116
type MsgTraceJetStream struct {
117
        MsgTraceBase
118
        Stream     string `json:"stream"`
119
        Subject    string `json:"subject,omitempty"`
120
        NoInterest bool   `json:"nointerest,omitempty"`
121
        Error      string `json:"error,omitempty"`
122
}
123

124
type MsgTraceEgress struct {
125
        MsgTraceBase
126
        Kind         int    `json:"kind"`
127
        CID          uint64 `json:"cid"`
128
        Name         string `json:"name,omitempty"`
129
        Hop          string `json:"hop,omitempty"`
130
        Account      string `json:"acc,omitempty"`
131
        Subscription string `json:"sub,omitempty"`
132
        Queue        string `json:"queue,omitempty"`
133
        Error        string `json:"error,omitempty"`
134

135
        // This is for applications that unmarshal the trace events
136
        // and want to link an egress to route/leaf/gateway with
137
        // the MsgTraceEvent from that server.
138
        Link *MsgTraceEvent `json:"-"`
139
}
140

141
// -------------------------------------------------------------
142

143
func (t MsgTraceBase) typ() MsgTraceType     { return t.Type }
×
144
func (MsgTraceIngress) new() MsgTrace        { return &MsgTraceIngress{} }
×
145
func (MsgTraceSubjectMapping) new() MsgTrace { return &MsgTraceSubjectMapping{} }
×
146
func (MsgTraceStreamExport) new() MsgTrace   { return &MsgTraceStreamExport{} }
×
147
func (MsgTraceServiceImport) new() MsgTrace  { return &MsgTraceServiceImport{} }
×
148
func (MsgTraceJetStream) new() MsgTrace      { return &MsgTraceJetStream{} }
×
149
func (MsgTraceEgress) new() MsgTrace         { return &MsgTraceEgress{} }
×
150

151
var msgTraceInterfaces = map[MsgTraceType]MsgTrace{
152
        MsgTraceIngressType:        MsgTraceIngress{},
153
        MsgTraceSubjectMappingType: MsgTraceSubjectMapping{},
154
        MsgTraceStreamExportType:   MsgTraceStreamExport{},
155
        MsgTraceServiceImportType:  MsgTraceServiceImport{},
156
        MsgTraceJetStreamType:      MsgTraceJetStream{},
157
        MsgTraceEgressType:         MsgTraceEgress{},
158
}
159

160
func (t *MsgTraceEvents) UnmarshalJSON(data []byte) error {
×
161
        var raw []json.RawMessage
×
162
        err := json.Unmarshal(data, &raw)
×
163
        if err != nil {
×
164
                return err
×
165
        }
×
166
        *t = make(MsgTraceEvents, len(raw))
×
167
        var tt MsgTraceBase
×
168
        for i, r := range raw {
×
169
                if err = json.Unmarshal(r, &tt); err != nil {
×
170
                        return err
×
171
                }
×
172
                tr, ok := msgTraceInterfaces[tt.Type]
×
173
                if !ok {
×
174
                        return fmt.Errorf("unknown trace type %v", tt.Type)
×
175
                }
×
176
                te := tr.new()
×
177
                if err := json.Unmarshal(r, te); err != nil {
×
178
                        return err
×
179
                }
×
180
                (*t)[i] = te
×
181
        }
182
        return nil
×
183
}
184

185
func getTraceAs[T MsgTrace](e any) *T {
×
186
        v, ok := e.(*T)
×
187
        if ok {
×
188
                return v
×
189
        }
×
190
        return nil
×
191
}
192

193
func (t *MsgTraceEvent) Ingress() *MsgTraceIngress {
×
194
        if len(t.Events) < 1 {
×
195
                return nil
×
196
        }
×
197
        return getTraceAs[MsgTraceIngress](t.Events[0])
×
198
}
199

200
func (t *MsgTraceEvent) SubjectMapping() *MsgTraceSubjectMapping {
×
201
        for _, e := range t.Events {
×
202
                if e.typ() == MsgTraceSubjectMappingType {
×
203
                        return getTraceAs[MsgTraceSubjectMapping](e)
×
204
                }
×
205
        }
206
        return nil
×
207
}
208

209
func (t *MsgTraceEvent) StreamExports() []*MsgTraceStreamExport {
×
210
        var se []*MsgTraceStreamExport
×
211
        for _, e := range t.Events {
×
212
                if e.typ() == MsgTraceStreamExportType {
×
213
                        se = append(se, getTraceAs[MsgTraceStreamExport](e))
×
214
                }
×
215
        }
216
        return se
×
217
}
218

219
func (t *MsgTraceEvent) ServiceImports() []*MsgTraceServiceImport {
×
220
        var si []*MsgTraceServiceImport
×
221
        for _, e := range t.Events {
×
222
                if e.typ() == MsgTraceServiceImportType {
×
223
                        si = append(si, getTraceAs[MsgTraceServiceImport](e))
×
224
                }
×
225
        }
226
        return si
×
227
}
228

229
func (t *MsgTraceEvent) JetStream() *MsgTraceJetStream {
×
230
        for _, e := range t.Events {
×
231
                if e.typ() == MsgTraceJetStreamType {
×
232
                        return getTraceAs[MsgTraceJetStream](e)
×
233
                }
×
234
        }
235
        return nil
×
236
}
237

238
func (t *MsgTraceEvent) Egresses() []*MsgTraceEgress {
×
239
        var eg []*MsgTraceEgress
×
240
        for _, e := range t.Events {
×
241
                if e.typ() == MsgTraceEgressType {
×
242
                        eg = append(eg, getTraceAs[MsgTraceEgress](e))
×
243
                }
×
244
        }
245
        return eg
×
246
}
247

248
const (
249
        errMsgTraceOnlyNoSupport   = "Not delivered because remote does not support message tracing"
250
        errMsgTraceNoSupport       = "Message delivered but remote does not support message tracing so no trace event generated from there"
251
        errMsgTraceNoEcho          = "Not delivered because of no echo"
252
        errMsgTracePubViolation    = "Not delivered because publish denied for this subject"
253
        errMsgTraceSubDeny         = "Not delivered because subscription denies this subject"
254
        errMsgTraceSubClosed       = "Not delivered because subscription is closed"
255
        errMsgTraceClientClosed    = "Not delivered because client is closed"
256
        errMsgTraceAutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"
257
        errMsgTraceFastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"
258
)
259

260
type msgTrace struct {
261
        ready int32
262
        srv   *Server
263
        acc   *Account
264
        // Origin account name, set only if acc is nil when acc lookup failed.
265
        oan   string
266
        dest  string
267
        event *MsgTraceEvent
268
        js    *MsgTraceJetStream
269
        hop   string
270
        nhop  string
271
        tonly bool // Will only trace the message, not do delivery.
272
        ct    compressionType
273
}
274

275
// This will be false outside of the tests, so when building the server binary,
276
// any code where you see `if msgTraceRunInTests` statement will be compiled
277
// out, so this will have no performance penalty.
278
var (
279
        msgTraceRunInTests   bool
280
        msgTraceCheckSupport bool
281
)
282

283
// Returns the message trace object, if message is being traced,
284
// and `true` if we want to only trace, not actually deliver the message.
285
func (c *client) isMsgTraceEnabled() (*msgTrace, bool) {
77,629,751✔
286
        t := c.pa.trace
77,629,751✔
287
        if t == nil {
155,259,502✔
288
                return nil, false
77,629,751✔
289
        }
77,629,751✔
290
        return t, t.tonly
×
291
}
292

293
// For LEAF/ROUTER/GATEWAY, return false if the remote does not support
294
// message tracing (important if the tracing requests trace-only).
295
func (c *client) msgTraceSupport() bool {
×
296
        // Exclude client connection from the protocol check.
×
297
        return c.kind == CLIENT || c.opts.Protocol >= MsgTraceProto
×
298
}
×
299

300
func getConnName(c *client) string {
×
301
        switch c.kind {
×
302
        case ROUTER:
×
303
                if n := c.route.remoteName; n != _EMPTY_ {
×
304
                        return n
×
305
                }
×
306
        case GATEWAY:
×
307
                if n := c.gw.remoteName; n != _EMPTY_ {
×
308
                        return n
×
309
                }
×
310
        case LEAF:
×
311
                if n := c.leaf.remoteServer; n != _EMPTY_ {
×
312
                        return n
×
313
                }
×
314
        }
315
        return c.opts.Name
×
316
}
317

318
func getCompressionType(cts string) compressionType {
×
319
        if cts == _EMPTY_ {
×
320
                return noCompression
×
321
        }
×
322
        cts = strings.ToLower(cts)
×
323
        if strings.Contains(cts, "snappy") || strings.Contains(cts, "s2") {
×
324
                return snappyCompression
×
325
        }
×
326
        if strings.Contains(cts, "gzip") {
×
327
                return gzipCompression
×
328
        }
×
329
        return unsupportedCompression
×
330
}
331

332
func (c *client) initMsgTrace() *msgTrace {
1,992,702✔
333
        // The code in the "if" statement is only running in test mode.
1,992,702✔
334
        if msgTraceRunInTests {
3,983,502✔
335
                // Check the type of client that tries to initialize a trace struct.
1,990,800✔
336
                if !(c.kind == CLIENT || c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
1,990,800✔
337
                        panic(fmt.Sprintf("Unexpected client type %q trying to initialize msgTrace", c.kindString()))
×
338
                }
339
                // In some tests, we want to make a server behave like an old server
340
                // and so even if a trace header is received, we want the server to
341
                // simply ignore it.
342
                if msgTraceCheckSupport {
1,990,800✔
343
                        if c.srv == nil || c.srv.getServerProto() < MsgTraceProto {
×
344
                                return nil
×
345
                        }
×
346
                }
347
        }
348
        if c.pa.hdr <= 0 {
1,992,702✔
349
                return nil
×
350
        }
×
351
        hdr := c.msgBuf[:c.pa.hdr]
1,992,702✔
352
        headers, external := genHeaderMapIfTraceHeadersPresent(hdr)
1,992,702✔
353
        if len(headers) == 0 {
3,985,395✔
354
                return nil
1,992,693✔
355
        }
1,992,693✔
356
        // Little helper to give us the first value of a given header, or _EMPTY_
357
        // if key is not present.
358
        getHdrVal := func(key string) string {
9✔
359
                vv, ok := headers[key]
×
360
                if !ok {
×
361
                        return _EMPTY_
×
362
                }
×
363
                return vv[0]
×
364
        }
365
        var (
9✔
366
                dest      string
9✔
367
                traceOnly bool
9✔
368
        )
9✔
369
        // Check for traceOnly only if not external.
9✔
370
        if !external {
9✔
371
                if to := getHdrVal(MsgTraceOnly); to != _EMPTY_ {
×
372
                        tos := strings.ToLower(to)
×
373
                        switch tos {
×
374
                        case "1", "true", "on":
×
375
                                traceOnly = true
×
376
                        }
377
                }
378
                dest = getHdrVal(MsgTraceDest)
×
379
                if c.kind == CLIENT {
×
380
                        if td, ok := c.allowedMsgTraceDest(hdr, false); !ok {
×
381
                                return nil
×
382
                        } else if td != _EMPTY_ {
×
383
                                dest = td
×
384
                        }
×
385
                }
386
                // Check the destination to see if this is a valid publish subject.
387
                if !IsValidPublishSubject(dest) {
×
388
                        // We still have to return a msgTrace object (if traceOnly is set)
×
389
                        // because if we don't, the message will end-up being delivered to
×
390
                        // applications, which may break them. We report the error in any case.
×
391
                        c.Errorf("Destination %q is not valid, won't be able to trace events", dest)
×
392
                        if !traceOnly {
×
393
                                // We can bail, tracing will be disabled for this message.
×
394
                                return nil
×
395
                        }
×
396
                }
397
        }
398
        var (
9✔
399
                // Account to use when sending the trace event
9✔
400
                acc *Account
9✔
401
                // Ingress' account name
9✔
402
                ian string
9✔
403
                // Origin account name
9✔
404
                oan string
9✔
405
                // The hop "id", taken from headers only when not from CLIENT
9✔
406
                hop string
9✔
407
        )
9✔
408
        if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
9✔
409
                // The ingress account name will always be c.pa.account, but `acc` may
×
410
                // be different if we have an origin account header.
×
411
                if c.kind == LEAF {
×
412
                        ian = c.acc.GetName()
×
413
                } else {
×
414
                        ian = string(c.pa.account)
×
415
                }
×
416
                // The remote will have set the origin account header only if the
417
                // message changed account (think of service imports).
418
                oan = getHdrVal(MsgTraceOriginAccount)
×
419
                if oan == _EMPTY_ {
×
420
                        // For LEAF or ROUTER with pinned-account, we can use the c.acc.
×
421
                        if c.kind == LEAF || (c.kind == ROUTER && len(c.route.accName) > 0) {
×
422
                                acc = c.acc
×
423
                        } else {
×
424
                                // We will lookup account with c.pa.account (or ian).
×
425
                                oan = ian
×
426
                        }
×
427
                }
428
                // Unless we already got the account, we need to look it up.
429
                if acc == nil {
×
430
                        // We don't want to do account resolving here.
×
431
                        if acci, ok := c.srv.accounts.Load(oan); ok {
×
432
                                acc = acci.(*Account)
×
433
                                // Since we have looked-up the account, we don't need oan, so
×
434
                                // clear it in case it was set.
×
435
                                oan = _EMPTY_
×
436
                        } else {
×
437
                                // We still have to return a msgTrace object (if traceOnly is set)
×
438
                                // because if we don't, the message will end-up being delivered to
×
439
                                // applications, which may break them. We report the error in any case.
×
440
                                c.Errorf("Account %q was not found, won't be able to trace events", oan)
×
441
                                if !traceOnly {
×
442
                                        // We can bail, tracing will be disabled for this message.
×
443
                                        return nil
×
444
                                }
×
445
                        }
446
                }
447
                // Check the hop header
448
                hop = getHdrVal(MsgTraceHop)
×
449
        } else {
9✔
450
                acc = c.acc
9✔
451
                ian = acc.GetName()
9✔
452
        }
9✔
453
        // If external, we need to have the account's trace destination set,
454
        // otherwise, we are not enabling tracing.
455
        if external {
18✔
456
                var sampling int
9✔
457
                if acc != nil {
18✔
458
                        dest, sampling = acc.getTraceDestAndSampling()
9✔
459
                }
9✔
460
                if dest == _EMPTY_ {
18✔
461
                        // No account destination, no tracing for external trace headers.
9✔
462
                        return nil
9✔
463
                }
9✔
464
                // Check sampling, but only from origin server.
465
                if c.kind == CLIENT && !sample(sampling) {
×
466
                        // Need to disable tracing so that if the message is routed, it won't
×
467
                        // trigger a trace there.
×
468
                        c.msgBuf = c.setHeader(MsgTraceDest, MsgTraceDestDisabled, c.msgBuf)
×
469
                        return nil
×
470
                }
×
471
        }
472
        c.pa.trace = &msgTrace{
×
473
                srv:  c.srv,
×
474
                acc:  acc,
×
475
                oan:  oan,
×
476
                dest: dest,
×
477
                ct:   getCompressionType(getHdrVal(acceptEncodingHeader)),
×
478
                hop:  hop,
×
479
                event: &MsgTraceEvent{
×
480
                        Request: MsgTraceRequest{
×
481
                                Header:  headers,
×
482
                                MsgSize: c.pa.size,
×
483
                        },
×
484
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
485
                                MsgTraceBase: MsgTraceBase{
×
486
                                        Type:      MsgTraceIngressType,
×
487
                                        Timestamp: time.Now(),
×
488
                                },
×
489
                                Kind:    c.kind,
×
490
                                CID:     c.cid,
×
491
                                Name:    getConnName(c),
×
492
                                Account: ian,
×
493
                                Subject: string(c.pa.subject),
×
494
                        }),
×
495
                },
×
496
                tonly: traceOnly,
×
497
        }
×
498
        return c.pa.trace
×
499
}
500

501
// sample reports whether a message should be traced, given a sampling
// rate expressed as a percentage.
//
// Option parsing should ensure that sampling is [1..100], but any value
// outside of this range (<= 0 or >= 100) is considered to be 100% and
// always returns true. Otherwise the function returns true for
// `sampling` out of every 100 equally likely random draws.
func sample(sampling int) bool {
	if sampling <= 0 || sampling >= 100 {
		return true
	}
	// rand.Int31n(100) is uniformly distributed in [0, 99]. The comparison
	// must be strict: with `<=` there would be sampling+1 favorable values,
	// e.g. 2% for sampling=1 and 100% for sampling=99.
	return rand.Int31n(100) < int32(sampling)
}
509

510
// This function will return the header as a map (instead of http.Header because
511
// we want to preserve the header names' case) and a boolean that indicates if
512
// the headers have been lifted due to the presence of the external trace header
513
// only.
514
// Note that because of the traceParentHdr, the search is done in a case
515
// insensitive way. We used to rewrite it in lower case but no longer do since v2.14.
516
func genHeaderMapIfTraceHeadersPresent(hdr []byte) (map[string][]string, bool) {
1,992,702✔
517

1,992,702✔
518
        var (
1,992,702✔
519
                _keys               = [64][]byte{}
1,992,702✔
520
                _vals               = [64][]byte{}
1,992,702✔
521
                m                   map[string][]string
1,992,702✔
522
                traceDestHdrFound   bool
1,992,702✔
523
                traceParentHdrFound bool
1,992,702✔
524
        )
1,992,702✔
525
        // Skip the hdrLine
1,992,702✔
526
        if !bytes.HasPrefix(hdr, stringToBytes(hdrLine)) {
2,003,659✔
527
                return nil, false
10,957✔
528
        }
10,957✔
529

530
        keys := _keys[:0]
1,981,745✔
531
        vals := _vals[:0]
1,981,745✔
532

1,981,745✔
533
        for i := len(hdrLine); i < len(hdr); {
6,121,573✔
534
                // Search for key/val delimiter
4,139,828✔
535
                del := bytes.IndexByte(hdr[i:], ':')
4,139,828✔
536
                if del < 0 {
6,121,573✔
537
                        break
1,981,745✔
538
                }
539
                keyStart := i
2,158,083✔
540
                key := hdr[keyStart : keyStart+del]
2,158,083✔
541
                i += del + 1
2,158,083✔
542
                for i < len(hdr) && (hdr[i] == ' ' || hdr[i] == '\t') {
4,315,970✔
543
                        i++
2,157,887✔
544
                }
2,157,887✔
545
                valStart := i
2,158,083✔
546
                nl := bytes.Index(hdr[valStart:], crLFAsBytes)
2,158,083✔
547
                if nl < 0 {
2,158,083✔
548
                        break
×
549
                }
550
                valEnd := valStart + nl
2,158,083✔
551
                for valEnd > valStart && (hdr[valEnd-1] == ' ' || hdr[valEnd-1] == '\t') {
2,158,083✔
552
                        valEnd--
×
553
                }
×
554
                val := hdr[valStart:valEnd]
2,158,083✔
555
                if len(key) > 0 && len(val) > 0 {
4,316,165✔
556
                        vals = append(vals, val)
2,158,082✔
557

2,158,082✔
558
                        // We search for our special keys only if not already found.
2,158,082✔
559

2,158,082✔
560
                        // Check for the external trace header.
2,158,082✔
561
                        // Search needs to be case insensitive.
2,158,082✔
562
                        if !traceParentHdrFound && bytes.EqualFold(key, traceParentHdrAsBytes) {
2,158,091✔
563
                                // We will now check if the value has sampling or not.
9✔
564
                                // TODO(ik): Not sure if this header can have multiple values
9✔
565
                                // or not, and if so, what would be the rule to check for
9✔
566
                                // sampling. What is done here is to check them all until we
9✔
567
                                // found one with sampling.
9✔
568
                                tk := bytes.Split(val, dashAsBytes)
9✔
569
                                if len(tk) == 4 && len([]byte(tk[3])) == 2 {
18✔
570
                                        if hexVal, err := strconv.ParseInt(bytesToString(tk[3]), 16, 8); err == nil {
18✔
571
                                                if hexVal&0x1 == 0x1 {
18✔
572
                                                        traceParentHdrFound = true
9✔
573
                                                }
9✔
574
                                        }
575
                                }
576
                        } else if !traceDestHdrFound && bytes.Equal(key, traceDestHdrAsBytes) {
2,158,073✔
577
                                // This is the Nats-Trace-Dest header, check the value to see
×
578
                                // if it indicates that the trace was disabled.
×
579
                                if bytes.Equal(val, traceDestDisabledAsBytes) {
×
580
                                        return nil, false
×
581
                                }
×
582
                                traceDestHdrFound = true
×
583
                        }
584
                        // Add to the keys and preserve the key's case
585
                        keys = append(keys, key)
2,158,082✔
586
                }
587
                i += nl + 2
2,158,083✔
588
        }
589
        if !traceDestHdrFound && !traceParentHdrFound {
3,963,481✔
590
                return nil, false
1,981,736✔
591
        }
1,981,736✔
592
        m = make(map[string][]string, len(keys))
9✔
593
        for i, k := range keys {
30✔
594
                hname := string(k)
21✔
595
                m[hname] = append(m[hname], string(vals[i]))
21✔
596
        }
21✔
597
        return m, !traceDestHdrFound && traceParentHdrFound
9✔
598
}
599

600
// Special case where we create a trace event before parsing the message.
601
// This is for cases where the connection will be closed when detecting
602
// an error during early message processing (for instance max payload).
603
func (c *client) initAndSendIngressErrEvent(hdr []byte, dest string, ingressError error) {
×
604
        if ingressError == nil {
×
605
                return
×
606
        }
×
607
        ct := getAcceptEncoding(hdr)
×
608
        t := &msgTrace{
×
609
                srv:  c.srv,
×
610
                acc:  c.acc,
×
611
                dest: dest,
×
612
                ct:   ct,
×
613
                event: &MsgTraceEvent{
×
614
                        Request: MsgTraceRequest{MsgSize: c.pa.size},
×
615
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
616
                                MsgTraceBase: MsgTraceBase{
×
617
                                        Type:      MsgTraceIngressType,
×
618
                                        Timestamp: time.Now(),
×
619
                                },
×
620
                                Kind:  c.kind,
×
621
                                CID:   c.cid,
×
622
                                Name:  getConnName(c),
×
623
                                Error: ingressError.Error(),
×
624
                        }),
×
625
                },
×
626
        }
×
627
        t.sendEvent()
×
628
}
629

630
// Returns `true` if message tracing is enabled and we are tracing only,
631
// that is, we are not going to deliver the inbound message, returns
632
// `false` otherwise (no tracing, or tracing and message delivery).
633
func (t *msgTrace) traceOnly() bool {
3,650,274✔
634
        return t != nil && t.tonly
3,650,274✔
635
}
3,650,274✔
636

637
func (t *msgTrace) setOriginAccountHeaderIfNeeded(c *client, acc *Account, msg []byte) []byte {
×
638
        var oan string
×
639
        // If t.acc is set, only check that, not t.oan.
×
640
        if t.acc != nil {
×
641
                if t.acc != acc {
×
642
                        oan = t.acc.GetName()
×
643
                }
×
644
        } else if t.oan != acc.GetName() {
×
645
                oan = t.oan
×
646
        }
×
647
        if oan != _EMPTY_ {
×
648
                msg = c.setHeader(MsgTraceOriginAccount, oan, msg)
×
649
        }
×
650
        return msg
×
651
}
652

653
func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
×
654
        e := t.event
×
655
        e.Hops++
×
656
        if len(t.hop) > 0 {
×
657
                t.nhop = fmt.Sprintf("%s.%d", t.hop, e.Hops)
×
658
        } else {
×
659
                t.nhop = fmt.Sprintf("%d", e.Hops)
×
660
        }
×
661
        return c.setHeader(MsgTraceHop, t.nhop, msg)
×
662
}
663

664
func (t *msgTrace) setIngressError(err string) {
×
665
        if i := t.event.Ingress(); i != nil {
×
666
                i.Error = err
×
667
        }
×
668
}
669

670
func (t *msgTrace) addSubjectMappingEvent(subj []byte) {
12,565✔
671
        if t == nil {
25,130✔
672
                return
12,565✔
673
        }
12,565✔
674
        t.event.Events = append(t.event.Events, &MsgTraceSubjectMapping{
×
675
                MsgTraceBase: MsgTraceBase{
×
676
                        Type:      MsgTraceSubjectMappingType,
×
677
                        Timestamp: time.Now(),
×
678
                },
×
679
                MappedTo: string(subj),
×
680
        })
×
681
}
682

683
func (t *msgTrace) addEgressEvent(dc *client, sub *subscription, err string) {
3,373,956✔
684
        if t == nil {
6,747,912✔
685
                return
3,373,956✔
686
        }
3,373,956✔
687
        e := &MsgTraceEgress{
×
688
                MsgTraceBase: MsgTraceBase{
×
689
                        Type:      MsgTraceEgressType,
×
690
                        Timestamp: time.Now(),
×
691
                },
×
692
                Kind:  dc.kind,
×
693
                CID:   dc.cid,
×
694
                Name:  getConnName(dc),
×
695
                Hop:   t.nhop,
×
696
                Error: err,
×
697
        }
×
698
        t.nhop = _EMPTY_
×
699
        // Specific to CLIENT connections...
×
700
        if dc.kind == CLIENT {
×
701
                // Set the subscription's subject and possibly queue name.
×
702
                e.Subscription = string(sub.subject)
×
703
                if len(sub.queue) > 0 {
×
704
                        e.Queue = string(sub.queue)
×
705
                }
×
706
        }
707
        if dc.kind == CLIENT || dc.kind == LEAF {
×
708
                if i := t.event.Ingress(); i != nil {
×
709
                        // If the Ingress' account is different from the destination's
×
710
                        // account, add the account name into the Egress trace event.
×
711
                        // This would happen with service imports.
×
712
                        if dcAccName := dc.acc.GetName(); dcAccName != i.Account {
×
713
                                e.Account = dcAccName
×
714
                        }
×
715
                }
716
        }
717
        t.event.Events = append(t.event.Events, e)
×
718
}
719

720
func (t *msgTrace) addStreamExportEvent(dc *client, to []byte) {
×
721
        if t == nil {
×
722
                return
×
723
        }
×
724
        dc.mu.Lock()
×
725
        accName := dc.acc.GetName()
×
726
        dc.mu.Unlock()
×
727
        t.event.Events = append(t.event.Events, &MsgTraceStreamExport{
×
728
                MsgTraceBase: MsgTraceBase{
×
729
                        Type:      MsgTraceStreamExportType,
×
730
                        Timestamp: time.Now(),
×
731
                },
×
732
                Account: accName,
×
733
                To:      string(to),
×
734
        })
×
735
}
736

737
func (t *msgTrace) addServiceImportEvent(accName, from, to string) {
×
738
        if t == nil {
×
739
                return
×
740
        }
×
741
        t.event.Events = append(t.event.Events, &MsgTraceServiceImport{
×
742
                MsgTraceBase: MsgTraceBase{
×
743
                        Type:      MsgTraceServiceImportType,
×
744
                        Timestamp: time.Now(),
×
745
                },
×
746
                Account: accName,
×
747
                From:    from,
×
748
                To:      to,
×
749
        })
×
750
}
751

752
func (t *msgTrace) addJetStreamEvent(streamName string) {
×
753
        if t == nil {
×
754
                return
×
755
        }
×
756
        t.js = &MsgTraceJetStream{
×
757
                MsgTraceBase: MsgTraceBase{
×
758
                        Type:      MsgTraceJetStreamType,
×
759
                        Timestamp: time.Now(),
×
760
                },
×
761
                Stream: streamName,
×
762
        }
×
763
        t.event.Events = append(t.event.Events, t.js)
×
764
}
765

766
func (t *msgTrace) updateJetStreamEvent(subject string, noInterest bool) {
2,674,056✔
767
        if t == nil {
5,348,112✔
768
                return
2,674,056✔
769
        }
2,674,056✔
770
        // JetStream event should have been created in addJetStreamEvent
771
        if t.js == nil {
×
772
                return
×
773
        }
×
774
        t.js.Subject = subject
×
775
        t.js.NoInterest = noInterest
×
776
        // Update the timestamp since this is more accurate than when it
×
777
        // was first added in addJetStreamEvent().
×
778
        t.js.Timestamp = time.Now()
×
779
}
780

781
func (t *msgTrace) sendEventFromJetStream(err error) {
×
782
        if t == nil {
×
783
                return
×
784
        }
×
785
        // JetStream event should have been created in addJetStreamEvent
786
        if t.js == nil {
×
787
                return
×
788
        }
×
789
        if err != nil {
×
790
                t.js.Error = err.Error()
×
791
        }
×
792
        t.sendEvent()
×
793
}
794

795
func (t *msgTrace) sendEvent() {
27,193,426✔
796
        if t == nil {
54,386,852✔
797
                return
27,193,426✔
798
        }
27,193,426✔
799
        if t.js != nil {
×
800
                ready := atomic.AddInt32(&t.ready, 1) == 2
×
801
                if !ready {
×
802
                        return
×
803
                }
×
804
        }
805
        t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
×
806
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc