• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25621091956

08 May 2026 01:49PM UTC coverage: 81.505% (-1.7%) from 83.18%
25621091956

push

github

web-flow
MQTT: Return `errMQTTUnsupportedCharacters` for control characters on both pub and sub (#8112)

Replaces #8104.

75814 of 93018 relevant lines covered (81.5%)

630142.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.1
/server/msgtrace.go
1
// Copyright 2024-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "encoding/json"
19
        "fmt"
20
        "math/rand"
21
        "strconv"
22
        "strings"
23
        "sync/atomic"
24
        "time"
25
)
26

27
// Header names (and one special header value) that drive message tracing.
const (
	// MsgTraceDest is the header carrying the subject to which trace events
	// are sent.
	MsgTraceDest = "Nats-Trace-Dest"
	// MsgTraceDestDisabled is the MsgTraceDest value that marks tracing as
	// disabled for a message.
	MsgTraceDestDisabled = "trace disabled" // This must be an invalid NATS subject
	// MsgTraceHop is the header carrying the hop identifier of a traced message.
	MsgTraceHop = "Nats-Trace-Hop"
	// MsgTraceOriginAccount is the header carrying the name of the origin account.
	MsgTraceOriginAccount = "Nats-Trace-Origin-Account"
	// MsgTraceOnly is the header that, when set to "1", "true" or "on" (any
	// case), requests that the message be traced but not delivered.
	MsgTraceOnly = "Nats-Trace-Only"

	// External trace header. Note that this header is normally in lower
	// case (https://www.w3.org/TR/trace-context/#header-name). Vendors
	// MUST expect the header in any case (upper, lower, mixed), and
	// SHOULD send the header name in lowercase. We used to change it
	// to lower case, but no longer do that in 2.14.
	traceParentHdr = "traceparent"
)
41

42
// Byte-slice forms (built with stringToBytes) of the strings that are
// compared against raw header bytes when scanning a message's headers.
var (
	traceDestHdrAsBytes      = stringToBytes(MsgTraceDest)
	traceDestDisabledAsBytes = stringToBytes(MsgTraceDestDisabled)
	traceParentHdrAsBytes    = stringToBytes(traceParentHdr)
	crLFAsBytes              = stringToBytes(CR_LF)
	dashAsBytes              = stringToBytes("-")
)
49

50
// MsgTraceType is the short string stored in the "type" field of every
// trace event; it identifies which concrete event structure is in use.
type MsgTraceType string

// Type of message trace events in the MsgTraceEvents list.
// This is needed to unmarshal the list.
const (
	MsgTraceIngressType        = "in" // MsgTraceIngress
	MsgTraceSubjectMappingType = "sm" // MsgTraceSubjectMapping
	MsgTraceStreamExportType   = "se" // MsgTraceStreamExport
	MsgTraceServiceImportType  = "si" // MsgTraceServiceImport
	MsgTraceJetStreamType      = "js" // MsgTraceJetStream
	MsgTraceEgressType         = "eg" // MsgTraceEgress
)
62

63
// MsgTraceEvent is the JSON document describing what happened to one traced
// message on one server.
type MsgTraceEvent struct {
	// Server describes the server that produced this event.
	// NOTE(review): not populated in this part of the file — presumably
	// filled in when the event is sent; confirm.
	Server ServerInfo `json:"server"`
	// Request holds the headers and size of the traced message.
	Request MsgTraceRequest `json:"request"`
	// Hops counts how many times a hop header was set for this message on
	// this server (incremented by setHopHeader).
	Hops int `json:"hops,omitempty"`
	// Events is the ordered list of trace events; the ingress event is
	// expected to be first (see Ingress()).
	Events MsgTraceEvents `json:"events"`
}
69

70
// MsgTraceRequest captures the traced message's headers and size.
type MsgTraceRequest struct {
	// We are not making this an http.Header so that header name case is preserved.
	Header map[string][]string `json:"header,omitempty"`
	// MsgSize is the size of the traced message as recorded by the parser
	// (c.pa.size).
	MsgSize int `json:"msgsize,omitempty"`
}
75

76
// MsgTraceEvents is a list of heterogeneous trace events. It has a custom
// UnmarshalJSON that uses each element's "type" field to pick the concrete
// structure to decode into.
type MsgTraceEvents []MsgTrace
77

78
// MsgTrace is implemented by every trace event structure.
type MsgTrace interface {
	// new returns a pointer to a fresh, zero-valued event of the same
	// concrete type; used when unmarshaling.
	new() MsgTrace
	// typ returns the event's type identifier.
	typ() MsgTraceType
}
82

83
// MsgTraceBase holds the fields common to all trace events and is embedded
// in each of them.
type MsgTraceBase struct {
	// Type identifies the concrete event structure.
	Type MsgTraceType `json:"type"`
	// Timestamp is the time at which the event was recorded.
	Timestamp time.Time `json:"ts"`
}
87

88
// MsgTraceIngress records the arrival of the traced message on a connection.
type MsgTraceIngress struct {
	MsgTraceBase
	// Kind is the kind of the connection the message arrived on (client.kind).
	Kind int `json:"kind"`
	// CID is the connection ID of that connection.
	CID uint64 `json:"cid"`
	// Name is the connection name as returned by getConnName.
	Name string `json:"name,omitempty"`
	// Account is the name of the account the message was received in.
	Account string `json:"acc"`
	// Subject is the subject the message was published on.
	Subject string `json:"subj"`
	// Error, if set, describes why ingress processing failed.
	Error string `json:"error,omitempty"`
}
97

98
// MsgTraceSubjectMapping records that the message's subject was mapped.
type MsgTraceSubjectMapping struct {
	MsgTraceBase
	// MappedTo is the subject resulting from the mapping.
	MappedTo string `json:"to"`
}
102

103
// MsgTraceStreamExport records a stream export of the traced message.
type MsgTraceStreamExport struct {
	MsgTraceBase
	// Account is the account name of the destination client.
	Account string `json:"acc"`
	// To is the subject passed to addStreamExportEvent.
	To string `json:"to"`
}
108

109
// MsgTraceServiceImport records that the traced message went through a
// service import.
type MsgTraceServiceImport struct {
	MsgTraceBase
	// Account is the account name passed to addServiceImportEvent.
	Account string `json:"acc"`
	// From and To are the subjects before and after the import.
	From string `json:"from"`
	To   string `json:"to"`
}
115

116
// MsgTraceJetStream records that the traced message reached a JetStream
// stream.
type MsgTraceJetStream struct {
	MsgTraceBase
	// Stream is the stream name, set when the event is created.
	Stream string `json:"stream"`
	// Subject and NoInterest are presumably filled in later by
	// updateJetStreamEvent — confirm against its body.
	Subject    string `json:"subject,omitempty"`
	NoInterest bool   `json:"nointerest,omitempty"`
	// Error, if set, describes a failure for this event.
	Error string `json:"error,omitempty"`
}
123

124
// MsgTraceEgress records an attempt to deliver the traced message to a
// destination connection.
type MsgTraceEgress struct {
	MsgTraceBase
	// Kind, CID and Name identify the destination connection.
	Kind int    `json:"kind"`
	CID  uint64 `json:"cid"`
	Name string `json:"name,omitempty"`
	// Hop is the hop identifier that was set for this delivery, if any.
	Hop string `json:"hop,omitempty"`
	// Account is set only for CLIENT/LEAF destinations whose account differs
	// from the ingress account.
	Account string `json:"acc,omitempty"`
	// Subscription and Queue are set only for CLIENT destinations.
	Subscription string `json:"sub,omitempty"`
	Queue        string `json:"queue,omitempty"`
	// Error, if set, explains why the message was not delivered (or was
	// delivered without a remote trace).
	Error string `json:"error,omitempty"`

	// This is for applications that unmarshal the trace events
	// and want to link an egress to route/leaf/gateway with
	// the MsgTraceEvent from that server.
	Link *MsgTraceEvent `json:"-"`
}
140

141
// -------------------------------------------------------------
142

143
func (t MsgTraceBase) typ() MsgTraceType     { return t.Type }
×
144
func (MsgTraceIngress) new() MsgTrace        { return &MsgTraceIngress{} }
×
145
func (MsgTraceSubjectMapping) new() MsgTrace { return &MsgTraceSubjectMapping{} }
×
146
func (MsgTraceStreamExport) new() MsgTrace   { return &MsgTraceStreamExport{} }
×
147
func (MsgTraceServiceImport) new() MsgTrace  { return &MsgTraceServiceImport{} }
×
148
func (MsgTraceJetStream) new() MsgTrace      { return &MsgTraceJetStream{} }
×
149
func (MsgTraceEgress) new() MsgTrace         { return &MsgTraceEgress{} }
×
150

151
// msgTraceInterfaces maps each event type identifier to a prototype value of
// the matching event structure. The prototype's new() method is used to
// allocate the value that a JSON element is decoded into.
var msgTraceInterfaces = map[MsgTraceType]MsgTrace{
	MsgTraceIngressType:        MsgTraceIngress{},
	MsgTraceSubjectMappingType: MsgTraceSubjectMapping{},
	MsgTraceStreamExportType:   MsgTraceStreamExport{},
	MsgTraceServiceImportType:  MsgTraceServiceImport{},
	MsgTraceJetStreamType:      MsgTraceJetStream{},
	MsgTraceEgressType:         MsgTraceEgress{},
}
159

160
func (t *MsgTraceEvents) UnmarshalJSON(data []byte) error {
×
161
        var raw []json.RawMessage
×
162
        err := json.Unmarshal(data, &raw)
×
163
        if err != nil {
×
164
                return err
×
165
        }
×
166
        *t = make(MsgTraceEvents, len(raw))
×
167
        var tt MsgTraceBase
×
168
        for i, r := range raw {
×
169
                if err = json.Unmarshal(r, &tt); err != nil {
×
170
                        return err
×
171
                }
×
172
                tr, ok := msgTraceInterfaces[tt.Type]
×
173
                if !ok {
×
174
                        return fmt.Errorf("unknown trace type %v", tt.Type)
×
175
                }
×
176
                te := tr.new()
×
177
                if err := json.Unmarshal(r, te); err != nil {
×
178
                        return err
×
179
                }
×
180
                (*t)[i] = te
×
181
        }
182
        return nil
×
183
}
184

185
func getTraceAs[T MsgTrace](e any) *T {
×
186
        v, ok := e.(*T)
×
187
        if ok {
×
188
                return v
×
189
        }
×
190
        return nil
×
191
}
192

193
func (t *MsgTraceEvent) Ingress() *MsgTraceIngress {
×
194
        if len(t.Events) < 1 {
×
195
                return nil
×
196
        }
×
197
        return getTraceAs[MsgTraceIngress](t.Events[0])
×
198
}
199

200
func (t *MsgTraceEvent) SubjectMapping() *MsgTraceSubjectMapping {
×
201
        for _, e := range t.Events {
×
202
                if e.typ() == MsgTraceSubjectMappingType {
×
203
                        return getTraceAs[MsgTraceSubjectMapping](e)
×
204
                }
×
205
        }
206
        return nil
×
207
}
208

209
func (t *MsgTraceEvent) StreamExports() []*MsgTraceStreamExport {
×
210
        var se []*MsgTraceStreamExport
×
211
        for _, e := range t.Events {
×
212
                if e.typ() == MsgTraceStreamExportType {
×
213
                        se = append(se, getTraceAs[MsgTraceStreamExport](e))
×
214
                }
×
215
        }
216
        return se
×
217
}
218

219
func (t *MsgTraceEvent) ServiceImports() []*MsgTraceServiceImport {
×
220
        var si []*MsgTraceServiceImport
×
221
        for _, e := range t.Events {
×
222
                if e.typ() == MsgTraceServiceImportType {
×
223
                        si = append(si, getTraceAs[MsgTraceServiceImport](e))
×
224
                }
×
225
        }
226
        return si
×
227
}
228

229
func (t *MsgTraceEvent) JetStream() *MsgTraceJetStream {
×
230
        for _, e := range t.Events {
×
231
                if e.typ() == MsgTraceJetStreamType {
×
232
                        return getTraceAs[MsgTraceJetStream](e)
×
233
                }
×
234
        }
235
        return nil
×
236
}
237

238
func (t *MsgTraceEvent) Egresses() []*MsgTraceEgress {
×
239
        var eg []*MsgTraceEgress
×
240
        for _, e := range t.Events {
×
241
                if e.typ() == MsgTraceEgressType {
×
242
                        eg = append(eg, getTraceAs[MsgTraceEgress](e))
×
243
                }
×
244
        }
245
        return eg
×
246
}
247

248
// Human-readable explanations of why a traced message was not delivered, or
// was delivered without a trace from the remote.
// NOTE(review): presumably these are the values stored in the Error field of
// trace events — the call sites are not in this part of the file; confirm.
const (
	errMsgTraceOnlyNoSupport   = "Not delivered because remote does not support message tracing"
	errMsgTraceNoSupport       = "Message delivered but remote does not support message tracing so no trace event generated from there"
	errMsgTraceNoEcho          = "Not delivered because of no echo"
	errMsgTracePubViolation    = "Not delivered because publish denied for this subject"
	errMsgTraceSubDeny         = "Not delivered because subscription denies this subject"
	errMsgTraceSubClosed       = "Not delivered because subscription is closed"
	errMsgTraceClientClosed    = "Not delivered because client is closed"
	errMsgTraceAutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"
	errMsgTraceFastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"
)
259

260
// msgTrace is the per-message tracing state attached to a message while it
// is being processed.
type msgTrace struct {
	// NOTE(review): ready is not used in this part of the file — presumably
	// an atomically accessed flag; confirm.
	ready int32
	// srv is the server processing the message.
	srv *Server
	// acc is the account used when sending the trace event; may be nil if
	// the account lookup failed.
	acc *Account
	// Origin account name, set only if acc is nil when acc lookup failed.
	oan string
	// dest is the subject to which the trace event is sent.
	dest string
	// event is the trace event being built for this message.
	event *MsgTraceEvent
	// js points at the JetStream event (also present in event.Events) so
	// that it can be updated after creation.
	js *MsgTraceJetStream
	// hop is the hop identifier received with the message (non-CLIENT
	// connections only).
	hop string
	// nhop is the hop identifier generated for the next delivery; it is
	// consumed (cleared) when the egress event is added.
	nhop  string
	tonly bool // Will only trace the message, not do delivery.
	// ct is the compression type derived from the message's accept-encoding
	// header.
	ct compressionType
}
274

275
// This will be false outside of the tests, so when building the server binary,
// any code where you see `if msgTraceRunInTests` statement will be compiled
// out, so this will have no performance penalty.
// NOTE(review): these are package-level variables, not constants, so the
// check looks like a (cheap) runtime branch rather than something the
// compiler can remove — confirm.
var (
	// msgTraceRunInTests enables the test-only checks in initMsgTrace.
	msgTraceRunInTests bool
	// msgTraceCheckSupport, when set along with msgTraceRunInTests, makes
	// initMsgTrace ignore trace headers if the server's protocol is lower
	// than MsgTraceProto.
	msgTraceCheckSupport bool
)
282

283
// Returns the message trace object, if message is being traced,
284
// and `true` if we want to only trace, not actually deliver the message.
285
func (c *client) isMsgTraceEnabled() (*msgTrace, bool) {
74,076,468✔
286
        t := c.pa.trace
74,076,468✔
287
        if t == nil {
148,152,936✔
288
                return nil, false
74,076,468✔
289
        }
74,076,468✔
290
        return t, t.tonly
×
291
}
292

293
// For LEAF/ROUTER/GATEWAY, return false if the remote does not support
294
// message tracing (important if the tracing requests trace-only).
295
func (c *client) msgTraceSupport() bool {
×
296
        // Exclude client connection from the protocol check.
×
297
        return c.kind == CLIENT || c.opts.Protocol >= MsgTraceProto
×
298
}
×
299

300
func getConnName(c *client) string {
×
301
        switch c.kind {
×
302
        case ROUTER:
×
303
                if n := c.route.remoteName; n != _EMPTY_ {
×
304
                        return n
×
305
                }
×
306
        case GATEWAY:
×
307
                if n := c.gw.remoteName; n != _EMPTY_ {
×
308
                        return n
×
309
                }
×
310
        case LEAF:
×
311
                if n := c.leaf.remoteServer; n != _EMPTY_ {
×
312
                        return n
×
313
                }
×
314
        }
315
        return c.opts.Name
×
316
}
317

318
func getCompressionType(cts string) compressionType {
×
319
        if cts == _EMPTY_ {
×
320
                return noCompression
×
321
        }
×
322
        cts = strings.ToLower(cts)
×
323
        if strings.Contains(cts, "snappy") || strings.Contains(cts, "s2") {
×
324
                return snappyCompression
×
325
        }
×
326
        if strings.Contains(cts, "gzip") {
×
327
                return gzipCompression
×
328
        }
×
329
        return unsupportedCompression
×
330
}
331

332
// initMsgTrace inspects the headers of the message currently held in
// c.msgBuf and, if they request tracing, creates the msgTrace object for
// that message, stores it in c.pa.trace and returns it.
//
// It returns nil (tracing not enabled for this message) when the message has
// no headers, when no usable trace header is present, when a CLIENT is not
// allowed to use the trace destination, when the destination or account is
// invalid and the message is not trace-only, or, for an external trace
// header, when the account has no trace destination or the message is not
// selected by sampling.
func (c *client) initMsgTrace() *msgTrace {
	// The code in the "if" statement is only running in test mode.
	if msgTraceRunInTests {
		// Check the type of client that tries to initialize a trace struct.
		if !(c.kind == CLIENT || c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
			panic(fmt.Sprintf("Unexpected client type %q trying to initialize msgTrace", c.kindString()))
		}
		// In some tests, we want to make a server behave like an old server
		// and so even if a trace header is received, we want the server to
		// simply ignore it.
		if msgTraceCheckSupport {
			if c.srv == nil || c.srv.getServerProto() < MsgTraceProto {
				return nil
			}
		}
	}
	// No headers at all, so nothing to trace.
	if c.pa.hdr <= 0 {
		return nil
	}
	hdr := c.msgBuf[:c.pa.hdr]
	// `external` is true when only the external trace header was found.
	headers, external := genHeaderMapIfTraceHeadersPresent(hdr)
	if len(headers) == 0 {
		return nil
	}
	// Little helper to give us the first value of a given header, or _EMPTY_
	// if key is not present.
	getHdrVal := func(key string) string {
		vv, ok := headers[key]
		if !ok {
			return _EMPTY_
		}
		return vv[0]
	}
	var (
		dest      string
		traceOnly bool
	)
	// Check for traceOnly only if not external.
	if !external {
		if to := getHdrVal(MsgTraceOnly); to != _EMPTY_ {
			tos := strings.ToLower(to)
			switch tos {
			case "1", "true", "on":
				traceOnly = true
			}
		}
		dest = getHdrVal(MsgTraceDest)
		if c.kind == CLIENT {
			// The permission check may deny tracing or override the
			// destination.
			if td, ok := c.allowedMsgTraceDest(hdr, false); !ok {
				return nil
			} else if td != _EMPTY_ {
				dest = td
			}
		}
		// Check the destination to see if this is a valid public subject.
		if !IsValidPublishSubject(dest) {
			// We still have to return a msgTrace object (if traceOnly is set)
			// because if we don't, the message will end-up being delivered to
			// applications, which may break them. We report the error in any case.
			c.Errorf("Destination %q is not valid, won't be able to trace events", dest)
			if !traceOnly {
				// We can bail, tracing will be disabled for this message.
				return nil
			}
		}
	}
	var (
		// Account to use when sending the trace event
		acc *Account
		// Ingress' account name
		ian string
		// Origin account name
		oan string
		// The hop "id", taken from headers only when not from CLIENT
		hop string
	)
	if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
		// The ingress account name will always be c.pa.account, but `acc` may
		// be different if we have an origin account header.
		if c.kind == LEAF {
			ian = c.acc.GetName()
		} else {
			ian = string(c.pa.account)
		}
		// The remote will have set the origin account header only if the
		// message changed account (think of service imports).
		oan = getHdrVal(MsgTraceOriginAccount)
		if oan == _EMPTY_ {
			// For LEAF or ROUTER with pinned-account, we can use the c.acc.
			if c.kind == LEAF || (c.kind == ROUTER && len(c.route.accName) > 0) {
				acc = c.acc
			} else {
				// We will lookup account with c.pa.account (or ian).
				oan = ian
			}
		}
		// Unless we already got the account, we need to look it up.
		if acc == nil {
			// We don't want to do account resolving here.
			if acci, ok := c.srv.accounts.Load(oan); ok {
				acc = acci.(*Account)
				// Since we have looked-up the account, we don't need oan, so
				// clear it in case it was set.
				oan = _EMPTY_
			} else {
				// We still have to return a msgTrace object (if traceOnly is set)
				// because if we don't, the message will end-up being delivered to
				// applications, which may break them. We report the error in any case.
				c.Errorf("Account %q was not found, won't be able to trace events", oan)
				if !traceOnly {
					// We can bail, tracing will be disabled for this message.
					return nil
				}
			}
		}
		// Check the hop header
		hop = getHdrVal(MsgTraceHop)
	} else {
		acc = c.acc
		ian = acc.GetName()
	}
	// If external, we need to have the account's trace destination set,
	// otherwise, we are not enabling tracing.
	if external {
		var sampling int
		if acc != nil {
			dest, sampling = acc.getTraceDestAndSampling()
		}
		if dest == _EMPTY_ {
			// No account destination, no tracing for external trace headers.
			return nil
		}
		// Check sampling, but only from origin server.
		if c.kind == CLIENT && !sample(sampling) {
			// Need to disable tracing so that if the message is routed, it won't
			// trigger a trace there.
			c.msgBuf = c.setHeader(MsgTraceDest, MsgTraceDestDisabled, c.msgBuf)
			return nil
		}
	}
	// Tracing is enabled: build the trace state with the ingress event as the
	// first event of the list.
	c.pa.trace = &msgTrace{
		srv:  c.srv,
		acc:  acc,
		oan:  oan,
		dest: dest,
		ct:   getCompressionType(getHdrVal(acceptEncodingHeader)),
		hop:  hop,
		event: &MsgTraceEvent{
			Request: MsgTraceRequest{
				Header:  headers,
				MsgSize: c.pa.size,
			},
			Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
				MsgTraceBase: MsgTraceBase{
					Type:      MsgTraceIngressType,
					Timestamp: time.Now(),
				},
				Kind:    c.kind,
				CID:     c.cid,
				Name:    getConnName(c),
				Account: ian,
				Subject: string(c.pa.subject),
			}),
		},
		tonly: traceOnly,
	}
	return c.pa.trace
}
500

501
// sample reports whether a message should be traced given a sampling
// percentage. A message is selected with a probability of sampling/100.
//
// Option parsing should ensure that sampling is [1..100], but any value
// outside of this range is considered to be 100%, that is, sample always
// returns true.
func sample(sampling int) bool {
	if sampling <= 0 || sampling >= 100 {
		return true
	}
	// rand.Int31n(100) is uniformly distributed over [0, 99], so exactly
	// `sampling` of the 100 possible outcomes satisfy this strict comparison.
	// Using `<=` here would select sampling+1 outcomes (e.g. 2% for 1%).
	return rand.Int31n(100) < int32(sampling)
}
509

510
// This function will return the header as a map (instead of http.Header because
511
// we want to preserve the header names' case) and a boolean that indicates if
512
// the headers have been lifted due to the presence of the external trace header
513
// only.
514
// Note that because of the traceParentHdr, the search is done in a case
515
// insensitive way. We used to rewrite it in lower case but no longer do since v2.14.
516
func genHeaderMapIfTraceHeadersPresent(hdr []byte) (map[string][]string, bool) {
1,418,258✔
517

1,418,258✔
518
        var (
1,418,258✔
519
                _keys               = [64][]byte{}
1,418,258✔
520
                _vals               = [64][]byte{}
1,418,258✔
521
                m                   map[string][]string
1,418,258✔
522
                traceDestHdrFound   bool
1,418,258✔
523
                traceParentHdrFound bool
1,418,258✔
524
        )
1,418,258✔
525
        // Skip the hdrLine
1,418,258✔
526
        if !bytes.HasPrefix(hdr, stringToBytes(hdrLine)) {
1,429,189✔
527
                return nil, false
10,931✔
528
        }
10,931✔
529

530
        keys := _keys[:0]
1,407,327✔
531
        vals := _vals[:0]
1,407,327✔
532

1,407,327✔
533
        for i := len(hdrLine); i < len(hdr); {
4,360,767✔
534
                // Search for key/val delimiter
2,953,440✔
535
                del := bytes.IndexByte(hdr[i:], ':')
2,953,440✔
536
                if del < 0 {
4,360,767✔
537
                        break
1,407,327✔
538
                }
539
                keyStart := i
1,546,113✔
540
                key := hdr[keyStart : keyStart+del]
1,546,113✔
541
                i += del + 1
1,546,113✔
542
                for i < len(hdr) && (hdr[i] == ' ' || hdr[i] == '\t') {
3,092,000✔
543
                        i++
1,545,887✔
544
                }
1,545,887✔
545
                valStart := i
1,546,113✔
546
                nl := bytes.Index(hdr[valStart:], crLFAsBytes)
1,546,113✔
547
                if nl < 0 {
1,546,113✔
548
                        break
×
549
                }
550
                valEnd := valStart + nl
1,546,113✔
551
                for valEnd > valStart && (hdr[valEnd-1] == ' ' || hdr[valEnd-1] == '\t') {
1,546,113✔
552
                        valEnd--
×
553
                }
×
554
                val := hdr[valStart:valEnd]
1,546,113✔
555
                if len(key) > 0 && len(val) > 0 {
3,092,219✔
556
                        vals = append(vals, val)
1,546,106✔
557

1,546,106✔
558
                        // We search for our special keys only if not already found.
1,546,106✔
559

1,546,106✔
560
                        // Check for the external trace header.
1,546,106✔
561
                        // Search needs to be case insensitive.
1,546,106✔
562
                        if !traceParentHdrFound && bytes.EqualFold(key, traceParentHdrAsBytes) {
1,546,115✔
563
                                // We will now check if the value has sampling or not.
9✔
564
                                // TODO(ik): Not sure if this header can have multiple values
9✔
565
                                // or not, and if so, what would be the rule to check for
9✔
566
                                // sampling. What is done here is to check them all until we
9✔
567
                                // found one with sampling.
9✔
568
                                tk := bytes.Split(val, dashAsBytes)
9✔
569
                                if len(tk) == 4 && len([]byte(tk[3])) == 2 {
18✔
570
                                        if hexVal, err := strconv.ParseInt(bytesToString(tk[3]), 16, 8); err == nil {
18✔
571
                                                if hexVal&0x1 == 0x1 {
18✔
572
                                                        traceParentHdrFound = true
9✔
573
                                                }
9✔
574
                                        }
575
                                }
576
                        } else if !traceDestHdrFound && bytes.Equal(key, traceDestHdrAsBytes) {
1,546,097✔
577
                                // This is the Nats-Trace-Dest header, check the value to see
×
578
                                // if it indicates that the trace was disabled.
×
579
                                if bytes.Equal(val, traceDestDisabledAsBytes) {
×
580
                                        return nil, false
×
581
                                }
×
582
                                traceDestHdrFound = true
×
583
                        }
584
                        // Add to the keys and preserve the key's case
585
                        keys = append(keys, key)
1,546,106✔
586
                }
587
                i += nl + 2
1,546,113✔
588
        }
589
        if !traceDestHdrFound && !traceParentHdrFound {
2,814,645✔
590
                return nil, false
1,407,318✔
591
        }
1,407,318✔
592
        m = make(map[string][]string, len(keys))
9✔
593
        for i, k := range keys {
30✔
594
                hname := string(k)
21✔
595
                m[hname] = append(m[hname], string(vals[i]))
21✔
596
        }
21✔
597
        return m, !traceDestHdrFound && traceParentHdrFound
9✔
598
}
599

600
// Special case where we create a trace event before parsing the message.
601
// This is for cases where the connection will be closed when detecting
602
// an error during early message processing (for instance max payload).
603
func (c *client) initAndSendIngressErrEvent(hdr []byte, dest string, ingressError error) {
×
604
        if ingressError == nil {
×
605
                return
×
606
        }
×
607
        ct := getAcceptEncoding(hdr)
×
608
        t := &msgTrace{
×
609
                srv:  c.srv,
×
610
                acc:  c.acc,
×
611
                dest: dest,
×
612
                ct:   ct,
×
613
                event: &MsgTraceEvent{
×
614
                        Request: MsgTraceRequest{MsgSize: c.pa.size},
×
615
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
616
                                MsgTraceBase: MsgTraceBase{
×
617
                                        Type:      MsgTraceIngressType,
×
618
                                        Timestamp: time.Now(),
×
619
                                },
×
620
                                Kind:  c.kind,
×
621
                                CID:   c.cid,
×
622
                                Name:  getConnName(c),
×
623
                                Error: ingressError.Error(),
×
624
                        }),
×
625
                },
×
626
        }
×
627
        t.sendEvent()
×
628
}
629

630
// Returns `true` if message tracing is enabled and we are tracing only,
631
// that is, we are not going to deliver the inbound message, returns
632
// `false` otherwise (no tracing, or tracing and message delivery).
633
func (t *msgTrace) traceOnly() bool {
3,514,800✔
634
        return t != nil && t.tonly
3,514,800✔
635
}
3,514,800✔
636

637
func (t *msgTrace) setOriginAccountHeaderIfNeeded(c *client, acc *Account, msg []byte) []byte {
×
638
        var oan string
×
639
        // If t.acc is set, only check that, not t.oan.
×
640
        if t.acc != nil {
×
641
                if t.acc != acc {
×
642
                        oan = t.acc.GetName()
×
643
                }
×
644
        } else if t.oan != acc.GetName() {
×
645
                oan = t.oan
×
646
        }
×
647
        if oan != _EMPTY_ {
×
648
                msg = c.setHeader(MsgTraceOriginAccount, oan, msg)
×
649
        }
×
650
        return msg
×
651
}
652

653
func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
×
654
        e := t.event
×
655
        e.Hops++
×
656
        if len(t.hop) > 0 {
×
657
                t.nhop = fmt.Sprintf("%s.%d", t.hop, e.Hops)
×
658
        } else {
×
659
                t.nhop = fmt.Sprintf("%d", e.Hops)
×
660
        }
×
661
        return c.setHeader(MsgTraceHop, t.nhop, msg)
×
662
}
663

664
func (t *msgTrace) setIngressError(err string) {
×
665
        if i := t.event.Ingress(); i != nil {
×
666
                i.Error = err
×
667
        }
×
668
}
669

670
func (t *msgTrace) addSubjectMappingEvent(subj []byte) {
12,578✔
671
        if t == nil {
25,156✔
672
                return
12,578✔
673
        }
12,578✔
674
        t.event.Events = append(t.event.Events, &MsgTraceSubjectMapping{
×
675
                MsgTraceBase: MsgTraceBase{
×
676
                        Type:      MsgTraceSubjectMappingType,
×
677
                        Timestamp: time.Now(),
×
678
                },
×
679
                MappedTo: string(subj),
×
680
        })
×
681
}
682

683
func (t *msgTrace) addEgressEvent(dc *client, sub *subscription, err string) {
3,242,426✔
684
        if t == nil {
6,484,852✔
685
                return
3,242,426✔
686
        }
3,242,426✔
687
        e := &MsgTraceEgress{
×
688
                MsgTraceBase: MsgTraceBase{
×
689
                        Type:      MsgTraceEgressType,
×
690
                        Timestamp: time.Now(),
×
691
                },
×
692
                Kind:  dc.kind,
×
693
                CID:   dc.cid,
×
694
                Name:  getConnName(dc),
×
695
                Hop:   t.nhop,
×
696
                Error: err,
×
697
        }
×
698
        t.nhop = _EMPTY_
×
699
        // Specific to CLIENT connections...
×
700
        if dc.kind == CLIENT {
×
701
                // Set the subscription's subject and possibly queue name.
×
702
                e.Subscription = string(sub.subject)
×
703
                if len(sub.queue) > 0 {
×
704
                        e.Queue = string(sub.queue)
×
705
                }
×
706
        }
707
        if dc.kind == CLIENT || dc.kind == LEAF {
×
708
                if i := t.event.Ingress(); i != nil {
×
709
                        // If the Ingress' account is different from the destination's
×
710
                        // account, add the account name into the Egress trace event.
×
711
                        // This would happen with service imports.
×
712
                        if dcAccName := dc.acc.GetName(); dcAccName != i.Account {
×
713
                                e.Account = dcAccName
×
714
                        }
×
715
                }
716
        }
717
        t.event.Events = append(t.event.Events, e)
×
718
}
719

720
func (t *msgTrace) addStreamExportEvent(dc *client, to []byte) {
×
721
        if t == nil {
×
722
                return
×
723
        }
×
724
        dc.mu.Lock()
×
725
        accName := dc.acc.GetName()
×
726
        dc.mu.Unlock()
×
727
        t.event.Events = append(t.event.Events, &MsgTraceStreamExport{
×
728
                MsgTraceBase: MsgTraceBase{
×
729
                        Type:      MsgTraceStreamExportType,
×
730
                        Timestamp: time.Now(),
×
731
                },
×
732
                Account: accName,
×
733
                To:      string(to),
×
734
        })
×
735
}
736

737
func (t *msgTrace) addServiceImportEvent(accName, from, to string) {
×
738
        if t == nil {
×
739
                return
×
740
        }
×
741
        t.event.Events = append(t.event.Events, &MsgTraceServiceImport{
×
742
                MsgTraceBase: MsgTraceBase{
×
743
                        Type:      MsgTraceServiceImportType,
×
744
                        Timestamp: time.Now(),
×
745
                },
×
746
                Account: accName,
×
747
                From:    from,
×
748
                To:      to,
×
749
        })
×
750
}
751

752
func (t *msgTrace) addJetStreamEvent(streamName string) {
×
753
        if t == nil {
×
754
                return
×
755
        }
×
756
        t.js = &MsgTraceJetStream{
×
757
                MsgTraceBase: MsgTraceBase{
×
758
                        Type:      MsgTraceJetStreamType,
×
759
                        Timestamp: time.Now(),
×
760
                },
×
761
                Stream: streamName,
×
762
        }
×
763
        t.event.Events = append(t.event.Events, t.js)
×
764
}
765

766
func (t *msgTrace) updateJetStreamEvent(subject string, noInterest bool) {
2,586,271✔
767
        if t == nil {
5,172,542✔
768
                return
2,586,271✔
769
        }
2,586,271✔
770
        // JetStream event should have been created in addJetStreamEvent
771
        if t.js == nil {
×
772
                return
×
773
        }
×
774
        t.js.Subject = subject
×
775
        t.js.NoInterest = noInterest
×
776
        // Update the timestamp since this is more accurate than when it
×
777
        // was first added in addJetStreamEvent().
×
778
        t.js.Timestamp = time.Now()
×
779
}
780

781
func (t *msgTrace) sendEventFromJetStream(err error) {
×
782
        if t == nil {
×
783
                return
×
784
        }
×
785
        // JetStream event should have been created in addJetStreamEvent
786
        if t.js == nil {
×
787
                return
×
788
        }
×
789
        if err != nil {
×
790
                t.js.Error = err.Error()
×
791
        }
×
792
        t.sendEvent()
×
793
}
794

795
func (t *msgTrace) sendEvent() {
25,567,287✔
796
        if t == nil {
51,134,574✔
797
                return
25,567,287✔
798
        }
25,567,287✔
799
        if t.js != nil {
×
800
                ready := atomic.AddInt32(&t.ready, 1) == 2
×
801
                if !ready {
×
802
                        return
×
803
                }
×
804
        }
805
        t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
×
806
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc