• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 19658562439

24 Nov 2025 12:55PM UTC coverage: 69.106% (-17.0%) from 86.129%
19658562439

push

github

web-flow
NRG: Don't reset WAL when failing to load last snapshot (#7580)

In most cases we can either install a new snapshot before shutting down,
or if not, we can better detect the situation on the next startup.

ref: #7556

Signed-off-by: Neil Twigg <neil@nats.io>

60221 of 87143 relevant lines covered (69.11%)

271294.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.87
/server/msgtrace.go
1
// Copyright 2024-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "encoding/json"
19
        "fmt"
20
        "math/rand"
21
        "strconv"
22
        "strings"
23
        "sync/atomic"
24
        "time"
25
)
26

27
const (
28
        MsgTraceDest          = "Nats-Trace-Dest"
29
        MsgTraceHop           = "Nats-Trace-Hop"
30
        MsgTraceOriginAccount = "Nats-Trace-Origin-Account"
31
        MsgTraceOnly          = "Nats-Trace-Only"
32

33
        // External trace header. Note that this header is normally in lower
34
        // case (https://www.w3.org/TR/trace-context/#header-name). Vendors
35
        // MUST expect the header in any case (upper, lower, mixed), and
36
        // SHOULD send the header name in lowercase.
37
        traceParentHdr = "traceparent"
38
)
39

40
type MsgTraceType string
41

42
// Type of message trace events in the MsgTraceEvents list.
43
// This is needed to unmarshal the list.
44
const (
45
        MsgTraceIngressType        = "in"
46
        MsgTraceSubjectMappingType = "sm"
47
        MsgTraceStreamExportType   = "se"
48
        MsgTraceServiceImportType  = "si"
49
        MsgTraceJetStreamType      = "js"
50
        MsgTraceEgressType         = "eg"
51
)
52

53
type MsgTraceEvent struct {
54
        Server  ServerInfo      `json:"server"`
55
        Request MsgTraceRequest `json:"request"`
56
        Hops    int             `json:"hops,omitempty"`
57
        Events  MsgTraceEvents  `json:"events"`
58
}
59

60
type MsgTraceRequest struct {
61
        // We are not making this an http.Header so that header name case is preserved.
62
        Header  map[string][]string `json:"header,omitempty"`
63
        MsgSize int                 `json:"msgsize,omitempty"`
64
}
65

66
type MsgTraceEvents []MsgTrace
67

68
type MsgTrace interface {
69
        new() MsgTrace
70
        typ() MsgTraceType
71
}
72

73
type MsgTraceBase struct {
74
        Type      MsgTraceType `json:"type"`
75
        Timestamp time.Time    `json:"ts"`
76
}
77

78
type MsgTraceIngress struct {
79
        MsgTraceBase
80
        Kind    int    `json:"kind"`
81
        CID     uint64 `json:"cid"`
82
        Name    string `json:"name,omitempty"`
83
        Account string `json:"acc"`
84
        Subject string `json:"subj"`
85
        Error   string `json:"error,omitempty"`
86
}
87

88
type MsgTraceSubjectMapping struct {
89
        MsgTraceBase
90
        MappedTo string `json:"to"`
91
}
92

93
type MsgTraceStreamExport struct {
94
        MsgTraceBase
95
        Account string `json:"acc"`
96
        To      string `json:"to"`
97
}
98

99
type MsgTraceServiceImport struct {
100
        MsgTraceBase
101
        Account string `json:"acc"`
102
        From    string `json:"from"`
103
        To      string `json:"to"`
104
}
105

106
type MsgTraceJetStream struct {
107
        MsgTraceBase
108
        Stream     string `json:"stream"`
109
        Subject    string `json:"subject,omitempty"`
110
        NoInterest bool   `json:"nointerest,omitempty"`
111
        Error      string `json:"error,omitempty"`
112
}
113

114
type MsgTraceEgress struct {
115
        MsgTraceBase
116
        Kind         int    `json:"kind"`
117
        CID          uint64 `json:"cid"`
118
        Name         string `json:"name,omitempty"`
119
        Hop          string `json:"hop,omitempty"`
120
        Account      string `json:"acc,omitempty"`
121
        Subscription string `json:"sub,omitempty"`
122
        Queue        string `json:"queue,omitempty"`
123
        Error        string `json:"error,omitempty"`
124

125
        // This is for applications that unmarshal the trace events
126
        // and want to link an egress to route/leaf/gateway with
127
        // the MsgTraceEvent from that server.
128
        Link *MsgTraceEvent `json:"-"`
129
}
130

131
// -------------------------------------------------------------
132

133
func (t MsgTraceBase) typ() MsgTraceType     { return t.Type }
×
134
func (MsgTraceIngress) new() MsgTrace        { return &MsgTraceIngress{} }
×
135
func (MsgTraceSubjectMapping) new() MsgTrace { return &MsgTraceSubjectMapping{} }
×
136
func (MsgTraceStreamExport) new() MsgTrace   { return &MsgTraceStreamExport{} }
×
137
func (MsgTraceServiceImport) new() MsgTrace  { return &MsgTraceServiceImport{} }
×
138
func (MsgTraceJetStream) new() MsgTrace      { return &MsgTraceJetStream{} }
×
139
func (MsgTraceEgress) new() MsgTrace         { return &MsgTraceEgress{} }
×
140

141
var msgTraceInterfaces = map[MsgTraceType]MsgTrace{
142
        MsgTraceIngressType:        MsgTraceIngress{},
143
        MsgTraceSubjectMappingType: MsgTraceSubjectMapping{},
144
        MsgTraceStreamExportType:   MsgTraceStreamExport{},
145
        MsgTraceServiceImportType:  MsgTraceServiceImport{},
146
        MsgTraceJetStreamType:      MsgTraceJetStream{},
147
        MsgTraceEgressType:         MsgTraceEgress{},
148
}
149

150
func (t *MsgTraceEvents) UnmarshalJSON(data []byte) error {
×
151
        var raw []json.RawMessage
×
152
        err := json.Unmarshal(data, &raw)
×
153
        if err != nil {
×
154
                return err
×
155
        }
×
156
        *t = make(MsgTraceEvents, len(raw))
×
157
        var tt MsgTraceBase
×
158
        for i, r := range raw {
×
159
                if err = json.Unmarshal(r, &tt); err != nil {
×
160
                        return err
×
161
                }
×
162
                tr, ok := msgTraceInterfaces[tt.Type]
×
163
                if !ok {
×
164
                        return fmt.Errorf("unknown trace type %v", tt.Type)
×
165
                }
×
166
                te := tr.new()
×
167
                if err := json.Unmarshal(r, te); err != nil {
×
168
                        return err
×
169
                }
×
170
                (*t)[i] = te
×
171
        }
172
        return nil
×
173
}
174

175
func getTraceAs[T MsgTrace](e any) *T {
×
176
        v, ok := e.(*T)
×
177
        if ok {
×
178
                return v
×
179
        }
×
180
        return nil
×
181
}
182

183
func (t *MsgTraceEvent) Ingress() *MsgTraceIngress {
×
184
        if len(t.Events) < 1 {
×
185
                return nil
×
186
        }
×
187
        return getTraceAs[MsgTraceIngress](t.Events[0])
×
188
}
189

190
func (t *MsgTraceEvent) SubjectMapping() *MsgTraceSubjectMapping {
×
191
        for _, e := range t.Events {
×
192
                if e.typ() == MsgTraceSubjectMappingType {
×
193
                        return getTraceAs[MsgTraceSubjectMapping](e)
×
194
                }
×
195
        }
196
        return nil
×
197
}
198

199
func (t *MsgTraceEvent) StreamExports() []*MsgTraceStreamExport {
×
200
        var se []*MsgTraceStreamExport
×
201
        for _, e := range t.Events {
×
202
                if e.typ() == MsgTraceStreamExportType {
×
203
                        se = append(se, getTraceAs[MsgTraceStreamExport](e))
×
204
                }
×
205
        }
206
        return se
×
207
}
208

209
func (t *MsgTraceEvent) ServiceImports() []*MsgTraceServiceImport {
×
210
        var si []*MsgTraceServiceImport
×
211
        for _, e := range t.Events {
×
212
                if e.typ() == MsgTraceServiceImportType {
×
213
                        si = append(si, getTraceAs[MsgTraceServiceImport](e))
×
214
                }
×
215
        }
216
        return si
×
217
}
218

219
func (t *MsgTraceEvent) JetStream() *MsgTraceJetStream {
×
220
        for _, e := range t.Events {
×
221
                if e.typ() == MsgTraceJetStreamType {
×
222
                        return getTraceAs[MsgTraceJetStream](e)
×
223
                }
×
224
        }
225
        return nil
×
226
}
227

228
func (t *MsgTraceEvent) Egresses() []*MsgTraceEgress {
×
229
        var eg []*MsgTraceEgress
×
230
        for _, e := range t.Events {
×
231
                if e.typ() == MsgTraceEgressType {
×
232
                        eg = append(eg, getTraceAs[MsgTraceEgress](e))
×
233
                }
×
234
        }
235
        return eg
×
236
}
237

238
const (
239
        errMsgTraceOnlyNoSupport   = "Not delivered because remote does not support message tracing"
240
        errMsgTraceNoSupport       = "Message delivered but remote does not support message tracing so no trace event generated from there"
241
        errMsgTraceNoEcho          = "Not delivered because of no echo"
242
        errMsgTracePubViolation    = "Not delivered because publish denied for this subject"
243
        errMsgTraceSubDeny         = "Not delivered because subscription denies this subject"
244
        errMsgTraceSubClosed       = "Not delivered because subscription is closed"
245
        errMsgTraceClientClosed    = "Not delivered because client is closed"
246
        errMsgTraceAutoSubExceeded = "Not delivered because auto-unsubscribe exceeded"
247
        errMsgTraceFastProdNoStall = "Not delivered because fast producer not stalled and consumer is slow"
248
)
249

250
type msgTrace struct {
251
        ready int32
252
        srv   *Server
253
        acc   *Account
254
        // Origin account name, set only if acc is nil when acc lookup failed.
255
        oan   string
256
        dest  string
257
        event *MsgTraceEvent
258
        js    *MsgTraceJetStream
259
        hop   string
260
        nhop  string
261
        tonly bool // Will only trace the message, not do delivery.
262
        ct    compressionType
263
}
264

265
// These will be false outside of the tests. Note that they are variables,
// not constants, so an `if msgTraceRunInTests` statement is not removed by
// the compiler; in the server binary it costs a single boolean check, which
// should have a negligible performance penalty.
268
var (
269
        msgTraceRunInTests   bool
270
        msgTraceCheckSupport bool
271
)
272

273
// Returns the message trace object, if message is being traced,
274
// and `true` if we want to only trace, not actually deliver the message.
275
func (c *client) isMsgTraceEnabled() (*msgTrace, bool) {
32,788,325✔
276
        t := c.pa.trace
32,788,325✔
277
        if t == nil {
65,576,650✔
278
                return nil, false
32,788,325✔
279
        }
32,788,325✔
280
        return t, t.tonly
×
281
}
282

283
// For LEAF/ROUTER/GATEWAY, return false if the remote does not support
284
// message tracing (important if the tracing requests trace-only).
285
func (c *client) msgTraceSupport() bool {
×
286
        // Exclude client connection from the protocol check.
×
287
        return c.kind == CLIENT || c.opts.Protocol >= MsgTraceProto
×
288
}
×
289

290
func getConnName(c *client) string {
×
291
        switch c.kind {
×
292
        case ROUTER:
×
293
                if n := c.route.remoteName; n != _EMPTY_ {
×
294
                        return n
×
295
                }
×
296
        case GATEWAY:
×
297
                if n := c.gw.remoteName; n != _EMPTY_ {
×
298
                        return n
×
299
                }
×
300
        case LEAF:
×
301
                if n := c.leaf.remoteServer; n != _EMPTY_ {
×
302
                        return n
×
303
                }
×
304
        }
305
        return c.opts.Name
×
306
}
307

308
func getCompressionType(cts string) compressionType {
9✔
309
        if cts == _EMPTY_ {
18✔
310
                return noCompression
9✔
311
        }
9✔
312
        cts = strings.ToLower(cts)
×
313
        if strings.Contains(cts, "snappy") || strings.Contains(cts, "s2") {
×
314
                return snappyCompression
×
315
        }
×
316
        if strings.Contains(cts, "gzip") {
×
317
                return gzipCompression
×
318
        }
×
319
        return unsupportedCompression
×
320
}
321

322
func (c *client) initMsgTrace() *msgTrace {
660,926✔
323
        // The code in the "if" statement is only running in test mode.
660,926✔
324
        if msgTraceRunInTests {
1,320,266✔
325
                // Check the type of client that tries to initialize a trace struct.
659,340✔
326
                if !(c.kind == CLIENT || c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) {
659,340✔
327
                        panic(fmt.Sprintf("Unexpected client type %q trying to initialize msgTrace", c.kindString()))
×
328
                }
329
                // In some tests, we want to make a server behave like an old server
330
                // and so even if a trace header is received, we want the server to
331
                // simply ignore it.
332
                if msgTraceCheckSupport {
659,340✔
333
                        if c.srv == nil || c.srv.getServerProto() < MsgTraceProto {
×
334
                                return nil
×
335
                        }
×
336
                }
337
        }
338
        if c.pa.hdr <= 0 {
660,926✔
339
                return nil
×
340
        }
×
341
        hdr := c.msgBuf[:c.pa.hdr]
660,926✔
342
        headers, external := genHeaderMapIfTraceHeadersPresent(hdr)
660,926✔
343
        if len(headers) == 0 {
1,321,843✔
344
                return nil
660,917✔
345
        }
660,917✔
346
        // Little helper to give us the first value of a given header, or _EMPTY_
347
        // if key is not present.
348
        getHdrVal := func(key string) string {
18✔
349
                vv, ok := headers[key]
9✔
350
                if !ok {
18✔
351
                        return _EMPTY_
9✔
352
                }
9✔
353
                return vv[0]
×
354
        }
355
        ct := getCompressionType(getHdrVal(acceptEncodingHeader))
9✔
356
        var (
9✔
357
                dest      string
9✔
358
                traceOnly bool
9✔
359
        )
9✔
360
        // Check for traceOnly only if not external.
9✔
361
        if !external {
9✔
362
                if to := getHdrVal(MsgTraceOnly); to != _EMPTY_ {
×
363
                        tos := strings.ToLower(to)
×
364
                        switch tos {
×
365
                        case "1", "true", "on":
×
366
                                traceOnly = true
×
367
                        }
368
                }
369
                dest = getHdrVal(MsgTraceDest)
×
370
                // Check the destination to see if this is a valid public subject.
×
371
                if !IsValidPublishSubject(dest) {
×
372
                        // We still have to return a msgTrace object (if traceOnly is set)
×
373
                        // because if we don't, the message will end-up being delivered to
×
374
                        // applications, which may break them. We report the error in any case.
×
375
                        c.Errorf("Destination %q is not valid, won't be able to trace events", dest)
×
376
                        if !traceOnly {
×
377
                                // We can bail, tracing will be disabled for this message.
×
378
                                return nil
×
379
                        }
×
380
                }
381
        }
382
        var (
9✔
383
                // Account to use when sending the trace event
9✔
384
                acc *Account
9✔
385
                // Ingress' account name
9✔
386
                ian string
9✔
387
                // Origin account name
9✔
388
                oan string
9✔
389
                // The hop "id", taken from headers only when not from CLIENT
9✔
390
                hop string
9✔
391
        )
9✔
392
        if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
9✔
393
                // The ingress account name will always be c.pa.account, but `acc` may
×
394
                // be different if we have an origin account header.
×
395
                if c.kind == LEAF {
×
396
                        ian = c.acc.GetName()
×
397
                } else {
×
398
                        ian = string(c.pa.account)
×
399
                }
×
400
                // The remote will have set the origin account header only if the
401
                // message changed account (think of service imports).
402
                oan = getHdrVal(MsgTraceOriginAccount)
×
403
                if oan == _EMPTY_ {
×
404
                        // For LEAF or ROUTER with pinned-account, we can use the c.acc.
×
405
                        if c.kind == LEAF || (c.kind == ROUTER && len(c.route.accName) > 0) {
×
406
                                acc = c.acc
×
407
                        } else {
×
408
                                // We will lookup account with c.pa.account (or ian).
×
409
                                oan = ian
×
410
                        }
×
411
                }
412
                // Unless we already got the account, we need to look it up.
413
                if acc == nil {
×
414
                        // We don't want to do account resolving here.
×
415
                        if acci, ok := c.srv.accounts.Load(oan); ok {
×
416
                                acc = acci.(*Account)
×
417
                                // Since we have looked-up the account, we don't need oan, so
×
418
                                // clear it in case it was set.
×
419
                                oan = _EMPTY_
×
420
                        } else {
×
421
                                // We still have to return a msgTrace object (if traceOnly is set)
×
422
                                // because if we don't, the message will end-up being delivered to
×
423
                                // applications, which may break them. We report the error in any case.
×
424
                                c.Errorf("Account %q was not found, won't be able to trace events", oan)
×
425
                                if !traceOnly {
×
426
                                        // We can bail, tracing will be disabled for this message.
×
427
                                        return nil
×
428
                                }
×
429
                        }
430
                }
431
                // Check the hop header
432
                hop = getHdrVal(MsgTraceHop)
×
433
        } else {
9✔
434
                acc = c.acc
9✔
435
                ian = acc.GetName()
9✔
436
        }
9✔
437
        // If external, we need to have the account's trace destination set,
438
        // otherwise, we are not enabling tracing.
439
        if external {
18✔
440
                var sampling int
9✔
441
                if acc != nil {
18✔
442
                        dest, sampling = acc.getTraceDestAndSampling()
9✔
443
                }
9✔
444
                if dest == _EMPTY_ {
18✔
445
                        // No account destination, no tracing for external trace headers.
9✔
446
                        return nil
9✔
447
                }
9✔
448
                // Check sampling, but only from origin server.
449
                if c.kind == CLIENT && !sample(sampling) {
×
450
                        // Need to deactivate the traceParentHdr so that if the message
×
451
                        // is routed, it does not trigger a trace there.
×
452
                        disableTraceHeaders(c, hdr)
×
453
                        return nil
×
454
                }
×
455
        }
456
        c.pa.trace = &msgTrace{
×
457
                srv:  c.srv,
×
458
                acc:  acc,
×
459
                oan:  oan,
×
460
                dest: dest,
×
461
                ct:   ct,
×
462
                hop:  hop,
×
463
                event: &MsgTraceEvent{
×
464
                        Request: MsgTraceRequest{
×
465
                                Header:  headers,
×
466
                                MsgSize: c.pa.size,
×
467
                        },
×
468
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
469
                                MsgTraceBase: MsgTraceBase{
×
470
                                        Type:      MsgTraceIngressType,
×
471
                                        Timestamp: time.Now(),
×
472
                                },
×
473
                                Kind:    c.kind,
×
474
                                CID:     c.cid,
×
475
                                Name:    getConnName(c),
×
476
                                Account: ian,
×
477
                                Subject: string(c.pa.subject),
×
478
                        }),
×
479
                },
×
480
                tonly: traceOnly,
×
481
        }
×
482
        return c.pa.trace
×
483
}
484

485
// sample reports whether a message should be traced given a sampling rate
// expressed as a percentage.
//
// Option parsing should ensure that sampling is [1..100], but any value
// outside of this range (<= 0 or >= 100) is considered to be 100%, in which
// case the function always returns true. Otherwise it returns true with a
// probability of sampling/100.
func sample(sampling int) bool {
	if sampling <= 0 || sampling >= 100 {
		return true
	}
	// rand.Int31n(100) is uniform over [0, 100), so exactly `sampling` of the
	// 100 possible outcomes are strictly below `sampling`. Using "<=" here
	// would accept one outcome too many (1% would sample at 2%, and 99%
	// would never drop anything).
	return rand.Int31n(100) < int32(sampling)
}
493

494
// This function will return the header as a map (instead of http.Header because
495
// we want to preserve the header names' case) and a boolean that indicates if
496
// the headers have been lifted due to the presence of the external trace header
497
// only.
498
// Note that because of the traceParentHdr, the search is done in a case
499
// insensitive way, but if the header is found, it is rewritten in lower case
500
// as suggested by the spec, but also to make it easier to disable the header
501
// when needed.
502
func genHeaderMapIfTraceHeadersPresent(hdr []byte) (map[string][]string, bool) {
660,926✔
503

660,926✔
504
        var (
660,926✔
505
                _keys               = [64][]byte{}
660,926✔
506
                _vals               = [64][]byte{}
660,926✔
507
                m                   map[string][]string
660,926✔
508
                traceDestHdrFound   bool
660,926✔
509
                traceParentHdrFound bool
660,926✔
510
        )
660,926✔
511
        // Skip the hdrLine
660,926✔
512
        if !bytes.HasPrefix(hdr, stringToBytes(hdrLine)) {
672,105✔
513
                return nil, false
11,179✔
514
        }
11,179✔
515

516
        traceDestHdrAsBytes := stringToBytes(MsgTraceDest)
649,747✔
517
        traceParentHdrAsBytes := stringToBytes(traceParentHdr)
649,747✔
518
        crLFAsBytes := stringToBytes(CR_LF)
649,747✔
519
        dashAsBytes := stringToBytes("-")
649,747✔
520

649,747✔
521
        keys := _keys[:0]
649,747✔
522
        vals := _vals[:0]
649,747✔
523

649,747✔
524
        for i := len(hdrLine); i < len(hdr); {
1,984,489✔
525
                // Search for key/val delimiter
1,334,742✔
526
                del := bytes.IndexByte(hdr[i:], ':')
1,334,742✔
527
                if del < 0 {
1,984,489✔
528
                        break
649,747✔
529
                }
530
                keyStart := i
684,995✔
531
                key := hdr[keyStart : keyStart+del]
684,995✔
532
                i += del + 1
684,995✔
533
                valStart := i
684,995✔
534
                nl := bytes.Index(hdr[valStart:], crLFAsBytes)
684,995✔
535
                if nl < 0 {
684,995✔
536
                        break
×
537
                }
538
                if len(key) > 0 {
1,369,990✔
539
                        val := bytes.Trim(hdr[valStart:valStart+nl], " \t")
684,995✔
540
                        vals = append(vals, val)
684,995✔
541

684,995✔
542
                        // Check for the external trace header.
684,995✔
543
                        if bytes.EqualFold(key, traceParentHdrAsBytes) {
685,004✔
544
                                // Rewrite the header using lower case if needed.
9✔
545
                                if !bytes.Equal(key, traceParentHdrAsBytes) {
18✔
546
                                        copy(hdr[keyStart:], traceParentHdrAsBytes)
9✔
547
                                }
9✔
548
                                // We will now check if the value has sampling or not.
549
                                // TODO(ik): Not sure if this header can have multiple values
550
                                // or not, and if so, what would be the rule to check for
551
                                // sampling. What is done here is to check them all until we
552
                                // found one with sampling.
553
                                if !traceParentHdrFound {
18✔
554
                                        tk := bytes.Split(val, dashAsBytes)
9✔
555
                                        if len(tk) == 4 && len([]byte(tk[3])) == 2 {
18✔
556
                                                if hexVal, err := strconv.ParseInt(bytesToString(tk[3]), 16, 8); err == nil {
18✔
557
                                                        if hexVal&0x1 == 0x1 {
18✔
558
                                                                traceParentHdrFound = true
9✔
559
                                                        }
9✔
560
                                                }
561
                                        }
562
                                }
563
                                // Add to the keys with the external trace header in lower case.
564
                                keys = append(keys, traceParentHdrAsBytes)
9✔
565
                        } else {
684,986✔
566
                                // Is the key the Nats-Trace-Dest header?
684,986✔
567
                                if bytes.EqualFold(key, traceDestHdrAsBytes) {
684,986✔
568
                                        traceDestHdrFound = true
×
569
                                }
×
570
                                // Add to the keys and preserve the key's case
571
                                keys = append(keys, key)
684,986✔
572
                        }
573
                }
574
                i += nl + 2
684,995✔
575
        }
576
        if !traceDestHdrFound && !traceParentHdrFound {
1,299,485✔
577
                return nil, false
649,738✔
578
        }
649,738✔
579
        m = make(map[string][]string, len(keys))
9✔
580
        for i, k := range keys {
30✔
581
                hname := string(k)
21✔
582
                m[hname] = append(m[hname], string(vals[i]))
21✔
583
        }
21✔
584
        return m, !traceDestHdrFound && traceParentHdrFound
9✔
585
}
586

587
// Special case where we create a trace event before parsing the message.
588
// This is for cases where the connection will be closed when detecting
589
// an error during early message processing (for instance max payload).
590
func (c *client) initAndSendIngressErrEvent(hdr []byte, dest string, ingressError error) {
×
591
        if ingressError == nil {
×
592
                return
×
593
        }
×
594
        ct := getAcceptEncoding(hdr)
×
595
        t := &msgTrace{
×
596
                srv:  c.srv,
×
597
                acc:  c.acc,
×
598
                dest: dest,
×
599
                ct:   ct,
×
600
                event: &MsgTraceEvent{
×
601
                        Request: MsgTraceRequest{MsgSize: c.pa.size},
×
602
                        Events: append(MsgTraceEvents(nil), &MsgTraceIngress{
×
603
                                MsgTraceBase: MsgTraceBase{
×
604
                                        Type:      MsgTraceIngressType,
×
605
                                        Timestamp: time.Now(),
×
606
                                },
×
607
                                Kind:  c.kind,
×
608
                                CID:   c.cid,
×
609
                                Name:  getConnName(c),
×
610
                                Error: ingressError.Error(),
×
611
                        }),
×
612
                },
×
613
        }
×
614
        t.sendEvent()
×
615
}
616

617
// Returns `true` if message tracing is enabled and we are tracing only,
618
// that is, we are not going to deliver the inbound message, returns
619
// `false` otherwise (no tracing, or tracing and message delivery).
620
func (t *msgTrace) traceOnly() bool {
1,555,389✔
621
        return t != nil && t.tonly
1,555,389✔
622
}
1,555,389✔
623

624
func (t *msgTrace) setOriginAccountHeaderIfNeeded(c *client, acc *Account, msg []byte) []byte {
×
625
        var oan string
×
626
        // If t.acc is set, only check that, not t.oan.
×
627
        if t.acc != nil {
×
628
                if t.acc != acc {
×
629
                        oan = t.acc.GetName()
×
630
                }
×
631
        } else if t.oan != acc.GetName() {
×
632
                oan = t.oan
×
633
        }
×
634
        if oan != _EMPTY_ {
×
635
                msg = c.setHeader(MsgTraceOriginAccount, oan, msg)
×
636
        }
×
637
        return msg
×
638
}
639

640
func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
×
641
        e := t.event
×
642
        e.Hops++
×
643
        if len(t.hop) > 0 {
×
644
                t.nhop = fmt.Sprintf("%s.%d", t.hop, e.Hops)
×
645
        } else {
×
646
                t.nhop = fmt.Sprintf("%d", e.Hops)
×
647
        }
×
648
        return c.setHeader(MsgTraceHop, t.nhop, msg)
×
649
}
650

651
// Will look for the MsgTraceSendTo and traceParentHdr headers and change the first
652
// character to an 'X' so that if this message is sent to a remote, the remote
653
// will not initialize tracing since it won't find the actual trace headers.
654
// The function returns the position of the headers so it can efficiently be
655
// re-enabled by calling enableTraceHeaders.
656
// Note that if `msg` can be either the header alone or the full message
657
// (header and payload). This function will use c.pa.hdr to limit the
658
// search to the header section alone.
659
func disableTraceHeaders(c *client, msg []byte) []int {
×
660
        // Code largely copied from getHeader(), except that we don't need the value
×
661
        if c.pa.hdr <= 0 {
×
662
                return []int{-1, -1}
×
663
        }
×
664
        hdr := msg[:c.pa.hdr]
×
665
        headers := [2]string{MsgTraceDest, traceParentHdr}
×
666
        positions := [2]int{-1, -1}
×
667
        for i := 0; i < 2; i++ {
×
668
                key := stringToBytes(headers[i])
×
669
                pos := bytes.Index(hdr, key)
×
670
                if pos < 0 {
×
671
                        continue
×
672
                }
673
                // Make sure this key does not have additional prefix.
674
                if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' {
×
675
                        continue
×
676
                }
677
                index := pos + len(key)
×
678
                if index >= len(hdr) {
×
679
                        continue
×
680
                }
681
                if hdr[index] != ':' {
×
682
                        continue
×
683
                }
684
                // Disable the trace by altering the first character of the header
685
                hdr[pos] = 'X'
×
686
                positions[i] = pos
×
687
        }
688
        // Return the positions of those characters so we can re-enable the headers.
689
        return positions[:2]
×
690
}
691

692
// Changes back the character at the given position `pos` in the `msg`
693
// byte slice to the first character of the MsgTraceSendTo header.
694
func enableTraceHeaders(msg []byte, positions []int) {
×
695
        firstChar := [2]byte{MsgTraceDest[0], traceParentHdr[0]}
×
696
        for i, pos := range positions {
×
697
                if pos == -1 {
×
698
                        continue
×
699
                }
700
                msg[pos] = firstChar[i]
×
701
        }
702
}
703

704
func (t *msgTrace) setIngressError(err string) {
×
705
        if i := t.event.Ingress(); i != nil {
×
706
                i.Error = err
×
707
        }
×
708
}
709

710
func (t *msgTrace) addSubjectMappingEvent(subj []byte) {
10,292✔
711
        if t == nil {
20,584✔
712
                return
10,292✔
713
        }
10,292✔
714
        t.event.Events = append(t.event.Events, &MsgTraceSubjectMapping{
×
715
                MsgTraceBase: MsgTraceBase{
×
716
                        Type:      MsgTraceSubjectMappingType,
×
717
                        Timestamp: time.Now(),
×
718
                },
×
719
                MappedTo: string(subj),
×
720
        })
×
721
}
722

723
func (t *msgTrace) addEgressEvent(dc *client, sub *subscription, err string) {
1,354,511✔
724
        if t == nil {
2,709,022✔
725
                return
1,354,511✔
726
        }
1,354,511✔
727
        e := &MsgTraceEgress{
×
728
                MsgTraceBase: MsgTraceBase{
×
729
                        Type:      MsgTraceEgressType,
×
730
                        Timestamp: time.Now(),
×
731
                },
×
732
                Kind:  dc.kind,
×
733
                CID:   dc.cid,
×
734
                Name:  getConnName(dc),
×
735
                Hop:   t.nhop,
×
736
                Error: err,
×
737
        }
×
738
        t.nhop = _EMPTY_
×
739
        // Specific to CLIENT connections...
×
740
        if dc.kind == CLIENT {
×
741
                // Set the subscription's subject and possibly queue name.
×
742
                e.Subscription = string(sub.subject)
×
743
                if len(sub.queue) > 0 {
×
744
                        e.Queue = string(sub.queue)
×
745
                }
×
746
        }
747
        if dc.kind == CLIENT || dc.kind == LEAF {
×
748
                if i := t.event.Ingress(); i != nil {
×
749
                        // If the Ingress' account is different from the destination's
×
750
                        // account, add the account name into the Egress trace event.
×
751
                        // This would happen with service imports.
×
752
                        if dcAccName := dc.acc.GetName(); dcAccName != i.Account {
×
753
                                e.Account = dcAccName
×
754
                        }
×
755
                }
756
        }
757
        t.event.Events = append(t.event.Events, e)
×
758
}
759

760
func (t *msgTrace) addStreamExportEvent(dc *client, to []byte) {
×
761
        if t == nil {
×
762
                return
×
763
        }
×
764
        dc.mu.Lock()
×
765
        accName := dc.acc.GetName()
×
766
        dc.mu.Unlock()
×
767
        t.event.Events = append(t.event.Events, &MsgTraceStreamExport{
×
768
                MsgTraceBase: MsgTraceBase{
×
769
                        Type:      MsgTraceStreamExportType,
×
770
                        Timestamp: time.Now(),
×
771
                },
×
772
                Account: accName,
×
773
                To:      string(to),
×
774
        })
×
775
}
776

777
func (t *msgTrace) addServiceImportEvent(accName, from, to string) {
×
778
        if t == nil {
×
779
                return
×
780
        }
×
781
        t.event.Events = append(t.event.Events, &MsgTraceServiceImport{
×
782
                MsgTraceBase: MsgTraceBase{
×
783
                        Type:      MsgTraceServiceImportType,
×
784
                        Timestamp: time.Now(),
×
785
                },
×
786
                Account: accName,
×
787
                From:    from,
×
788
                To:      to,
×
789
        })
×
790
}
791

792
func (t *msgTrace) addJetStreamEvent(streamName string) {
×
793
        if t == nil {
×
794
                return
×
795
        }
×
796
        t.js = &MsgTraceJetStream{
×
797
                MsgTraceBase: MsgTraceBase{
×
798
                        Type:      MsgTraceJetStreamType,
×
799
                        Timestamp: time.Now(),
×
800
                },
×
801
                Stream: streamName,
×
802
        }
×
803
        t.event.Events = append(t.event.Events, t.js)
×
804
}
805

806
func (t *msgTrace) updateJetStreamEvent(subject string, noInterest bool) {
1,107,415✔
807
        if t == nil {
2,214,830✔
808
                return
1,107,415✔
809
        }
1,107,415✔
810
        // JetStream event should have been created in addJetStreamEvent
811
        if t.js == nil {
×
812
                return
×
813
        }
×
814
        t.js.Subject = subject
×
815
        t.js.NoInterest = noInterest
×
816
        // Update the timestamp since this is more accurate than when it
×
817
        // was first added in addJetStreamEvent().
×
818
        t.js.Timestamp = time.Now()
×
819
}
820

821
func (t *msgTrace) sendEventFromJetStream(err error) {
×
822
        if t == nil {
×
823
                return
×
824
        }
×
825
        // JetStream event should have been created in addJetStreamEvent
826
        if t.js == nil {
×
827
                return
×
828
        }
×
829
        if err != nil {
×
830
                t.js.Error = err.Error()
×
831
        }
×
832
        t.sendEvent()
×
833
}
834

835
func (t *msgTrace) sendEvent() {
13,537,876✔
836
        if t == nil {
27,075,752✔
837
                return
13,537,876✔
838
        }
13,537,876✔
839
        if t.js != nil {
×
840
                ready := atomic.AddInt32(&t.ready, 1) == 2
×
841
                if !ready {
×
842
                        return
×
843
                }
×
844
        }
845
        t.srv.sendInternalAccountSysMsg(t.acc, t.dest, &t.event.Server, t.event, t.ct)
×
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc