• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/server/jetstream_batching.go
1
// Copyright 2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "encoding/json"
18
        "errors"
19
        "fmt"
20
        "math"
21
        "math/big"
22
        "path/filepath"
23
        "slices"
24
        "strconv"
25
        "strings"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29
)
30

31
var (
32
        // Process-wide counters of the inflight batches, across all streams and accounts that enable batching.
        // They are atomics, so they can be updated without holding any per-stream batching lock.
33
        // Decremented once per atomic batch by atomicBatch.cleanupLocked/stopLocked.
        globalInflightAtomicBatches atomic.Int64
34
        // Decremented once per fast batch by fastBatch.cleanupLocked; fastBatchRegisterSequences
        // increments it when it starts tracking a batch that was not registered yet.
        globalInflightFastBatches   atomic.Int64
35
)
36

37
// batching holds the in-progress batches, keyed by batch ID.
// mu guards both maps; helpers documented with "Lock should be held"
// expect the caller to already hold it, while the timer-driven cleanup
// methods acquire it themselves.
type batching struct {
38
        mu     sync.Mutex
39
        atomic map[string]*atomicBatch // In-progress atomic batches by batch ID.
40
        fast   map[string]*fastBatch // In-progress fast batches by batch ID; allocated lazily by newFastBatch.
41
}
42

43
// atomicBatch is the state kept for a single atomic batch.
// A nil timer marks a batch that has already been cleaned up or stopped.
type atomicBatch struct {
44
        timer *time.Timer // Inactivity timer for the batch; set to nil by cleanupLocked/stopLocked.
45
        lseq  uint64      // The highest sequence for this batch.
46
        store StreamStore // Where the batch is staged before committing.
47
}
48

49
// fastBatch is the state kept for a single fast batch.
// A nil timer marks a batch that has been cleaned up or replaced by a newer one.
type fastBatch struct {
50
        timer          *time.Timer // Inactivity timer for the batch; nil once cleaned up or replaced.
51
        lseq           uint64      // The highest sequence for this batch.
52
        sseq           uint64      // Last persisted stream sequence.
53
        pseq           uint64      // Last persisted batch sequence (is always lower or equal to lseq).
54
        fseq           uint64      // Sequence of when we last sent a flow message (is always lower or equal to pseq).
55
        pending        uint32      // Number of pending messages in the batch waiting to be persisted.
56
        ackMessages    uint16      // Current flow interval: a flow control message is sent every N messages.
57
        maxAckMessages uint16      // Maximum ackMessages value the client allows.
58
        reply          string      // The last reply subject seen when persisting a message.
59
        gapOk          bool        // Whether a gap is okay, if not, the batch would be rejected.
60
        commit         bool        // If the batch is committed (set by fastBatchCommit, cleared by fastBatchReset).
61
}
62

63
// newAtomicBatch creates an atomic batch publish object.
64
// Lock should be held.
65
func (batches *batching) newAtomicBatch(mset *stream, batchId string, replicas int, storage StorageType, storeDir, streamName string) (*atomicBatch, error) {
129✔
66
        store, err := newBatchStore(mset, batchId, replicas, storage, storeDir, streamName)
129✔
67
        if err != nil {
129✔
68
                return nil, err
×
69
        }
×
70
        b := &atomicBatch{store: store}
129✔
71
        b.setupCleanupTimer(mset, batchId, batches)
129✔
72
        return b, nil
129✔
73
}
74

75
// setupCleanupTimer sets up a timer to clean up the batch after a timeout.
76
func (b *atomicBatch) setupCleanupTimer(mset *stream, batchId string, batches *batching) {
129✔
77
        // Create a timer to clean up after timeout.
129✔
78
        timeout := getCleanupTimeout(mset)
129✔
79
        b.timer = time.AfterFunc(timeout, func() {
133✔
80
                b.cleanup(batchId, batches)
4✔
81
                mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
4✔
82
        })
4✔
83
}
84

85
// resetCleanupTimer resets the cleanup timer, allowing to extend the lifetime of the batch.
86
// Returns whether the timer was reset without it having expired before.
87
func (b *atomicBatch) resetCleanupTimer(mset *stream) bool {
2,289✔
88
        timeout := getCleanupTimeout(mset)
2,289✔
89
        return b.timer.Reset(timeout)
2,289✔
90
}
2,289✔
91

92
// cleanup deletes underlying resources associated with the batch and unregisters it from the stream's batches.
93
func (b *atomicBatch) cleanup(batchId string, batches *batching) {
4✔
94
        batches.mu.Lock()
4✔
95
        defer batches.mu.Unlock()
4✔
96
        b.cleanupLocked(batchId, batches)
4✔
97
}
4✔
98

99
// Lock should be held.
100
func (b *atomicBatch) cleanupLocked(batchId string, batches *batching) {
127✔
101
        if b.timer == nil {
127✔
102
                return
×
103
        }
×
104
        globalInflightAtomicBatches.Add(-1)
127✔
105
        b.timer.Stop()
127✔
106
        b.store.Delete(true)
127✔
107
        delete(batches.atomic, batchId)
127✔
108
        // Reset so that another invocation doesn't double-account.
127✔
109
        b.timer = nil
127✔
110
}
111

112
// Lock should be held.
113
func (b *atomicBatch) stopLocked() {
×
114
        if b.timer == nil {
×
115
                return
×
116
        }
×
117
        globalInflightAtomicBatches.Add(-1)
×
118
        b.timer.Stop()
×
119
        b.store.Stop()
×
120
        // Reset so that another invocation doesn't double-account.
×
121
        b.timer = nil
×
122
}
123

124
func getBatchStoreDir(storeDir, streamName, batchId string) (string, string) {
41✔
125
        bname := getHash(batchId)
41✔
126
        return bname, filepath.Join(storeDir, streamsDir, streamName, batchesDir, bname)
41✔
127
}
41✔
128

129
func newBatchStore(mset *stream, batchId string, replicas int, storage StorageType, storeDir, streamName string) (StreamStore, error) {
131✔
130
        if replicas == 1 && storage == FileStorage {
170✔
131
                bname, storeDir := getBatchStoreDir(storeDir, streamName, batchId)
39✔
132
                fcfg := FileStoreConfig{AsyncFlush: true, BlockSize: defaultLargeBlockSize, StoreDir: storeDir}
39✔
133
                s := mset.srv
39✔
134
                prf := s.jsKeyGen(s.getOpts().JetStreamKey, mset.acc.Name)
39✔
135
                if prf != nil {
39✔
136
                        // We are encrypted here, fill in correct cipher selection.
×
137
                        fcfg.Cipher = s.getOpts().JetStreamCipher
×
138
                }
×
139
                oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name)
39✔
140
                cfg := StreamConfig{Name: bname, Storage: FileStorage}
39✔
141
                return newFileStoreWithCreated(fcfg, cfg, time.Time{}, prf, oldprf)
39✔
142
        }
143
        return newMemStore(&StreamConfig{Name: _EMPTY_, Storage: MemoryStorage})
92✔
144
}
145

146
// readyForCommit indicates the batch is ready to be committed.
147
// If the timer has already cleaned up the batch, we can't commit.
148
// Otherwise, we ensure the timer does not clean up the batch in the meantime.
149
// Lock should be held.
150
func (b *atomicBatch) readyForCommit() *BatchAbandonReason {
100✔
151
        if !b.timer.Stop() {
100✔
152
                return &BatchTimeout
×
153
        }
×
154
        if b.store.FlushAllPending() != nil {
100✔
155
                return &BatchIncomplete
×
156
        }
×
157
        return nil
100✔
158
}
159

160
// newFastBatch creates a fast batch publish object and registers it in batches.fast.
161
// Lock should be held.
162
func (batches *batching) newFastBatch(mset *stream, batchId string, gapOk bool, maxAckMessages uint16) *fastBatch {
100✔
163
        b := &fastBatch{gapOk: gapOk, maxAckMessages: maxAckMessages}
100✔
164
        if batches.fast == nil {
174✔
165
                batches.fast = make(map[string]*fastBatch, 1)
74✔
166
        }
74✔
167
        batches.fast[batchId] = b
100✔
168
        batches.fastBatchInit(b)
100✔
169
        b.setupCleanupTimer(mset, batchId, batches)
100✔
170
        return b
100✔
171
}
172

173
// fastBatchInit (re)initializes the ackMessages field for a fast batch.
174
// The batch must already be registered in batches.fast.
175
// Lock should be held.
176
func (batches *batching) fastBatchInit(b *fastBatch) {
105✔
177
        // If it's the only batch, just allow what the client wants, otherwise we'll
105✔
178
        // need to coordinate and slowly ramp up this publisher.
105✔
179
        // TODO(mvv): fast ingest's initial flow value improvements?
105✔
180
        ackMessages := min(500, b.maxAckMessages)
105✔
181
        if len(batches.fast) > 1 {
108✔
182
                ackMessages = 1
3✔
183
        }
3✔
184
        b.ackMessages = ackMessages
105✔
185
}
186

187
// fastBatchReset resets the fast batch to an empty state and sends a flow control message.
188
// Lock should be held.
189
func (batches *batching) fastBatchReset(mset *stream, batchId string, b *fastBatch) {
4✔
190
        // If the timer already stopped before we could commit, we clean it up.
4✔
191
        if b.timer == nil || (!b.commit && !b.timer.Stop()) {
4✔
192
                b.cleanupLocked(batchId, batches)
×
193
                return
×
194
        }
×
195
        // Otherwise, reset the state.
196
        batches.fastBatchInit(b)
4✔
197
        b.timer.Reset(getCleanupTimeout(mset))
4✔
198
        b.commit = false
4✔
199
        b.pending = 0
4✔
200
        b.fseq, b.lseq = b.pseq, b.pseq
4✔
201
        b.sendFlowControl(b.fseq, mset, b.reply)
4✔
202
}
203

204
// fastBatchRegisterSequences registers the highest stored batch and stream sequence and returns
205
// whether a PubAck should be sent if the batch has been committed.
206
// If this is called on a follower, it only registers the highest stream and persisted batch sequences.
207
// Lock should be held.
208
func (batches *batching) fastBatchRegisterSequences(mset *stream, reply string, streamSeq uint64, isLeader bool, batch *FastBatch) bool {
256✔
209
        b, ok := batches.fast[batch.id]
256✔
210
        if !ok || !isLeader {
387✔
211
                // If this batch has committed, we can clean it up.
131✔
212
                if batch.commit {
169✔
213
                        if b != nil {
56✔
214
                                b.cleanupLocked(batch.id, batches)
18✔
215
                        }
18✔
216
                        return false
38✔
217
                }
218
                // Otherwise, even as a follower, we record the latest state of this batch.
219
                if b == nil || !b.resetCleanupTimer(mset) {
125✔
220
                        if b != nil {
32✔
221
                                // The timer couldn't be reset, this means the timer already runs and is likely
×
222
                                // waiting to acquire the lock. We reset the timer here so it doesn't clean up
×
223
                                // this batch that we're about to overwrite.
×
224
                                b.timer = nil
×
225
                        } else {
32✔
226
                                // If this is a new batch for us, even though we're a follower, we still need
32✔
227
                                // to account toward the global inflight limit.
32✔
228
                                globalInflightFastBatches.Add(1)
32✔
229
                        }
32✔
230
                        // We'll need a copy as we'll use it as a key and later for cleanup.
231
                        batchId := copyString(batch.id)
32✔
232
                        b = batches.newFastBatch(mset, batchId, batch.gapOk, batch.flow)
32✔
233
                }
234
                b.sseq = streamSeq
93✔
235
                b.pseq, b.lseq = batch.seq, batch.seq
93✔
236
                b.reply = reply
93✔
237
                return false
93✔
238
        }
239
        b.reply = reply
125✔
240
        if b.pending > 0 {
250✔
241
                b.pending--
125✔
242
        }
125✔
243
        b.sseq = streamSeq
125✔
244
        // Store last persisted batch sequence.
125✔
245
        // If we have no remaining pending writes, we might have had duplicate messages
125✔
246
        // and need to send additional flow control messages.
125✔
247
        var skipped bool
125✔
248
        if b.pending == 0 {
225✔
249
                skipped = true
100✔
250
                b.pseq = b.lseq
100✔
251
        } else {
125✔
252
                b.pseq = batch.seq
25✔
253
        }
25✔
254
        // If the PubAck needs to be sent now as a result of a commit.
255
        if b.lseq == b.pseq && b.commit {
168✔
256
                b.cleanupLocked(batch.id, batches)
43✔
257
                // If we skipped ahead due to duplicate messages, send the PubAck with the highest sequence.
43✔
258
                if skipped {
86✔
259
                        var buf [256]byte
43✔
260
                        pubAck := append(buf[:0], mset.pubAck...)
43✔
261
                        response := append(pubAck, strconv.FormatUint(b.sseq, 10)...)
43✔
262
                        response = append(response, fmt.Sprintf(",\"batch\":%q,\"count\":%d}", batch.id, b.lseq)...)
43✔
263
                        if len(reply) > 0 {
86✔
264
                                mset.outq.sendMsg(reply, response)
43✔
265
                        }
43✔
266
                        return false
43✔
267
                }
268
                return true
×
269
        }
270
        b.checkFlowControl(mset, reply, batches)
82✔
271
        return false
82✔
272
}
273

274
// checkFlowControl checks whether a flow control message should be sent.
275
// If so, it updates the flow values to speed up or slow down the publisher if needed.
276
// Returns whether a flow control message was sent.
277
// Lock should be held.
278
func (b *fastBatch) checkFlowControl(mset *stream, reply string, batches *batching) bool {
99✔
279
        am := uint64(b.ackMessages)
99✔
280
        if b.pseq < b.fseq+am {
186✔
281
                return false
87✔
282
        }
87✔
283
        // Instead of sending multiple flow control messages, skip ahead to only send the last.
284
        steps := (b.pseq - b.fseq) / am
12✔
285
        b.fseq += steps * am
12✔
286

12✔
287
        // TODO(mvv): fast ingest's dynamic flow value improvements?
12✔
288
        //  This is currently just a simple value to have a working version. Should take average
12✔
289
        //  message sizes into account and compare how much this client is contributing to the
12✔
290
        //  ingest IPQ total size and messages and have publishers share based on that.
12✔
291
        maxAckMessages := uint16(500 / len(batches.fast))
12✔
292
        if maxAckMessages < 1 {
12✔
293
                maxAckMessages = 1
×
294
        }
×
295
        // Limit to the client's allowed maximum.
296
        if maxAckMessages > b.maxAckMessages {
24✔
297
                maxAckMessages = b.maxAckMessages
12✔
298
        }
12✔
299

300
        if b.ackMessages < maxAckMessages {
12✔
301
                // Ramp up.
×
302
                b.ackMessages *= 2
×
303
                if b.ackMessages > maxAckMessages {
×
304
                        b.ackMessages = maxAckMessages
×
305
                }
×
306
        } else if b.ackMessages > maxAckMessages {
12✔
307
                // Slow down.
×
308
                b.ackMessages /= 2
×
309
                if b.ackMessages <= maxAckMessages {
×
310
                        b.ackMessages = maxAckMessages
×
311
                }
×
312
        }
313

314
        // Finally, send the flow control message.
315
        b.sendFlowControl(b.fseq, mset, reply)
12✔
316
        return true
12✔
317
}
318

319
// sendFlowControl sends a fast batch flow control message for the current highest sequence.
320
// Lock should be held.
321
func (b *fastBatch) sendFlowControl(batchSeq uint64, mset *stream, reply string) {
18✔
322
        if len(reply) == 0 {
19✔
323
                return
1✔
324
        }
1✔
325
        response, _ := BatchFlowAck{Sequence: batchSeq, Messages: b.ackMessages}.MarshalJSON()
17✔
326
        mset.outq.sendMsg(reply, response)
17✔
327
}
328

329
// fastBatchCommit ends the batch and commits the data up to that point. If all messages
330
// have already been persisted, a PubAck is sent immediately. Otherwise, it will be sent
331
// after the last message has been persisted.
332
// Lock should be held.
333
func (batches *batching) fastBatchCommit(b *fastBatch, batchId string, mset *stream, reply string) bool {
64✔
334
        // Either we commit now, or we clean up later, so stop the timer.
64✔
335
        if b.timer == nil || (!b.commit && !b.timer.Stop()) {
64✔
336
                // Shouldn't be possible for the timer to already be stopped if we haven't committed yet,
×
337
                // since we pre-check being able to reset the timer. But guard against it anyhow.
×
338
                return true
×
339
        }
×
340
        // Mark that this batch commits.
341
        b.commit = true
64✔
342
        // If the whole batch has been persisted, we can respond with the PubAck now.
64✔
343
        if b.lseq == b.pseq {
76✔
344
                b.cleanupLocked(batchId, batches)
12✔
345
                var buf [256]byte
12✔
346
                pubAck := append(buf[:0], mset.pubAck...)
12✔
347
                response := append(pubAck, strconv.FormatUint(b.sseq, 10)...)
12✔
348
                response = append(response, fmt.Sprintf(",\"batch\":%q,\"count\":%d}", batchId, b.lseq)...)
12✔
349
                if len(reply) > 0 {
24✔
350
                        mset.outq.sendMsg(reply, response)
12✔
351
                }
12✔
352
                return true
12✔
353
        }
354
        // Otherwise, we need to wait and the PubAck will be sent when the last message is persisted.
355
        return false
52✔
356
}
357

358
// setupCleanupTimer sets up a timer to clean up the batch after a timeout.
359
func (b *fastBatch) setupCleanupTimer(mset *stream, batchId string, batches *batching) {
100✔
360
        // Create a timer to clean up after timeout.
100✔
361
        timeout := getCleanupTimeout(mset)
100✔
362
        b.timer = time.AfterFunc(timeout, func() {
104✔
363
                b.cleanup(batchId, batches)
4✔
364
                // Only send the advisory if we're the leader. (Since we do the tracking on followers too)
4✔
365
                if mset.IsLeader() {
6✔
366
                        mset.sendStreamBatchAbandonedAdvisory(batchId, BatchTimeout)
2✔
367
                }
2✔
368
        })
369
}
370

371
// resetCleanupTimer resets the cleanup timer, allowing to extend the lifetime of the batch.
372
// Returns whether the timer was reset without it having expired before.
373
func (b *fastBatch) resetCleanupTimer(mset *stream) bool {
226✔
374
        if b.commit {
229✔
375
                return true
3✔
376
        }
3✔
377
        if b.timer == nil {
223✔
378
                return false
×
379
        }
×
380
        timeout := getCleanupTimeout(mset)
223✔
381
        return b.timer.Reset(timeout)
223✔
382
}
383

384
// cleanup deletes underlying resources associated with the batch and unregisters it from the stream's batches.
385
func (b *fastBatch) cleanup(batchId string, batches *batching) {
4✔
386
        batches.mu.Lock()
4✔
387
        defer batches.mu.Unlock()
4✔
388
        b.cleanupLocked(batchId, batches)
4✔
389
}
4✔
390

391
// Lock should be held.
392
func (b *fastBatch) cleanupLocked(batchId string, batches *batching) {
108✔
393
        // If the timer is nil, it means this batch has been replaced with a new one.
108✔
394
        // This can happen on a follower depending on timing.
108✔
395
        if b.timer == nil {
120✔
396
                return
12✔
397
        }
12✔
398
        globalInflightFastBatches.Add(-1)
96✔
399
        b.timer.Stop()
96✔
400
        delete(batches.fast, batchId)
96✔
401
        // Reset so that another invocation doesn't double-account.
96✔
402
        b.timer = nil
96✔
403
}
404

405
// getCleanupTimeout returns the timeout for the batch, taking into account the server's limits.
406
func getCleanupTimeout(mset *stream) time.Duration {
2,745✔
407
        timeout := streamMaxBatchTimeout
2,745✔
408
        if maxBatchTimeout := mset.srv.getOpts().JetStreamLimits.MaxBatchTimeout; maxBatchTimeout > 0 {
4,779✔
409
                timeout = maxBatchTimeout
2,034✔
410
        }
2,034✔
411
        return timeout
2,745✔
412
}
413

414
// batchStagedDiff stages all changes for consistency checks until commit.
// Nothing in here affects the stream until commit merges it into the stream's own state.
type batchStagedDiff struct {
416
        msgIds             map[string]struct{} // Message IDs to register for de-duplication.
417
        counter            map[string]*msgCounterRunningTotal // Counter running totals, stored into mset.clusteredCounterTotal under the same key.
418
        inflight           map[string]*inflightSubjectRunningTotal // Inflight bytes/ops per subject, merged into mset.inflight.
419
        inflightTransform  map[uint64]string // Inflight subject transforms by clustered proposal sequence.
420
        expectedPerSubject map[string]*batchExpectedPerSubject // Expected-per-subject checks in process, by subject.
421
}
422

423
// batchExpectedPerSubject is the staged state of an expected-per-subject check for
// one subject. On commit, the subject is recorded under clseq in the stream.
type batchExpectedPerSubject struct {
424
        sseq  uint64 // Stream sequence.
425
        clseq uint64 // Clustered proposal sequence.
426
}
427

428
func (diff *batchStagedDiff) commit(mset *stream) {
930,216✔
429
        if len(diff.msgIds) > 0 {
1,669,414✔
430
                ts := time.Now().UnixNano()
739,198✔
431
                mset.ddMu.Lock()
739,198✔
432
                for msgId := range diff.msgIds {
1,478,403✔
433
                        // We stage with zero, and will update in processJetStreamMsg once we know the sequence.
739,205✔
434
                        mset.storeMsgIdLocked(&ddentry{msgId, 0, ts})
739,205✔
435
                }
739,205✔
436
                mset.ddMu.Unlock()
739,198✔
437
        }
438

439
        // Store running totals for counters, we could have multiple counter increments proposed, but not applied yet.
440
        if len(diff.counter) > 0 {
930,252✔
441
                if mset.clusteredCounterTotal == nil {
50✔
442
                        mset.clusteredCounterTotal = make(map[string]*msgCounterRunningTotal, len(diff.counter))
14✔
443
                }
14✔
444
                for k, c := range diff.counter {
72✔
445
                        mset.clusteredCounterTotal[k] = c
36✔
446
                }
36✔
447
        }
448

449
        // Track inflight.
450
        if len(diff.inflight) > 0 {
1,860,432✔
451
                if mset.inflight == nil {
930,873✔
452
                        mset.inflight = make(map[string]*inflightSubjectRunningTotal, len(diff.inflight))
657✔
453
                }
657✔
454
                for subj, i := range diff.inflight {
1,860,484✔
455
                        if c, ok := mset.inflight[subj]; ok {
1,456,807✔
456
                                c.bytes += i.bytes
526,539✔
457
                                c.ops += i.ops
526,539✔
458
                        } else {
930,268✔
459
                                mset.inflight[subj] = i
403,729✔
460
                        }
403,729✔
461
                }
462
        }
463

464
        // Track inflight subject transforms.
465
        if len(diff.inflightTransform) > 0 {
930,223✔
466
                if mset.inflightTransform == nil {
10✔
467
                        mset.inflightTransform = make(map[uint64]string, len(diff.inflightTransform))
3✔
468
                }
3✔
469
                for clseq, subj := range diff.inflightTransform {
14✔
470
                        mset.inflightTransform[clseq] = subj
7✔
471
                }
7✔
472
        }
473

474
        // Track sequence and subject.
475
        if len(diff.expectedPerSubject) > 0 {
930,279✔
476
                if mset.expectedPerSubjectSequence == nil {
87✔
477
                        mset.expectedPerSubjectSequence = make(map[uint64]string, len(diff.expectedPerSubject))
24✔
478
                }
24✔
479
                if mset.expectedPerSubjectInProcess == nil {
87✔
480
                        mset.expectedPerSubjectInProcess = make(map[string]struct{}, len(diff.expectedPerSubject))
24✔
481
                }
24✔
482
                for subj, e := range diff.expectedPerSubject {
127✔
483
                        mset.expectedPerSubjectSequence[e.clseq] = subj
64✔
484
                        mset.expectedPerSubjectInProcess[subj] = struct{}{}
64✔
485
                }
64✔
486
        }
487
}
488

489
// batchApply is the in-memory state kept while the entries of a batch are being applied.
// It is reset by clearBatchStateLocked once the batch is done with or rejected.
type batchApply struct {
490
        mu         sync.Mutex // Guards the fields below.
491
        id         string            // ID of the current batch.
492
        count      uint64            // Number of entries in the batch, for consistency checks.
493
        entries    []*CommittedEntry // Previous entries that are part of this batch.
494
        entryStart int               // The index into an entry indicating the first message of the batch.
495
        maxApplied uint64            // Applied value before the entry containing the first message of the batch.
496
}
497

498
// clearBatchStateLocked clears in-memory apply-batch-related state.
499
// batch.mu lock should be held.
500
func (batch *batchApply) clearBatchStateLocked() {
408✔
501
        batch.id = _EMPTY_
408✔
502
        batch.count = 0
408✔
503
        batch.entries = nil
408✔
504
        batch.entryStart = 0
408✔
505
        batch.maxApplied = 0
408✔
506
}
408✔
507

508
// rejectBatchStateLocked rejects the batch and clears in-memory apply-batch-related state.
509
// Corrects mset.clfs to take the failed batch into account.
510
// batch.mu lock should be held.
511
func (batch *batchApply) rejectBatchStateLocked(mset *stream) {
244✔
512
        mset.clMu.Lock()
244✔
513
        mset.clfs += batch.count
244✔
514
        mset.clMu.Unlock()
244✔
515
        // We're rejecting the batch, so all entries need to be returned to the pool.
244✔
516
        for _, bce := range batch.entries {
266✔
517
                bce.ReturnToPool()
22✔
518
        }
22✔
519
        batch.clearBatchStateLocked()
244✔
520
}
521

522
func (batch *batchApply) rejectBatchState(mset *stream) {
30✔
523
        batch.mu.Lock()
30✔
524
        defer batch.mu.Unlock()
30✔
525
        batch.rejectBatchStateLocked(mset)
30✔
526
}
30✔
527

528
// checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers.
529
// mset.mu lock must NOT be held or used.
530
// mset.clMu lock must be held.
531
func checkMsgHeadersPreClusteredProposal(
532
        diff *batchStagedDiff, mset *stream, subject, rsubject string, hdr []byte, msg []byte, sourced bool, name string,
533
        jsa *jsAccount, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules bool,
534
        discard DiscardPolicy, discardNewPer bool, maxMsgSize int, maxMsgs int64, maxMsgsPer int64, maxBytes int64,
535
) ([]byte, []byte, uint64, *ApiError, error) {
955,679✔
536
        var incr *big.Int
955,679✔
537

955,679✔
538
        // Some header checks must be checked pre proposal.
955,679✔
539
        if len(hdr) > 0 {
1,719,926✔
540
                // Since we encode header len as u16 make sure we do not exceed.
764,247✔
541
                // Again this works if it goes through but better to be pre-emptive.
764,247✔
542
                if len(hdr) > math.MaxUint16 {
764,248✔
543
                        err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
1✔
544
                        return hdr, msg, 0, NewJSStreamHeaderExceedsMaximumError(), err
1✔
545
                }
1✔
546
                // Counter increments.
547
                // Only supported on counter streams, and payload must be empty (if not coming from a source).
548
                var ok bool
764,246✔
549
                if incr, ok = getMessageIncr(hdr); !ok {
764,247✔
550
                        apiErr := NewJSMessageIncrInvalidError()
1✔
551
                        return hdr, msg, 0, apiErr, apiErr
1✔
552
                } else if incr != nil && !sourced {
764,291✔
553
                        // Only do checks if the message isn't sourced. Otherwise, we need to store verbatim.
45✔
554
                        if !allowMsgCounter {
46✔
555
                                apiErr := NewJSMessageIncrDisabledError()
1✔
556
                                return hdr, msg, 0, apiErr, apiErr
1✔
557
                        } else if len(msg) > 0 {
46✔
558
                                apiErr := NewJSMessageIncrPayloadError()
1✔
559
                                return hdr, msg, 0, apiErr, apiErr
1✔
560
                        } else {
44✔
561
                                // Check for incompatible headers.
43✔
562
                                var doErr bool
43✔
563
                                if getRollup(hdr) != _EMPTY_ ||
43✔
564
                                        getExpectedStream(hdr) != _EMPTY_ ||
43✔
565
                                        getExpectedLastMsgId(hdr) != _EMPTY_ ||
43✔
566
                                        getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
46✔
567
                                        doErr = true
3✔
568
                                } else if _, ok = getExpectedLastSeq(hdr); ok {
44✔
569
                                        doErr = true
1✔
570
                                } else if _, ok = getExpectedLastSeqPerSubject(hdr); ok {
41✔
571
                                        doErr = true
1✔
572
                                }
1✔
573

574
                                if doErr {
48✔
575
                                        apiErr := NewJSMessageIncrInvalidError()
5✔
576
                                        return hdr, msg, 0, apiErr, apiErr
5✔
577
                                }
5✔
578
                        }
579
                }
580
                // Expected stream name can also be pre-checked.
581
                if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
764,238✔
582
                        return hdr, msg, 0, NewJSStreamNotMatchError(), errStreamMismatch
×
583
                }
×
584
                // TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
585
                if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
764,310✔
586
                        if !allowTTL {
75✔
587
                                return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
3✔
588
                        } else if err != nil {
74✔
589
                                return hdr, msg, 0, NewJSMessageTTLInvalidError(), err
2✔
590
                        }
2✔
591
                }
592
                // Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
593
                // Will help during restarts.
594
                if msgId := getMsgId(hdr); msgId != _EMPTY_ {
1,525,540✔
595
                        // Dedupe if staged.
761,307✔
596
                        if _, ok = diff.msgIds[msgId]; ok {
761,308✔
597
                                return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
1✔
598
                        }
1✔
599
                        mset.ddMu.Lock()
761,306✔
600
                        if dde := mset.checkMsgId(msgId); dde != nil {
783,391✔
601
                                seq := dde.seq
22,085✔
602
                                mset.ddMu.Unlock()
22,085✔
603
                                // Should not return an invalid sequence, in that case error.
22,085✔
604
                                if seq > 0 {
44,041✔
605
                                        return hdr, msg, seq, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
21,956✔
606
                                } else {
22,085✔
607
                                        return hdr, msg, 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate
129✔
608
                                }
129✔
609
                        }
610
                        if diff.msgIds == nil {
1,478,429✔
611
                                diff.msgIds = map[string]struct{}{msgId: {}}
739,208✔
612
                        } else {
739,221✔
613
                                diff.msgIds[msgId] = struct{}{}
13✔
614
                        }
13✔
615
                        mset.ddMu.Unlock()
739,221✔
616
                }
617
        }
618

619
        // Apply increment for counter.
620
        // But only if it's allowed for this stream. This can happen when we store verbatim for a sourced stream.
621
        if incr == nil && allowMsgCounter {
933,581✔
622
                apiErr := NewJSMessageIncrMissingError()
2✔
623
                return hdr, msg, 0, apiErr, apiErr
2✔
624
        }
2✔
625
        if incr != nil && allowMsgCounter {
933,618✔
626
                var initial big.Int
41✔
627
                var sources CounterSources
41✔
628

41✔
629
                // If we've got a running total, update that, since we have inflight proposals updating the same counter.
41✔
630
                var ok bool
41✔
631
                var counter *msgCounterRunningTotal
41✔
632
                if counter, ok = diff.counter[subject]; ok {
42✔
633
                        initial = *counter.total
1✔
634
                        sources = counter.sources
1✔
635
                } else if counter, ok = mset.clusteredCounterTotal[subject]; ok {
43✔
636
                        initial = *counter.total
2✔
637
                        sources = counter.sources
2✔
638
                        // Make an explicit copy to separate the staged data from what's committed.
2✔
639
                        // Don't need to initialize all values, they'll be overwritten later.
2✔
640
                        counter = &msgCounterRunningTotal{ops: counter.ops}
2✔
641
                } else {
40✔
642
                        // Load last message, and store as inflight running total.
38✔
643
                        var smv StoreMsg
38✔
644
                        sm, err := mset.store.LoadLastMsg(subject, &smv)
38✔
645
                        if err == nil && sm != nil {
63✔
646
                                var val CounterValue
25✔
647
                                // Return an error if the counter is broken somehow.
25✔
648
                                if json.Unmarshal(sm.msg, &val) != nil {
26✔
649
                                        apiErr := NewJSMessageCounterBrokenError()
1✔
650
                                        return hdr, msg, 0, apiErr, apiErr
1✔
651
                                }
1✔
652
                                if ncs := sliceHeader(JSMessageCounterSources, sm.hdr); len(ncs) > 0 {
27✔
653
                                        if err := json.Unmarshal(ncs, &sources); err != nil {
3✔
654
                                                apiErr := NewJSMessageCounterBrokenError()
×
655
                                                return hdr, msg, 0, apiErr, apiErr
×
656
                                        }
×
657
                                }
658
                                initial.SetString(val.Value, 10)
24✔
659
                        }
660
                }
661
                srchdr := sliceHeader(JSStreamSource, hdr)
40✔
662
                if len(srchdr) > 0 {
44✔
663
                        // This is a sourced message, so we can't apply Nats-Incr but
4✔
664
                        // instead should just update the source count header.
4✔
665
                        fields := strings.Split(string(srchdr), " ")
4✔
666
                        origStream := fields[0]
4✔
667
                        origSubj := subject
4✔
668
                        if len(fields) >= 5 {
8✔
669
                                origSubj = fields[4]
4✔
670
                        }
4✔
671
                        var val CounterValue
4✔
672
                        if json.Unmarshal(msg, &val) != nil {
4✔
673
                                apiErr := NewJSMessageCounterBrokenError()
×
674
                                return hdr, msg, 0, apiErr, apiErr
×
675
                        }
×
676
                        var sourced big.Int
4✔
677
                        sourced.SetString(val.Value, 10)
4✔
678
                        if sources == nil {
6✔
679
                                sources = map[string]map[string]string{}
2✔
680
                        }
2✔
681
                        if _, ok = sources[origStream]; !ok {
8✔
682
                                sources[origStream] = map[string]string{}
4✔
683
                        }
4✔
684
                        prevVal := sources[origStream][origSubj]
4✔
685
                        sources[origStream][origSubj] = sourced.String()
4✔
686
                        // We will also replace the Nats-Incr header with the diff
4✔
687
                        // between our last value from this source and this one, so
4✔
688
                        // that the arithmetic is always correct.
4✔
689
                        var previous big.Int
4✔
690
                        previous.SetString(prevVal, 10)
4✔
691
                        incr.Sub(&sourced, &previous)
4✔
692
                        hdr = setHeader(JSMessageIncr, incr.String(), hdr)
4✔
693
                }
694
                // Now make the change.
695
                initial.Add(&initial, incr)
40✔
696
                // Generate the new payload.
40✔
697
                var _msg [128]byte
40✔
698
                msg = fmt.Appendf(_msg[:0], "{%q:%q}", "val", initial.String())
40✔
699
                // Write the updated source count headers.
40✔
700
                if len(sources) > 0 {
45✔
701
                        nhdr, err := json.Marshal(sources)
5✔
702
                        if err != nil {
5✔
703
                                return hdr, msg, 0, NewJSMessageCounterBrokenError(), err
×
704
                        }
×
705
                        hdr = setHeader(JSMessageCounterSources, string(nhdr), hdr)
5✔
706
                }
707

708
                // Check to see if we are over the max msg size.
709
                maxSize := int64(mset.srv.getOpts().MaxPayload)
40✔
710
                if maxMsgSize >= 0 && int64(maxMsgSize) < maxSize {
42✔
711
                        maxSize = int64(maxMsgSize)
2✔
712
                }
2✔
713
                hdrLen, msgLen := int64(len(hdr)), int64(len(msg))
40✔
714
                // Subtract to prevent against overflows.
40✔
715
                if hdrLen > maxSize || msgLen > maxSize-hdrLen {
43✔
716
                        return hdr, msg, 0, NewJSStreamMessageExceedsMaximumError(), ErrMaxPayload
3✔
717
                }
3✔
718

719
                // Keep the in-memory counters up-to-date.
720
                if counter == nil {
71✔
721
                        counter = &msgCounterRunningTotal{}
34✔
722
                }
34✔
723
                counter.total = &initial
37✔
724
                counter.sources = sources
37✔
725
                counter.ops++
37✔
726
                if diff.counter == nil {
73✔
727
                        diff.counter = map[string]*msgCounterRunningTotal{subject: counter}
36✔
728
                } else {
37✔
729
                        diff.counter[subject] = counter
1✔
730
                }
1✔
731
        }
732

733
        if len(hdr) > 0 {
1,675,715✔
734
                // Expected last sequence.
742,142✔
735
                if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.clseq-mset.clfs {
742,150✔
736
                        mlseq := mset.clseq - mset.clfs
8✔
737
                        err := fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
8✔
738
                        return hdr, msg, 0, NewJSStreamWrongLastSequenceError(mlseq), err
8✔
739
                } else if exists && len(diff.inflight) > 0 {
742,142✔
740
                        // Only the first message in a batch can contain an expected last sequence.
×
741
                        err := fmt.Errorf("last sequence mismatch")
×
742
                        return hdr, msg, 0, NewJSStreamWrongLastSequenceConstantError(), err
×
743
                }
×
744

745
                // Expected last sequence per subject.
746
                if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
742,274✔
747
                        // Allow override of the subject used for the check.
140✔
748
                        seqSubj := subject
140✔
749
                        if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
192✔
750
                                seqSubj = optSubj
52✔
751
                        }
52✔
752

753
                        // The subject is already written to in this batch, we can't allow
754
                        // expected checks since they would be incorrect.
755
                        if _, ok := diff.inflight[seqSubj]; ok {
146✔
756
                                err := errors.New("last sequence by subject mismatch")
6✔
757
                                return hdr, msg, 0, NewJSStreamWrongLastSequenceConstantError(), err
6✔
758
                        }
6✔
759

760
                        // If the subject is already in process, block as otherwise we could have
761
                        // multiple messages inflight with the same subject.
762
                        if _, found := mset.expectedPerSubjectInProcess[seqSubj]; found {
144✔
763
                                err := errors.New("last sequence by subject mismatch")
10✔
764
                                return hdr, msg, 0, NewJSStreamWrongLastSequenceConstantError(), err
10✔
765
                        }
10✔
766

767
                        // If the subject is already in process but without expected headers, block as we would have
768
                        // multiple messages inflight with the same subject.
769
                        if _, ok := mset.inflight[seqSubj]; ok {
126✔
770
                                err := errors.New("last sequence by subject mismatch")
2✔
771
                                return hdr, msg, 0, NewJSStreamWrongLastSequenceConstantError(), err
2✔
772
                        }
2✔
773

774
                        // If we've already done an expected-check on this subject, use the cached result.
775
                        if e, ok := diff.expectedPerSubject[seqSubj]; ok {
123✔
776
                                if e.sseq != seq {
1✔
777
                                        err := fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, e.sseq)
×
778
                                        return hdr, msg, 0, NewJSStreamWrongLastSequenceError(e.sseq), err
×
779
                                }
×
780
                                e.clseq = mset.clseq
1✔
781
                        } else {
121✔
782
                                var smv StoreMsg
121✔
783
                                var fseq uint64
121✔
784
                                sm, err := mset.store.LoadLastMsg(seqSubj, &smv)
121✔
785
                                if sm != nil {
225✔
786
                                        fseq = sm.seq
104✔
787
                                }
104✔
788
                                if err == ErrStoreMsgNotFound && seq == 0 {
134✔
789
                                        fseq, err = 0, nil
13✔
790
                                }
13✔
791
                                if err != nil || fseq != seq {
174✔
792
                                        err = fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
53✔
793
                                        return hdr, msg, 0, NewJSStreamWrongLastSequenceError(fseq), err
53✔
794
                                }
53✔
795

796
                                e = &batchExpectedPerSubject{sseq: fseq, clseq: mset.clseq}
68✔
797
                                if diff.expectedPerSubject == nil {
134✔
798
                                        diff.expectedPerSubject = map[string]*batchExpectedPerSubject{seqSubj: e}
66✔
799
                                } else {
68✔
800
                                        diff.expectedPerSubject[seqSubj] = e
2✔
801
                                }
2✔
802
                        }
803
                } else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
741,996✔
804
                        apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
2✔
805
                        return hdr, msg, 0, apiErr, apiErr
2✔
806
                }
2✔
807

808
                // Message scheduling.
809
                if sourced {
744,389✔
810
                        // noop, sourced messages were already validated by the origin stream.
2,328✔
811
                } else if schedule, ok := getMessageSchedule(hdr); !ok {
742,067✔
812
                        apiErr := NewJSMessageSchedulesPatternInvalidError()
6✔
813
                        if !allowMsgSchedules {
9✔
814
                                apiErr = NewJSMessageSchedulesDisabledError()
3✔
815
                        }
3✔
816
                        return hdr, msg, 0, apiErr, apiErr
6✔
817
                } else if !schedule.IsZero() {
739,780✔
818
                        if !allowMsgSchedules {
56✔
819
                                apiErr := NewJSMessageSchedulesDisabledError()
3✔
820
                                return hdr, msg, 0, apiErr, apiErr
3✔
821
                        } else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok {
56✔
822
                                apiErr := NewJSMessageSchedulesTTLInvalidError()
3✔
823
                                return hdr, msg, 0, apiErr, apiErr
3✔
824
                        } else if scheduleRollup := getMessageScheduleRollup(hdr); scheduleRollup != _EMPTY_ && scheduleRollup != JSMsgRollupSubject {
53✔
825
                                apiErr := NewJSMessageSchedulesRollupInvalidError()
3✔
826
                                return hdr, msg, 0, apiErr, apiErr
3✔
827
                        } else if scheduleTtl != _EMPTY_ && !allowTTL {
50✔
828
                                return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
3✔
829
                        } else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ ||
44✔
830
                                !IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) {
47✔
831
                                apiErr := NewJSMessageSchedulesTargetInvalidError()
6✔
832
                                return hdr, msg, 0, apiErr, apiErr
6✔
833
                        } else if scheduleSource := getMessageScheduleSource(hdr); scheduleSource != _EMPTY_ &&
41✔
834
                                (scheduleSource == scheduleTarget || scheduleSource == subject || !IsValidPublishSubject(scheduleSource)) {
43✔
835
                                apiErr := NewJSMessageSchedulesSourceInvalidError()
8✔
836
                                return hdr, msg, 0, apiErr, apiErr
8✔
837
                        } else {
35✔
838
                                mset.cfgMu.RLock()
27✔
839
                                match := slices.ContainsFunc(mset.cfg.Subjects, func(subj string) bool {
61✔
840
                                        return SubjectsCollide(subj, scheduleTarget)
34✔
841
                                })
34✔
842
                                mset.cfgMu.RUnlock()
27✔
843
                                if !match {
30✔
844
                                        apiErr := NewJSMessageSchedulesTargetInvalidError()
3✔
845
                                        return hdr, msg, 0, apiErr, apiErr
3✔
846
                                }
3✔
847

848
                                // Add a rollup sub header if it doesn't already exist.
849
                                // Otherwise, it must exist already as a rollup on the subject.
850
                                if rollup := getRollup(hdr); rollup == _EMPTY_ {
42✔
851
                                        hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject)
18✔
852
                                } else if rollup != JSMsgRollupSubject {
26✔
853
                                        apiErr := NewJSMessageSchedulesRollupInvalidError()
2✔
854
                                        return hdr, msg, 0, apiErr, apiErr
2✔
855
                                }
2✔
856
                        }
857
                }
858
                if scheduleNext := sliceHeader(JSScheduleNext, hdr); len(scheduleNext) > 0 && !sourced {
742,033✔
859
                        // Clients may only use Nats-Schedule-Next to purge a schedule.
9✔
860
                        if bytesToString(scheduleNext) != JSScheduleNextPurge {
11✔
861
                                apiErr := NewJSMessageSchedulesSchedulerInvalidError()
2✔
862
                                return hdr, msg, 0, apiErr, apiErr
2✔
863
                        }
2✔
864
                        // Nats-Scheduler must accompany the purge and:
865
                        // - it must NOT be empty.
866
                        // - it must NOT match the publish subject.
867
                        if scheduler := sliceHeader(JSScheduler, hdr); len(scheduler) == 0 ||
7✔
868
                                bytesToString(scheduler) == subject || !IsValidPublishSubject(bytesToString(scheduler)) {
10✔
869
                                apiErr := NewJSMessageSchedulesSchedulerInvalidError()
3✔
870
                                return hdr, msg, 0, apiErr, apiErr
3✔
871
                        } else if !allowMsgSchedules {
8✔
872
                                apiErr := NewJSMessageSchedulesDisabledError()
1✔
873
                                return hdr, msg, 0, apiErr, apiErr
1✔
874
                        }
1✔
875
                } else if !sourced && len(sliceHeader(JSScheduler, hdr)) > 0 {
742,017✔
876
                        // Clients may only use Nats-Scheduler alongside Nats-Schedule-Next.
2✔
877
                        apiErr := NewJSMessageSchedulesSchedulerInvalidError()
2✔
878
                        return hdr, msg, 0, apiErr, apiErr
2✔
879
                }
2✔
880

881
                // Check for any rollups.
882
                if rollup := getRollup(hdr); rollup != _EMPTY_ {
742,105✔
883
                        if (!allowRollup || denyPurge) && !sourced {
91✔
884
                                err := errors.New("rollup not permitted")
2✔
885
                                return hdr, msg, 0, NewJSStreamRollupFailedError(err), err
2✔
886
                        }
2✔
887
                        switch rollup {
87✔
888
                        case JSMsgRollupSubject:
76✔
889
                                // Rolling up the subject is only allowed if the first occurrence of this subject in the batch.
76✔
890
                                if _, ok := diff.inflight[subject]; ok {
77✔
891
                                        err := errors.New("batch rollup sub invalid")
1✔
892
                                        return hdr, msg, 0, NewJSStreamRollupFailedError(err), err
1✔
893
                                }
1✔
894
                        case JSMsgRollupAll:
10✔
895
                                // Rolling up the whole stream is only allowed if this is the first message of the batch.
10✔
896
                                if len(diff.inflight) > 0 {
11✔
897
                                        err := errors.New("batch rollup all invalid")
1✔
898
                                        return hdr, msg, 0, NewJSStreamRollupFailedError(err), err
1✔
899
                                }
1✔
900
                        default:
1✔
901
                                err := fmt.Errorf("rollup value invalid: %q", rollup)
1✔
902
                                return hdr, msg, 0, NewJSStreamRollupFailedError(err), err
1✔
903
                        }
904
                }
905
        }
906

907
        // Track inflight.
908
        // Store the subject to ensure other messages in this batch using
909
        // an expected check or rollup on the same subject fail.
910
        if diff.inflight == nil {
1,866,742✔
911
                diff.inflight = make(map[string]*inflightSubjectRunningTotal, 1)
933,300✔
912
        }
933,300✔
913
        var sz uint64
933,442✔
914
        if mset.store.Type() == FileStorage {
1,846,747✔
915
                sz = fileStoreMsgSizeRaw(len(subject), len(hdr), len(msg))
913,305✔
916
        } else {
933,442✔
917
                sz = memStoreMsgSizeRaw(len(subject), len(hdr), len(msg))
20,137✔
918
        }
20,137✔
919
        var (
933,442✔
920
                i   *inflightSubjectRunningTotal
933,442✔
921
                ok  bool
933,442✔
922
                err error
933,442✔
923
        )
933,442✔
924
        if i, ok = diff.inflight[subject]; ok {
933,479✔
925
                i.bytes += sz
37✔
926
                i.ops++
37✔
927
        } else {
933,442✔
928
                i = &inflightSubjectRunningTotal{bytes: sz, ops: 1}
933,405✔
929
                diff.inflight[subject] = i
933,405✔
930
        }
933,405✔
931

932
        // Subject transform.
933
        if subject != rsubject {
933,453✔
934
                // The 'subject' is a transformed subject used for consistency checks.
11✔
935
                // But since we propose the original (raw) subject to our peers, we need
11✔
936
                // to store the transformed subject separately for when we apply.
11✔
937
                // TODO(mvv): since subject transforms are handled by each replica individually, this has a
11✔
938
                //  potential for desync given out-of-order stream subject transform updates.
11✔
939
                if diff.inflightTransform == nil {
22✔
940
                        diff.inflightTransform = make(map[uint64]string, 1)
11✔
941
                }
11✔
942
                diff.inflightTransform[mset.clseq] = subject
11✔
943
        }
944

945
        // Check if we have discard new with max msgs or bytes.
946
        // We need to deny here otherwise we'd need to bump CLFS, and it could succeed on some
947
        // peers and not others depending on consumer ack state (if interest policy).
948
        // So we deny here, if we allow that means we know it would succeed on every peer.
949
        if discard == DiscardNew {
945,115✔
950
                if maxMsgs > 0 || maxBytes > 0 {
22,912✔
951
                        // Track usual max msgs/bytes thresholds for DiscardNew.
11,239✔
952
                        var state StreamState
11,239✔
953
                        mset.store.FastState(&state)
11,239✔
954

11,239✔
955
                        totalMsgs := state.Msgs
11,239✔
956
                        totalBytes := state.Bytes
11,239✔
957
                        for _, i = range mset.inflight {
11,837✔
958
                                totalMsgs += i.ops
598✔
959
                                totalBytes += i.bytes
598✔
960
                        }
598✔
961
                        for _, i = range diff.inflight {
22,480✔
962
                                totalMsgs += i.ops
11,241✔
963
                                totalBytes += i.bytes
11,241✔
964
                        }
11,241✔
965

966
                        if maxMsgs > 0 && totalMsgs > uint64(maxMsgs) {
13,204✔
967
                                err = ErrMaxMsgs
1,965✔
968
                        } else if maxBytes > 0 && totalBytes > uint64(maxBytes) {
12,234✔
969
                                err = ErrMaxBytes
995✔
970
                        }
995✔
971
                        if err != nil {
14,199✔
972
                                return hdr, msg, 0, NewJSStreamStoreFailedError(err, Unless(err)), err
2,960✔
973
                        }
2,960✔
974
                }
975

976
                // Similarly, check DiscardNew per-subject threshold to not need to bump CLFS.
977
                // Allow rollup messages through since they will purge after storing.
978
                if discardNewPer && maxMsgsPer > 0 && len(sliceHeader(JSMsgRollup, hdr)) == 0 {
8,734✔
979
                        // Get the current total for this subject.
21✔
980
                        totalMsgsForSubject := mset.store.SubjectsTotals(subject)[subject]
21✔
981
                        // Add inflight count in this batch and for this stream.
21✔
982
                        totalMsgsForSubject += i.ops
21✔
983
                        if i, ok = mset.inflight[subject]; ok {
23✔
984
                                totalMsgsForSubject += i.ops
2✔
985
                        }
2✔
986
                        if totalMsgsForSubject > uint64(maxMsgsPer) {
28✔
987
                                err = ErrMaxMsgsPerSubject
7✔
988
                                return hdr, msg, 0, NewJSStreamStoreFailedError(err, Unless(err)), err
7✔
989
                        }
7✔
990
                }
991
        }
992

993
        return hdr, msg, 0, nil, nil
930,475✔
994
}
995

996
// recalculateClusteredSeq initializes or updates mset.clseq, for example after a leader change.
997
// This is reused for normal clustered publishing into a stream, and for atomic and fast batch publishing.
998
// mset.clMu lock must be held.
999
func recalculateClusteredSeq(mset *stream, needStreamLock bool) (lseq uint64) {
762✔
1000
        // Need to unlock and re-acquire the locks in the proper order.
762✔
1001
        mset.clMu.Unlock()
762✔
1002
        // Locking order is stream -> batchMu -> clMu
762✔
1003
        if needStreamLock {
1,402✔
1004
                mset.mu.RLock()
640✔
1005
        }
640✔
1006
        batch := mset.batchApply
762✔
1007
        var batchCount uint64
762✔
1008
        if batch != nil {
764✔
1009
                batch.mu.Lock()
2✔
1010
                batchCount = batch.count
2✔
1011
        }
2✔
1012
        mset.clMu.Lock()
762✔
1013
        // Re-capture
762✔
1014
        lseq = mset.lseq
762✔
1015
        mset.clseq = lseq + mset.clfs + batchCount
762✔
1016
        // Keep hold of the mset.clMu, but unlock the others.
762✔
1017
        if batch != nil {
764✔
1018
                batch.mu.Unlock()
2✔
1019
        }
2✔
1020
        if needStreamLock {
1,402✔
1021
                mset.mu.RUnlock()
640✔
1022
        }
640✔
1023
        return lseq
762✔
1024
}
1025

1026
// commitSingleMsg commits and proposes a single message to the node.
1027
// This is reused both for normal publishing into a stream, and for fast batch publishing.
1028
// mset.clMu lock must be held.
1029
func commitSingleMsg(
1030
        diff *batchStagedDiff, mset *stream, subject string, reply string, hdr []byte, msg []byte, name string,
1031
        jsa *jsAccount, mt *msgTrace, node RaftNode, replicas int, lseq uint64,
1032
) error {
930,170✔
1033
        // Do proposal.
930,170✔
1034
        esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), false)
930,170✔
1035
        if err := node.Propose(esm); err != nil {
930,173✔
1036
                return err
3✔
1037
        }
3✔
1038

1039
        var mtKey uint64
930,167✔
1040
        if mt != nil {
930,167✔
1041
                mtKey = mset.clseq
×
1042
                if mset.mt == nil {
×
1043
                        mset.mt = make(map[uint64]*msgTrace)
×
1044
                }
×
1045
                mset.mt[mtKey] = mt
×
1046
        }
1047

1048
        diff.commit(mset)
930,167✔
1049
        mset.clseq++
930,167✔
1050
        mset.trackReplicationTraffic(node, len(esm), replicas)
930,167✔
1051

930,167✔
1052
        // Check to see if we are being overrun.
930,167✔
1053
        // TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
930,167✔
1054
        if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold {
930,167✔
1055
                lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name)
×
1056
                mset.srv.RateLimitWarnf("%s", lerr.Error())
×
1057
        }
×
1058
        return nil
930,167✔
1059
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc