• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 20087658921

09 Dec 2025 02:35PM UTC coverage: 84.763% (+0.07%) from 84.691%
20087658921

push

github

web-flow
[FIXED] Filestore compaction issues (#7627)

This PR fixes a couple issues relating to `Compact(seq)`:
1. If an (almost) empty stream would be compacted or purged to `seq=2`
and the server would be hard killed after, it would not have written a
tombstone to recover this sequence from.
2. A particular sequence of `SkipMsg(seq)`, `Compact(seq)`,
`SkipMsg(seq+1)` would preserve the block and tombstones after the
compact, but the subsequent `SkipMsg(seq+1)` on the empty block would
see the server starting to overwrite block data at the head of the file.
Reading that file from disk afterward would appear corrupted and log:
`indexCacheBuf corrupt record state`.
3. Compaction of a block that would allow to reclaim over half of the
storage size taken up by the file would overwrite the file in place.
This could lose messages if the server was hard killed after the file
truncation, but before it fully wrote the compacted bytes. We now write
the file separately and rename it to overwrite the original file
atomically.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

74120 of 87444 relevant lines covered (84.76%)

359268.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/server/jetstream_api.go
1
// Copyright 2020-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31
        "unicode"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiStreamCreate is the endpoint to create new streams.
51
        // Will return JSON response.
52
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
53
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
54

55
        // JSApiStreamUpdate is the endpoint to update existing streams.
56
        // Will return JSON response.
57
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
58
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
59

60
        // JSApiStreams is the endpoint to list all stream names for this account.
61
        // Will return JSON response.
62
        JSApiStreams = "$JS.API.STREAM.NAMES"
63
        // JSApiStreamList is the endpoint that will return all detailed stream information
64
        JSApiStreamList = "$JS.API.STREAM.LIST"
65

66
        // JSApiStreamInfo is for obtaining general information about a named stream.
67
        // Will return JSON response.
68
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
69
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
70

71
        // JSApiStreamDelete is the endpoint to delete streams.
72
        // Will return JSON response.
73
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
74
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
75

76
        // JSApiStreamPurge is the endpoint to purge streams.
77
        // Will return JSON response.
78
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
79
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
80

81
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
82
        // Will return a stream of chunks with a nil chunk as EOF to
83
        // the deliver subject. Caller should respond to each chunk
84
        // with a nil body response for ack flow.
85
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
86
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
87

88
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
89
        // Caller should respond to each chunk with a nil body response.
90
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
91
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
92

93
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
94
        // Will return JSON response.
95
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
96
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
97

98
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
99
        // Will return JSON response.
100
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
101
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
102

103
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
104
        // Will return the message similar to how a consumer receives the message, no JSON processing.
105
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
106
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
107
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
108

109
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
110
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
111
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
112
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
113

114
        // jsDirectGetPre
115
        jsDirectGetPre = "$JS.API.DIRECT.GET"
116

117
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
118
        // This was also the legacy endpoint for ephemeral consumers.
119
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
120
        // Will return JSON response.
121
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
122
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
123
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
124
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
125

126
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
127
        // You need to include the stream and consumer name in the subject.
128
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
129
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
130

131
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
132
        // Will return JSON response.
133
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
134
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
135

136
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
137
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
138
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
139

140
        // JSApiConsumerInfo is for obtaining general information about a consumer.
141
        // Will return JSON response.
142
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
143
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
144

145
        // JSApiConsumerDelete is the endpoint to delete consumers.
146
        // Will return JSON response.
147
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
148
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
149

150
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
151
        // Will return JSON response.
152
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
153
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
154

155
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157

158
        // JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
159
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
160
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
161

162
        // jsRequestNextPre
163
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
164

165
        // For snapshots and restores. The ack will have additional tokens.
166
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
167
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
168

169
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
170
        // Will return JSON response.
171
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
172
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
173

174
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
175
        // Will return JSON response.
176
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
177
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
178

179
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
180
        // Will return JSON response.
181
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
182
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
183

184
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
185
        // Only works from system account.
186
        // Will return JSON response.
187
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
188

189
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
190
        // Only works from system account.
191
        // Will return JSON response.
192
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
193

194
        // JSApiAccountPurge is the endpoint to purge the js content of an account
195
        // Only works from system account.
196
        // Will return JSON response.
197
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
198
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
199

200
        // JSApiServerStreamMove is the endpoint to move streams off a server
201
        // Only works from system account.
202
        // Will return JSON response.
203
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
204
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
205

206
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
207
        // Only works from system account.
208
        // Will return JSON response.
209
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
210
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
211

212
        // The prefix for system level account API.
213
        jsAPIAccountPre = "$JS.API.ACCOUNT."
214

215
        // jsAckT is the template for the ack message stream coming back from a consumer
216
        // when they ACK/NAK, etc a message.
217
        jsAckT      = "$JS.ACK.%s.%s"
218
        jsAckPre    = "$JS.ACK."
219
        jsAckPreLen = len(jsAckPre)
220

221
        // jsFlowControl is for flow control subjects.
222
        jsFlowControlPre = "$JS.FC."
223
        // jsFlowControl is for FC responses.
224
        jsFlowControl = "$JS.FC.%s.%s.*"
225

226
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
227
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
228

229
        // JSMetricPrefix is a prefix for all JetStream metrics.
230
        JSMetricPrefix = "$JS.EVENT.METRIC"
231

232
        // JSMetricConsumerAckPre is a metric containing ack latency.
233
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
234

235
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
236
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
237

238
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
239
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
240

241
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
242
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
243

244
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
245
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
246

247
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
248
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
249

250
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
251
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
252

253
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
254
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
255

256
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
257
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
258

259
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
260
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
261

262
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
263
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
264

265
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
266
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
267

268
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
269
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
270

271
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
272
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
273

274
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was start.
275
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
276

277
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
278
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
279

280
        // JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
281
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
282

283
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
284
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
285

286
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
287
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
288

289
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
290
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
291

292
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
293
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
294

295
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
296
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
297

298
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
299
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
300

301
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
302
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
303

304
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
305
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
306

307
        // JSAuditAdvisory is a notification about JetStream API access.
308
        // FIXME - Add in details about who..
309
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
310
)
311

312
// Headers used in $JS.API.> requests.
313
const (
314
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
315
        JSRequiredApiLevel = "Nats-Required-Api-Level"
316
)
317

318
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
319
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
320

321
func generateJSMappingTable(domain string) map[string]string {
790✔
322
        mappings := map[string]string{}
790✔
323
        // This set of mappings is very very very ugly.
790✔
324
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
790✔
325
        // For optics $KV and $OBJ where made to be independent subject spaces.
790✔
326
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
790✔
327
        // This is very unfortunate!!!
790✔
328
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
790✔
329
        // Especially since the actual API for say KV, does use stream create from JS.
790✔
330
        // To avoid overlaps KV and OBJ views append the prefix to their API.
790✔
331
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
790✔
332
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
790✔
333
        for srcMappingSuffix, to := range map[string]string{
790✔
334
                "INFO":       JSApiAccountInfo,
790✔
335
                "STREAM.>":   "$JS.API.STREAM.>",
790✔
336
                "CONSUMER.>": "$JS.API.CONSUMER.>",
790✔
337
                "DIRECT.>":   "$JS.API.DIRECT.>",
790✔
338
                "META.>":     "$JS.API.META.>",
790✔
339
                "SERVER.>":   "$JS.API.SERVER.>",
790✔
340
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
790✔
341
                "$KV.>":      "$KV.>",
790✔
342
                "$OBJ.>":     "$OBJ.>",
790✔
343
        } {
7,900✔
344
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
7,110✔
345
        }
7,110✔
346
        return mappings
790✔
347
}
348

349
// JSMaxDescription is the maximum description length for streams and consumers.
350
const JSMaxDescriptionLen = 4 * 1024
351

352
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
353
// It's calculated by summing length of all keys and values.
354
const JSMaxMetadataLen = 128 * 1024
355

356
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
357
// Picked 255 as it seems to be a widely used file name limit
358
const JSMaxNameLen = 255
359

360
// JSDefaultRequestQueueLimit is the default number of entries that we will
361
// put on the global request queue before we react.
362
const JSDefaultRequestQueueLimit = 10_000
363

364
// Responses for API calls.
365

366
// ApiResponse is a standard response from the JetStream JSON API
367
type ApiResponse struct {
368
        Type  string    `json:"type"`
369
        Error *ApiError `json:"error,omitempty"`
370
}
371

372
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
373

374
// When passing back to the clients generalize store failures.
375
var (
376
        errStreamStoreFailed   = errors.New("error creating store for stream")
377
        errConsumerStoreFailed = errors.New("error creating store for consumer")
378
)
379

380
// ToError checks if the response has a error and if it does converts it to an error avoiding
381
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
382
func (r *ApiResponse) ToError() error {
3,396✔
383
        if r.Error == nil {
5,280✔
384
                return nil
1,884✔
385
        }
1,884✔
386

387
        return r.Error
1,512✔
388
}
389

390
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
391

392
// ApiPaged includes variables used to create paged responses from the JSON API
393
type ApiPaged struct {
394
        Total  int `json:"total"`
395
        Offset int `json:"offset"`
396
        Limit  int `json:"limit"`
397
}
398

399
// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
400
type ApiPagedRequest struct {
401
        Offset int `json:"offset"`
402
}
403

404
// JSApiAccountInfoResponse reports back information on jetstream for this account.
405
type JSApiAccountInfoResponse struct {
406
        ApiResponse
407
        *JetStreamAccountStats
408
}
409

410
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
411

412
// JSApiStreamCreateResponse stream creation.
413
type JSApiStreamCreateResponse struct {
414
        ApiResponse
415
        *StreamInfo
416
        DidCreate bool `json:"did_create,omitempty"`
417
}
418

419
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
420

421
// JSApiStreamDeleteResponse stream removal.
422
type JSApiStreamDeleteResponse struct {
423
        ApiResponse
424
        Success bool `json:"success,omitempty"`
425
}
426

427
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
428

429
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
430
const JSMaxSubjectDetails = 100_000
431

432
type JSApiStreamInfoRequest struct {
433
        ApiPagedRequest
434
        DeletedDetails bool   `json:"deleted_details,omitempty"`
435
        SubjectsFilter string `json:"subjects_filter,omitempty"`
436
}
437

438
type JSApiStreamInfoResponse struct {
439
        ApiResponse
440
        ApiPaged
441
        *StreamInfo
442
}
443

444
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
445

446
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
447
// TODO(dlc) - with header or request support could request chunked response.
448
const JSApiNamesLimit = 1024
449
const JSApiListLimit = 256
450

451
type JSApiStreamNamesRequest struct {
452
        ApiPagedRequest
453
        // These are filters that can be applied to the list.
454
        Subject string `json:"subject,omitempty"`
455
}
456

457
// JSApiStreamNamesResponse list of streams.
458
// A nil request is valid and means all streams.
459
type JSApiStreamNamesResponse struct {
460
        ApiResponse
461
        ApiPaged
462
        Streams []string `json:"streams"`
463
}
464

465
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
466

467
type JSApiStreamListRequest struct {
468
        ApiPagedRequest
469
        // These are filters that can be applied to the list.
470
        Subject string `json:"subject,omitempty"`
471
}
472

473
// JSApiStreamListResponse list of detailed stream information.
474
// A nil request is valid and means all streams.
475
type JSApiStreamListResponse struct {
476
        ApiResponse
477
        ApiPaged
478
        Streams []*StreamInfo     `json:"streams"`
479
        Missing []string          `json:"missing,omitempty"`
480
        Offline map[string]string `json:"offline,omitempty"`
481
}
482

483
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
484

485
// JSApiStreamPurgeRequest is optional request information to the purge API.
486
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
487
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
488
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
489
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
490
type JSApiStreamPurgeRequest struct {
491
        // Purge up to but not including sequence.
492
        Sequence uint64 `json:"seq,omitempty"`
493
        // Subject to match against messages for the purge command.
494
        Subject string `json:"filter,omitempty"`
495
        // Number of messages to keep.
496
        Keep uint64 `json:"keep,omitempty"`
497
}
498

499
type JSApiStreamPurgeResponse struct {
500
        ApiResponse
501
        Success bool   `json:"success,omitempty"`
502
        Purged  uint64 `json:"purged"`
503
}
504

505
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
506

507
type JSApiConsumerUnpinRequest struct {
508
        Group string `json:"group"`
509
}
510

511
type JSApiConsumerUnpinResponse struct {
512
        ApiResponse
513
}
514

515
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
516

517
// JSApiStreamUpdateResponse for updating a stream.
518
type JSApiStreamUpdateResponse struct {
519
        ApiResponse
520
        *StreamInfo
521
}
522

523
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
524

525
// JSApiMsgDeleteRequest delete message request.
526
type JSApiMsgDeleteRequest struct {
527
        Seq     uint64 `json:"seq"`
528
        NoErase bool   `json:"no_erase,omitempty"`
529
}
530

531
type JSApiMsgDeleteResponse struct {
532
        ApiResponse
533
        Success bool `json:"success,omitempty"`
534
}
535

536
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
537

538
type JSApiStreamSnapshotRequest struct {
539
        // Subject to deliver the chunks to for the snapshot.
540
        DeliverSubject string `json:"deliver_subject"`
541
        // Do not include consumers in the snapshot.
542
        NoConsumers bool `json:"no_consumers,omitempty"`
543
        // Optional chunk size preference.
544
        // Best to just let server select.
545
        ChunkSize int `json:"chunk_size,omitempty"`
546
        // Check all message's checksums prior to snapshot.
547
        CheckMsgs bool `json:"jsck,omitempty"`
548
}
549

550
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
551
type JSApiStreamSnapshotResponse struct {
552
        ApiResponse
553
        // Configuration of the given stream.
554
        Config *StreamConfig `json:"config,omitempty"`
555
        // Current State for the given stream.
556
        State *StreamState `json:"state,omitempty"`
557
}
558

559
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
560

561
// JSApiStreamRestoreRequest is the required restore request.
562
type JSApiStreamRestoreRequest struct {
563
        // Configuration of the given stream.
564
        Config StreamConfig `json:"config"`
565
        // Current State for the given stream.
566
        State StreamState `json:"state"`
567
}
568

569
// JSApiStreamRestoreResponse is the direct response to the restore request.
570
type JSApiStreamRestoreResponse struct {
571
        ApiResponse
572
        // Subject to deliver the chunks to for the snapshot restore.
573
        DeliverSubject string `json:"deliver_subject"`
574
}
575

576
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
577

578
// JSApiStreamRemovePeerRequest is the required remove peer request.
579
type JSApiStreamRemovePeerRequest struct {
580
        // Server name of the peer to be removed.
581
        Peer string `json:"peer"`
582
}
583

584
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
585
type JSApiStreamRemovePeerResponse struct {
586
        ApiResponse
587
        Success bool `json:"success,omitempty"`
588
}
589

590
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
591

592
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
593
type JSApiStreamLeaderStepDownResponse struct {
594
        ApiResponse
595
        Success bool `json:"success,omitempty"`
596
}
597

598
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
599

600
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
601
type JSApiConsumerLeaderStepDownResponse struct {
602
        ApiResponse
603
        Success bool `json:"success,omitempty"`
604
}
605

606
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
607

608
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
609
type JSApiLeaderStepdownRequest struct {
610
        Placement *Placement `json:"placement,omitempty"`
611
}
612

613
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
614
type JSApiLeaderStepDownResponse struct {
615
        ApiResponse
616
        Success bool `json:"success,omitempty"`
617
}
618

619
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
620

621
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
622
type JSApiMetaServerRemoveRequest struct {
623
        // Server name of the peer to be removed.
624
        Server string `json:"peer"`
625
        // Peer ID of the peer to be removed. If specified this is used
626
        // instead of the server name.
627
        Peer string `json:"peer_id,omitempty"`
628
}
629

630
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
631
type JSApiMetaServerRemoveResponse struct {
632
        ApiResponse
633
        Success bool `json:"success,omitempty"`
634
}
635

636
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
637

638
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
639
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
640
type JSApiMetaServerStreamMoveRequest struct {
641
        // Server name of the peer to be evacuated.
642
        Server string `json:"server,omitempty"`
643
        // Cluster the server is in
644
        Cluster string `json:"cluster,omitempty"`
645
        // Domain the sever is in
646
        Domain string `json:"domain,omitempty"`
647
        // Ephemeral placement tags for the move
648
        Tags []string `json:"tags,omitempty"`
649
}
650

651
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
652

653
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
654
type JSApiAccountPurgeResponse struct {
655
        ApiResponse
656
        Initiated bool `json:"initiated,omitempty"`
657
}
658

659
// JSApiMsgGetRequest get a message request.
660
type JSApiMsgGetRequest struct {
661
        Seq     uint64 `json:"seq,omitempty"`
662
        LastFor string `json:"last_by_subj,omitempty"`
663
        NextFor string `json:"next_by_subj,omitempty"`
664

665
        // Batch support. Used to request more than one msg at a time.
666
        // Can be used with simple starting seq, but also NextFor with wildcards.
667
        Batch int `json:"batch,omitempty"`
668
        // This will make sure we limit how much data we blast out. If not set we will
669
        // inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
670
        MaxBytes int `json:"max_bytes,omitempty"`
671
        // Return messages as of this start time.
672
        StartTime *time.Time `json:"start_time,omitempty"`
673

674
        // Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
675
        MultiLastFor []string `json:"multi_last,omitempty"`
676
        // Only return messages up to this sequence. If not set, will be last sequence for the stream.
677
        UpToSeq uint64 `json:"up_to_seq,omitempty"`
678
        // Only return messages up to this time.
679
        UpToTime *time.Time `json:"up_to_time,omitempty"`
680
        // Only return the message payload, excluding headers if present.
681
        NoHeaders bool `json:"no_hdr,omitempty"`
682
}
683

684
type JSApiMsgGetResponse struct {
685
        ApiResponse
686
        Message *StoredMsg `json:"message,omitempty"`
687
}
688

689
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
690

691
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
692
const JSWaitQueueDefaultMax = 512
693

694
type JSApiConsumerCreateResponse struct {
695
        ApiResponse
696
        *ConsumerInfo
697
}
698

699
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
700

701
type JSApiConsumerDeleteResponse struct {
702
        ApiResponse
703
        Success bool `json:"success,omitempty"`
704
}
705

706
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
707

708
type JSApiConsumerPauseRequest struct {
709
        PauseUntil time.Time `json:"pause_until,omitempty"`
710
}
711

712
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
713

714
type JSApiConsumerPauseResponse struct {
715
        ApiResponse
716
        Paused         bool          `json:"paused"`
717
        PauseUntil     time.Time     `json:"pause_until"`
718
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
719
}
720

721
type JSApiConsumerInfoResponse struct {
722
        ApiResponse
723
        *ConsumerInfo
724
}
725

726
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
727

728
type JSApiConsumersRequest struct {
729
        ApiPagedRequest
730
}
731

732
type JSApiConsumerNamesResponse struct {
733
        ApiResponse
734
        ApiPaged
735
        Consumers []string `json:"consumers"`
736
}
737

738
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
739

740
type JSApiConsumerListResponse struct {
741
        ApiResponse
742
        ApiPaged
743
        Consumers []*ConsumerInfo   `json:"consumers"`
744
        Missing   []string          `json:"missing,omitempty"`
745
        Offline   map[string]string `json:"offline,omitempty"`
746
}
747

748
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
749

750
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
751
type JSApiConsumerGetNextRequest struct {
752
        Expires   time.Duration `json:"expires,omitempty"`
753
        Batch     int           `json:"batch,omitempty"`
754
        MaxBytes  int           `json:"max_bytes,omitempty"`
755
        NoWait    bool          `json:"no_wait,omitempty"`
756
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
757
        PriorityGroup
758
}
759

760
// Structure that holds state for a JetStream API request that is processed
761
// in a separate long-lived go routine. This is to avoid blocking connections.
762
type jsAPIRoutedReq struct {
763
        jsub    *subscription
764
        sub     *subscription
765
        acc     *Account
766
        subject string
767
        reply   string
768
        msg     []byte
769
        pa      pubArg
770
}
771

772
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
135,802✔
773
        // Ignore system level directives meta stepdown and peer remove requests here.
135,802✔
774
        if subject == JSApiLeaderStepDown ||
135,802✔
775
                subject == JSApiRemoveServer ||
135,802✔
776
                strings.HasPrefix(subject, jsAPIAccountPre) {
136,289✔
777
                return
487✔
778
        }
487✔
779
        // No lock needed, those are immutable.
780
        s, rr := js.srv, js.apiSubs.Match(subject)
135,315✔
781

135,315✔
782
        hdr, msg := c.msgParts(rmsg)
135,315✔
783
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
135,322✔
784
                // Check if this is the system account. We will let these through for the account info only.
7✔
785
                sacc := s.SystemAccount()
7✔
786
                if sacc != acc {
7✔
787
                        return
×
788
                }
×
789
                if subject != JSApiAccountInfo {
11✔
790
                        // Only respond from the initial server entry to the NATS system.
4✔
791
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
792
                                var resp = ApiResponse{
2✔
793
                                        Type:  JSApiSystemResponseType,
2✔
794
                                        Error: NewJSNotEnabledForAccountError(),
2✔
795
                                }
2✔
796
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
797
                        }
2✔
798
                        return
4✔
799
                }
800
        }
801

802
        // Short circuit for no interest.
803
        if len(rr.psubs)+len(rr.qsubs) == 0 {
155,295✔
804
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
19,984✔
805
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
806
                        var resp = ApiResponse{
×
807
                                Type:  JSApiSystemResponseType,
×
808
                                Error: NewJSBadRequestError(),
×
809
                        }
×
810
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
811
                }
×
812
                return
19,984✔
813
        }
814

815
        // We should only have psubs and only 1 per result.
816
        if len(rr.psubs) != 1 {
115,327✔
817
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
818
                if c.kind == CLIENT || c.kind == LEAF {
×
819
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
820
                        var resp = ApiResponse{
×
821
                                Type:  JSApiSystemResponseType,
×
822
                                Error: NewJSBadRequestError(),
×
823
                        }
×
824
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
825
                }
×
826
                return
×
827
        }
828
        jsub := rr.psubs[0]
115,327✔
829

115,327✔
830
        // We need to make sure not to block. We will send the request to a long-lived
115,327✔
831
        // pool of go routines.
115,327✔
832

115,327✔
833
        // Increment inflight. Do this before queueing.
115,327✔
834
        atomic.AddInt64(&js.apiInflight, 1)
115,327✔
835

115,327✔
836
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
115,327✔
837
        // header from the msg body. No other references are needed.
115,327✔
838
        // Check pending and warn if getting backed up.
115,327✔
839
        pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
115,327✔
840
        limit := atomic.LoadInt64(&js.queueLimit)
115,327✔
841
        if pending >= int(limit) {
115,591✔
842
                s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
264✔
843
                drained := int64(s.jsAPIRoutedReqs.drain())
264✔
844
                atomic.AddInt64(&js.apiInflight, -drained)
264✔
845

264✔
846
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
264✔
847
                        TypedEvent: TypedEvent{
264✔
848
                                Type: JSAPILimitReachedAdvisoryType,
264✔
849
                                ID:   nuid.Next(),
264✔
850
                                Time: time.Now().UTC(),
264✔
851
                        },
264✔
852
                        Server:  s.Name(),
264✔
853
                        Domain:  js.config.Domain,
264✔
854
                        Dropped: drained,
264✔
855
                })
264✔
856
        }
264✔
857
}
858

859
func (s *Server) processJSAPIRoutedRequests() {
16,860✔
860
        defer s.grWG.Done()
16,860✔
861

16,860✔
862
        s.mu.RLock()
16,860✔
863
        queue := s.jsAPIRoutedReqs
16,860✔
864
        client := &client{srv: s, kind: JETSTREAM}
16,860✔
865
        s.mu.RUnlock()
16,860✔
866

16,860✔
867
        js := s.getJetStream()
16,860✔
868

16,860✔
869
        for {
109,433✔
870
                select {
92,573✔
871
                case <-queue.ch:
75,713✔
872
                        // Only pop one item at a time here, otherwise if the system is recovering
75,713✔
873
                        // from queue buildup, then one worker will pull off all the tasks and the
75,713✔
874
                        // others will be starved of work.
75,713✔
875
                        for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
190,784✔
876
                                client.pa = r.pa
115,071✔
877
                                start := time.Now()
115,071✔
878
                                r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
115,071✔
879
                                if dur := time.Since(start); dur >= readLoopReportThreshold {
115,074✔
880
                                        s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
3✔
881
                                }
3✔
882
                                atomic.AddInt64(&js.apiInflight, -1)
115,071✔
883
                        }
884
                case <-s.quitCh:
16,844✔
885
                        return
16,844✔
886
                }
887
        }
888
}
889

890
func (s *Server) setJetStreamExportSubs() error {
4,215✔
891
        js := s.getJetStream()
4,215✔
892
        if js == nil {
4,215✔
893
                return NewJSNotEnabledError()
×
894
        }
×
895

896
        // Start the go routine that will process API requests received by the
897
        // subscription below when they are coming from routes, etc..
898
        const maxProcs = 16
4,215✔
899
        mp := runtime.GOMAXPROCS(0)
4,215✔
900
        // Cap at 16 max for now on larger core setups.
4,215✔
901
        if mp > maxProcs {
4,215✔
902
                mp = maxProcs
×
903
        }
×
904
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
4,215✔
905
        for i := 0; i < mp; i++ {
21,075✔
906
                s.startGoRoutine(s.processJSAPIRoutedRequests)
16,860✔
907
        }
16,860✔
908

909
        // This is the catch all now for all JetStream API calls.
910
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,215✔
911
                return err
×
912
        }
×
913

914
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,215✔
915
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
916
                return err
×
917
        }
×
918

919
        // API handles themselves.
920
        pairs := []struct {
4,215✔
921
                subject string
4,215✔
922
                handler msgHandler
4,215✔
923
        }{
4,215✔
924
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,215✔
925
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,215✔
926
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,215✔
927
                {JSApiStreams, s.jsStreamNamesRequest},
4,215✔
928
                {JSApiStreamList, s.jsStreamListRequest},
4,215✔
929
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,215✔
930
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,215✔
931
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,215✔
932
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,215✔
933
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,215✔
934
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,215✔
935
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,215✔
936
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,215✔
937
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,215✔
938
                {JSApiMsgGet, s.jsMsgGetRequest},
4,215✔
939
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,215✔
940
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,215✔
941
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,215✔
942
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,215✔
943
                {JSApiConsumerList, s.jsConsumerListRequest},
4,215✔
944
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,215✔
945
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,215✔
946
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,215✔
947
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,215✔
948
        }
4,215✔
949

4,215✔
950
        js.mu.Lock()
4,215✔
951
        defer js.mu.Unlock()
4,215✔
952

4,215✔
953
        for _, p := range pairs {
105,375✔
954
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
101,160✔
955
                if err := js.apiSubs.Insert(sub); err != nil {
101,160✔
956
                        return err
×
957
                }
×
958
        }
959

960
        return nil
4,215✔
961
}
962

963
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
38,835✔
964
        acc.trackAPI()
38,835✔
965
        if reply != _EMPTY_ {
68,934✔
966
                s.sendInternalAccountMsg(nil, reply, response)
30,099✔
967
        }
30,099✔
968
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
38,835✔
969
}
970

971
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
26,592✔
972
        acc.trackAPIErr()
26,592✔
973
        if reply != _EMPTY_ {
41,493✔
974
                s.sendInternalAccountMsg(nil, reply, response)
14,901✔
975
        }
14,901✔
976
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
26,592✔
977
}
978

979
const errRespDelay = 500 * time.Millisecond
980

981
type delayedAPIResponse struct {
982
        ci       *ClientInfo
983
        acc      *Account
984
        subject  string
985
        reply    string
986
        request  string
987
        hdr      []byte
988
        response string
989
        rg       *raftGroup
990
        deadline time.Time
991
        noJs     bool
992
        next     *delayedAPIResponse
993
}
994

995
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
996
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
112✔
997
        // Check if list empty.
112✔
998
        if *head == nil {
210✔
999
                *head, *tail = r, r
98✔
1000
                return
98✔
1001
        }
98✔
1002
        // Check if it should be added at the end, which is if after or equal to the tail.
1003
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1004
                (*tail).next, *tail = r, r
12✔
1005
                return
12✔
1006
        }
12✔
1007
        // Find its spot in the list.
1008
        var prev *delayedAPIResponse
2✔
1009
        for c := *head; c != nil; c = c.next {
6✔
1010
                // We insert only if we are stricly before the current `c`.
4✔
1011
                if r.deadline.Before(c.deadline) {
6✔
1012
                        r.next = c
2✔
1013
                        if prev != nil {
3✔
1014
                                prev.next = r
1✔
1015
                        } else {
2✔
1016
                                *head = r
1✔
1017
                        }
1✔
1018
                        return
2✔
1019
                }
1020
                prev = c
2✔
1021
        }
1022
}
1023

1024
func (s *Server) delayedAPIResponder() {
7,056✔
1025
        defer s.grWG.Done()
7,056✔
1026
        var (
7,056✔
1027
                head, tail *delayedAPIResponse // Linked list.
7,056✔
1028
                r          *delayedAPIResponse // Updated by calling next().
7,056✔
1029
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
7,056✔
1030
                tm         = time.NewTimer(time.Hour)
7,056✔
1031
        )
7,056✔
1032
        next := func() {
7,257✔
1033
                r, rqch = nil, nil
201✔
1034
                // Check that JetStream is still on. Do not exit the go routine
201✔
1035
                // since JS can be enabled/disabled. The go routine will exit
201✔
1036
                // only if server is shutdown.
201✔
1037
                js := s.getJetStream()
201✔
1038
                if js == nil {
202✔
1039
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1040
                        head, tail = nil, nil
1✔
1041
                        s.delayedAPIResponses.drain()
1✔
1042
                        // Fall back into next "if" that resets timer.
1✔
1043
                }
1✔
1044
                // If there are no delayed messages then delay the timer for
1045
                // a while.
1046
                if head == nil {
299✔
1047
                        tm.Reset(time.Hour)
98✔
1048
                        return
98✔
1049
                }
98✔
1050
                // Get the first expected message and then reset the timer.
1051
                r = head
103✔
1052
                js.mu.RLock()
103✔
1053
                if r.rg != nil && r.rg.node != nil {
104✔
1054
                        // If there's an attached Raft group to the delayed response
1✔
1055
                        // then pull out the quit channel, so that we don't bother
1✔
1056
                        // sending responses for entities which are now no longer
1✔
1057
                        // running.
1✔
1058
                        rqch = r.rg.node.QuitC()
1✔
1059
                }
1✔
1060
                js.mu.RUnlock()
103✔
1061
                tm.Reset(time.Until(r.deadline))
103✔
1062
        }
1063
        pop := func() {
7,158✔
1064
                if head == nil {
102✔
1065
                        return
×
1066
                }
×
1067
                head = head.next
102✔
1068
                if head == nil {
199✔
1069
                        tail = nil
97✔
1070
                }
97✔
1071
        }
1072
        for {
14,326✔
1073
                select {
7,270✔
1074
                case <-s.delayedAPIResponses.ch:
112✔
1075
                        v, ok := s.delayedAPIResponses.popOne()
112✔
1076
                        if !ok {
112✔
1077
                                continue
×
1078
                        }
1079
                        // Add it to the list, and if ends up being the head, set things up.
1080
                        addDelayedResponse(&head, &tail, v)
112✔
1081
                        if v == head {
211✔
1082
                                next()
99✔
1083
                        }
99✔
1084
                case <-s.quitCh:
7,052✔
1085
                        return
7,052✔
1086
                case <-rqch:
1✔
1087
                        // If we were the head, drop and setup things for next.
1✔
1088
                        if r != nil && r == head {
2✔
1089
                                pop()
1✔
1090
                        }
1✔
1091
                        next()
1✔
1092
                case <-tm.C:
101✔
1093
                        if r != nil {
202✔
1094
                                // If it's not a JS API error, send it as a raw response without additional API/audit tracking.
101✔
1095
                                if r.noJs {
137✔
1096
                                        s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
36✔
1097
                                } else {
101✔
1098
                                        s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
65✔
1099
                                }
65✔
1100
                                pop()
101✔
1101
                        }
1102
                        next()
101✔
1103
                }
1104
        }
1105
}
1106

1107
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
76✔
1108
        s.delayedAPIResponses.push(&delayedAPIResponse{
76✔
1109
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
76✔
1110
        })
76✔
1111
}
76✔
1112

1113
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1114
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1115
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1116
        })
36✔
1117
}
36✔
1118

1119
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
105,880✔
1120
        hdr, msg = c.msgParts(raw)
105,880✔
1121
        var ci ClientInfo
105,880✔
1122

105,880✔
1123
        if len(hdr) > 0 {
211,676✔
1124
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
105,796✔
1125
                        return nil, nil, nil, nil, err
×
1126
                }
×
1127
        }
1128

1129
        if ci.Service != _EMPTY_ {
105,940✔
1130
                acc, _ = s.LookupAccount(ci.Service)
60✔
1131
        } else if ci.Account != _EMPTY_ {
211,616✔
1132
                acc, _ = s.LookupAccount(ci.Account)
105,736✔
1133
        } else {
105,820✔
1134
                // Direct $SYS access.
84✔
1135
                acc = c.acc
84✔
1136
                if acc == nil {
88✔
1137
                        acc = s.SystemAccount()
4✔
1138
                }
4✔
1139
        }
1140
        if acc == nil {
105,890✔
1141
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1142
        }
10✔
1143
        return &ci, acc, hdr, msg, nil
105,870✔
1144
}
1145

1146
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
18,576✔
1147
        decoder := json.NewDecoder(bytes.NewReader(msg))
18,576✔
1148
        decoder.DisallowUnknownFields()
18,576✔
1149

18,576✔
1150
        for {
55,722✔
1151
                if err := decoder.Decode(v); err != nil {
55,722✔
1152
                        if err == io.EOF {
37,146✔
1153
                                return nil
18,570✔
1154
                        }
18,570✔
1155

1156
                        var syntaxErr *json.SyntaxError
6✔
1157
                        if errors.As(err, &syntaxErr) {
6✔
1158
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1159
                        }
×
1160

1161
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1162

6✔
1163
                        if s.JetStreamConfig().Strict {
12✔
1164
                                return err
6✔
1165
                        }
6✔
1166

1167
                        return json.Unmarshal(msg, v)
×
1168
                }
1169
        }
1170
}
1171

1172
func (a *Account) trackAPI() {
38,835✔
1173
        a.mu.RLock()
38,835✔
1174
        jsa := a.js
38,835✔
1175
        a.mu.RUnlock()
38,835✔
1176
        if jsa != nil {
77,568✔
1177
                jsa.usageMu.Lock()
38,733✔
1178
                jsa.usageApi++
38,733✔
1179
                jsa.apiTotal++
38,733✔
1180
                jsa.sendClusterUsageUpdate()
38,733✔
1181
                atomic.AddInt64(&jsa.js.apiTotal, 1)
38,733✔
1182
                jsa.usageMu.Unlock()
38,733✔
1183
        }
38,733✔
1184
}
1185

1186
func (a *Account) trackAPIErr() {
26,592✔
1187
        a.mu.RLock()
26,592✔
1188
        jsa := a.js
26,592✔
1189
        a.mu.RUnlock()
26,592✔
1190
        if jsa != nil {
52,963✔
1191
                jsa.usageMu.Lock()
26,371✔
1192
                jsa.usageApi++
26,371✔
1193
                jsa.apiTotal++
26,371✔
1194
                jsa.usageErr++
26,371✔
1195
                jsa.apiErrors++
26,371✔
1196
                jsa.sendClusterUsageUpdate()
26,371✔
1197
                atomic.AddInt64(&jsa.js.apiTotal, 1)
26,371✔
1198
                atomic.AddInt64(&jsa.js.apiErrors, 1)
26,371✔
1199
                jsa.usageMu.Unlock()
26,371✔
1200
        }
26,371✔
1201
}
1202

1203
const badAPIRequestT = "Malformed JetStream API Request: %q"
1204

1205
// Helper function to check on JetStream being enabled but also on status of leafnodes
1206
// If the local account is not enabled but does have leafnode connectivity we will not
1207
// want to error immediately and let the other side decide.
1208
func (a *Account) checkJetStream() (enabled, shouldError bool) {
44,070✔
1209
        a.mu.RLock()
44,070✔
1210
        defer a.mu.RUnlock()
44,070✔
1211
        return a.js != nil, a.nleafs+a.nrleafs == 0
44,070✔
1212
}
44,070✔
1213

1214
// Request for current usage and limits for this account.
1215
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
500✔
1216
        if c == nil || !s.JetStreamEnabled() {
500✔
1217
                return
×
1218
        }
×
1219

1220
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
500✔
1221
        if err != nil {
501✔
1222
                s.Warnf(badAPIRequestT, msg)
1✔
1223
                return
1✔
1224
        }
1✔
1225

1226
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
499✔
1227
        if errorOnRequiredApiLevel(hdr) {
500✔
1228
                resp.Error = NewJSRequiredApiLevelError()
1✔
1229
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1230
                return
1✔
1231
        }
1✔
1232

1233
        // Determine if we should proceed here when we are in clustered mode.
1234
        if s.JetStreamIsClustered() {
922✔
1235
                js, cc := s.getJetStreamCluster()
424✔
1236
                if js == nil || cc == nil {
424✔
1237
                        return
×
1238
                }
×
1239
                if js.isLeaderless() {
425✔
1240
                        resp.Error = NewJSClusterNotAvailError()
1✔
1241
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1242
                        return
1✔
1243
                }
1✔
1244
                // Make sure we are meta leader.
1245
                if !s.JetStreamIsLeader() {
716✔
1246
                        return
293✔
1247
                }
293✔
1248
        }
1249

1250
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
210✔
1251
                if !doErr {
7✔
1252
                        return
1✔
1253
                }
1✔
1254
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1255
        } else {
198✔
1256
                stats := acc.JetStreamUsage()
198✔
1257
                resp.JetStreamAccountStats = &stats
198✔
1258
        }
198✔
1259
        b, err := json.Marshal(resp)
203✔
1260
        if err != nil {
203✔
1261
                return
×
1262
        }
×
1263

1264
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
203✔
1265
}
1266

1267
// Helpers for token extraction.
1268
func streamNameFromSubject(subject string) string {
79,468✔
1269
        return tokenAt(subject, 5)
79,468✔
1270
}
79,468✔
1271

1272
func consumerNameFromSubject(subject string) string {
48,961✔
1273
        return tokenAt(subject, 6)
48,961✔
1274
}
48,961✔
1275

1276
func (s *Server) jsonResponse(v any) string {
66,010✔
1277
        b, err := json.Marshal(v)
66,010✔
1278
        if err != nil {
66,010✔
1279
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1280
                return ""
×
1281
        }
×
1282
        return string(b)
66,010✔
1283
}
1284

1285
// Read lock must be held
1286
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,518✔
1287
        reservation := int64(0)
3,518✔
1288
        if tier == _EMPTY_ {
7,014✔
1289
                for _, sa := range jsa.streams {
11,955✔
1290
                        if sa.cfg.MaxBytes > 0 {
8,486✔
1291
                                if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name {
27✔
1292
                                        reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
×
1293
                                }
×
1294
                        }
1295
                }
1296
        } else {
22✔
1297
                for _, sa := range jsa.streams {
37✔
1298
                        if sa.cfg.Replicas == cfg.Replicas {
29✔
1299
                                if sa.cfg.MaxBytes > 0 {
19✔
1300
                                        if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name {
10✔
1301
                                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
5✔
1302
                                        }
5✔
1303
                                }
1304
                        }
1305
                }
1306
        }
1307
        return reservation
3,518✔
1308
}
1309

1310
// Request to create a stream.
1311
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
11,726✔
1312
        if c == nil || !s.JetStreamEnabled() {
11,997✔
1313
                return
271✔
1314
        }
271✔
1315
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,455✔
1316
        if err != nil {
11,458✔
1317
                s.Warnf(badAPIRequestT, msg)
3✔
1318
                return
3✔
1319
        }
3✔
1320

1321
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
11,452✔
1322
        if errorOnRequiredApiLevel(hdr) {
11,453✔
1323
                resp.Error = NewJSRequiredApiLevelError()
1✔
1324
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1325
                return
1✔
1326
        }
1✔
1327

1328
        // Determine if we should proceed here when we are in clustered mode.
1329
        if s.JetStreamIsClustered() {
21,575✔
1330
                js, cc := s.getJetStreamCluster()
10,124✔
1331
                if js == nil || cc == nil {
10,124✔
1332
                        return
×
1333
                }
×
1334
                if js.isLeaderless() {
10,125✔
1335
                        resp.Error = NewJSClusterNotAvailError()
1✔
1336
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1337
                        return
1✔
1338
                }
1✔
1339
                // Make sure we are meta leader.
1340
                if !s.JetStreamIsLeader() {
17,058✔
1341
                        return
6,935✔
1342
                }
6,935✔
1343
        }
1344

1345
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,523✔
1346
                if doErr {
8✔
1347
                        resp.Error = NewJSNotEnabledForAccountError()
×
1348
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1349
                }
×
1350
                return
8✔
1351
        }
1352

1353
        var cfg StreamConfigRequest
4,507✔
1354
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
4,508✔
1355
                resp.Error = NewJSInvalidJSONError(err)
1✔
1356
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1357
                return
1✔
1358
        }
1✔
1359

1360
        // Initialize asset version metadata.
1361
        setStaticStreamMetadata(&cfg.StreamConfig)
4,506✔
1362

4,506✔
1363
        streamName := streamNameFromSubject(subject)
4,506✔
1364
        if streamName != cfg.Name {
4,507✔
1365
                resp.Error = NewJSStreamMismatchError()
1✔
1366
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1367
                return
1✔
1368
        }
1✔
1369

1370
        // Check for path like separators in the name.
1371
        if strings.ContainsAny(streamName, `\/`) {
4,507✔
1372
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1373
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1374
                return
2✔
1375
        }
2✔
1376

1377
        // Can't create a stream with a sealed state.
1378
        if cfg.Sealed {
4,505✔
1379
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1380
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1381
                return
2✔
1382
        }
2✔
1383

1384
        // If we are told to do mirror direct but are not mirroring, error.
1385
        if cfg.MirrorDirect && cfg.Mirror == nil {
4,501✔
1386
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1387
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1388
                return
×
1389
        }
×
1390

1391
        // Hand off to cluster for processing.
1392
        if s.JetStreamIsClustered() {
7,687✔
1393
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
3,186✔
1394
                return
3,186✔
1395
        }
3,186✔
1396

1397
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,317✔
1398
                resp.Error = err
2✔
1399
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1400
                return
2✔
1401
        }
2✔
1402

1403
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,313✔
1404
        if err != nil {
1,364✔
1405
                if IsNatsErr(err, JSStreamStoreFailedF) {
51✔
1406
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1407
                        err = errStreamStoreFailed
×
1408
                }
×
1409
                resp.Error = NewJSStreamCreateError(err, Unless(err))
51✔
1410
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
51✔
1411
                return
51✔
1412
        }
1413
        msetCfg := mset.config()
1,262✔
1414
        resp.StreamInfo = &StreamInfo{
1,262✔
1415
                Created:   mset.createdTime(),
1,262✔
1416
                State:     mset.state(),
1,262✔
1417
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,262✔
1418
                TimeStamp: time.Now().UTC(),
1,262✔
1419
                Mirror:    mset.mirrorInfo(),
1,262✔
1420
                Sources:   mset.sourcesInfo(),
1,262✔
1421
        }
1,262✔
1422
        resp.DidCreate = true
1,262✔
1423
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,262✔
1424
}
1425

1426
// Request to update a stream.
1427
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
827✔
1428
        if c == nil || !s.JetStreamEnabled() {
827✔
1429
                return
×
1430
        }
×
1431

1432
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
827✔
1433
        if err != nil {
827✔
1434
                s.Warnf(badAPIRequestT, msg)
×
1435
                return
×
1436
        }
×
1437

1438
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
827✔
1439
        if errorOnRequiredApiLevel(hdr) {
828✔
1440
                resp.Error = NewJSRequiredApiLevelError()
1✔
1441
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1442
                return
1✔
1443
        }
1✔
1444

1445
        // Determine if we should proceed here when we are in clustered mode.
1446
        if s.JetStreamIsClustered() {
1,560✔
1447
                js, cc := s.getJetStreamCluster()
734✔
1448
                if js == nil || cc == nil {
734✔
1449
                        return
×
1450
                }
×
1451
                if js.isLeaderless() {
735✔
1452
                        resp.Error = NewJSClusterNotAvailError()
1✔
1453
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1454
                        return
1✔
1455
                }
1✔
1456
                // Make sure we are meta leader.
1457
                if !s.JetStreamIsLeader() {
1,292✔
1458
                        return
559✔
1459
                }
559✔
1460
        }
1461

1462
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
266✔
1463
                if doErr {
×
1464
                        resp.Error = NewJSNotEnabledForAccountError()
×
1465
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1466
                }
×
1467
                return
×
1468
        }
1469
        var ncfg StreamConfigRequest
266✔
1470
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
267✔
1471
                resp.Error = NewJSInvalidJSONError(err)
1✔
1472
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1473
                return
1✔
1474
        }
1✔
1475

1476
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
265✔
1477
        if apiErr != nil {
284✔
1478
                resp.Error = apiErr
19✔
1479
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
19✔
1480
                return
19✔
1481
        }
19✔
1482

1483
        streamName := streamNameFromSubject(subject)
246✔
1484
        if streamName != cfg.Name {
247✔
1485
                resp.Error = NewJSStreamMismatchError()
1✔
1486
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1487
                return
1✔
1488
        }
1✔
1489

1490
        // Handle clustered version here.
1491
        if s.JetStreamIsClustered() {
414✔
1492
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
169✔
1493
                return
169✔
1494
        }
169✔
1495

1496
        mset, err := acc.lookupStream(streamName)
76✔
1497
        if err != nil {
80✔
1498
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1499
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1500
                return
4✔
1501
        }
4✔
1502
        if mset.offlineReason != _EMPTY_ {
72✔
1503
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1504
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1505
                return
×
1506
        }
×
1507

1508
        // Update asset version metadata.
1509
        setStaticStreamMetadata(&cfg)
72✔
1510

72✔
1511
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
84✔
1512
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
12✔
1513
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
12✔
1514
                return
12✔
1515
        }
12✔
1516

1517
        msetCfg := mset.config()
60✔
1518
        resp.StreamInfo = &StreamInfo{
60✔
1519
                Created:   mset.createdTime(),
60✔
1520
                State:     mset.state(),
60✔
1521
                Config:    *setDynamicStreamMetadata(&msetCfg),
60✔
1522
                Domain:    s.getOpts().JetStreamDomain,
60✔
1523
                Mirror:    mset.mirrorInfo(),
60✔
1524
                Sources:   mset.sourcesInfo(),
60✔
1525
                TimeStamp: time.Now().UTC(),
60✔
1526
        }
60✔
1527
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
60✔
1528
}
1529

1530
// Request for the list of all stream names.
1531
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,303✔
1532
        if c == nil || !s.JetStreamEnabled() {
1,303✔
1533
                return
×
1534
        }
×
1535
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,303✔
1536
        if err != nil {
1,303✔
1537
                s.Warnf(badAPIRequestT, msg)
×
1538
                return
×
1539
        }
×
1540

1541
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,303✔
1542
        if errorOnRequiredApiLevel(hdr) {
1,304✔
1543
                resp.Error = NewJSRequiredApiLevelError()
1✔
1544
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1545
                return
1✔
1546
        }
1✔
1547

1548
        // Determine if we should proceed here when we are in clustered mode.
1549
        if s.JetStreamIsClustered() {
2,336✔
1550
                js, cc := s.getJetStreamCluster()
1,034✔
1551
                if js == nil || cc == nil {
1,034✔
1552
                        return
×
1553
                }
×
1554
                if js.isLeaderless() {
1,034✔
1555
                        resp.Error = NewJSClusterNotAvailError()
×
1556
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1557
                        return
×
1558
                }
×
1559
                // Make sure we are meta leader.
1560
                if !s.JetStreamIsLeader() {
1,790✔
1561
                        return
756✔
1562
                }
756✔
1563
        }
1564

1565
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
547✔
1566
                if doErr {
1✔
1567
                        resp.Error = NewJSNotEnabledForAccountError()
×
1568
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1569
                }
×
1570
                return
1✔
1571
        }
1572

1573
        var offset int
545✔
1574
        var filter string
545✔
1575

545✔
1576
        if isJSONObjectOrArray(msg) {
915✔
1577
                var req JSApiStreamNamesRequest
370✔
1578
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
370✔
1579
                        resp.Error = NewJSInvalidJSONError(err)
×
1580
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1581
                        return
×
1582
                }
×
1583
                offset = req.Offset
370✔
1584
                if req.Subject != _EMPTY_ {
716✔
1585
                        filter = req.Subject
346✔
1586
                }
346✔
1587
        }
1588

1589
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1590
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1591
        var numStreams int
545✔
1592
        if s.JetStreamIsClustered() {
823✔
1593
                js, cc := s.getJetStreamCluster()
278✔
1594
                if js == nil || cc == nil {
278✔
1595
                        // TODO(dlc) - Debug or Warn?
×
1596
                        return
×
1597
                }
×
1598
                js.mu.RLock()
278✔
1599
                for stream, sa := range cc.streams[acc.Name] {
604✔
1600
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
326✔
1601
                                continue
×
1602
                        }
1603
                        if filter != _EMPTY_ {
609✔
1604
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
283✔
1605
                                if len(sa.Config.Subjects) == 0 {
285✔
1606
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1607
                                                resp.Streams = append(resp.Streams, stream)
×
1608
                                        }
×
1609
                                } else {
281✔
1610
                                        for _, subj := range sa.Config.Subjects {
562✔
1611
                                                if SubjectsCollide(filter, subj) {
507✔
1612
                                                        resp.Streams = append(resp.Streams, stream)
226✔
1613
                                                        break
226✔
1614
                                                }
1615
                                        }
1616
                                }
1617
                        } else {
43✔
1618
                                resp.Streams = append(resp.Streams, stream)
43✔
1619
                        }
43✔
1620
                }
1621
                js.mu.RUnlock()
278✔
1622
                if len(resp.Streams) > 1 {
280✔
1623
                        slices.Sort(resp.Streams)
2✔
1624
                }
2✔
1625
                numStreams = len(resp.Streams)
278✔
1626
                if offset > numStreams {
278✔
1627
                        offset = numStreams
×
1628
                }
×
1629
                if offset > 0 {
278✔
1630
                        resp.Streams = resp.Streams[offset:]
×
1631
                }
×
1632
                if len(resp.Streams) > JSApiNamesLimit {
278✔
1633
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1634
                }
×
1635
        } else {
267✔
1636
                msets := acc.filteredStreams(filter)
267✔
1637
                // Since we page results order matters.
267✔
1638
                if len(msets) > 1 {
273✔
1639
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
19✔
1640
                }
1641

1642
                numStreams = len(msets)
267✔
1643
                if offset > numStreams {
267✔
1644
                        offset = numStreams
×
1645
                }
×
1646

1647
                for _, mset := range msets[offset:] {
544✔
1648
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
277✔
1649
                        if len(resp.Streams) >= JSApiNamesLimit {
277✔
1650
                                break
×
1651
                        }
1652
                }
1653
        }
1654
        resp.Total = numStreams
545✔
1655
        resp.Limit = JSApiNamesLimit
545✔
1656
        resp.Offset = offset
545✔
1657

545✔
1658
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
545✔
1659
}
1660

1661
// Request for the list of all detailed stream info.
1662
// TODO(dlc) - combine with above long term
1663
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
1664
        if c == nil || !s.JetStreamEnabled() {
67✔
1665
                return
×
1666
        }
×
1667
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
1668
        if err != nil {
67✔
1669
                s.Warnf(badAPIRequestT, msg)
×
1670
                return
×
1671
        }
×
1672

1673
        var resp = JSApiStreamListResponse{
67✔
1674
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
67✔
1675
                Streams:     []*StreamInfo{},
67✔
1676
        }
67✔
1677
        if errorOnRequiredApiLevel(hdr) {
68✔
1678
                resp.Error = NewJSRequiredApiLevelError()
1✔
1679
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1680
                return
1✔
1681
        }
1✔
1682

1683
        // Determine if we should proceed here when we are in clustered mode.
1684
        if s.JetStreamIsClustered() {
127✔
1685
                js, cc := s.getJetStreamCluster()
61✔
1686
                if js == nil || cc == nil {
61✔
1687
                        return
×
1688
                }
×
1689
                if js.isLeaderless() {
62✔
1690
                        resp.Error = NewJSClusterNotAvailError()
1✔
1691
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1692
                        return
1✔
1693
                }
1✔
1694
                // Make sure we are meta leader.
1695
                if !s.JetStreamIsLeader() {
103✔
1696
                        return
43✔
1697
                }
43✔
1698
        }
1699

1700
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
22✔
1701
                if doErr {
×
1702
                        resp.Error = NewJSNotEnabledForAccountError()
×
1703
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1704
                }
×
1705
                return
×
1706
        }
1707

1708
        var offset int
22✔
1709
        var filter string
22✔
1710

22✔
1711
        if isJSONObjectOrArray(msg) {
33✔
1712
                var req JSApiStreamListRequest
11✔
1713
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
1714
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1715
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1716
                        return
1✔
1717
                }
1✔
1718
                offset = req.Offset
10✔
1719
                if req.Subject != _EMPTY_ {
12✔
1720
                        filter = req.Subject
2✔
1721
                }
2✔
1722
        }
1723

1724
        // Clustered mode will invoke a scatter and gather.
1725
        if s.JetStreamIsClustered() {
38✔
1726
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
17✔
1727
                msg = copyBytes(msg)
17✔
1728
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
34✔
1729
                return
17✔
1730
        }
1731

1732
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1733
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1734
        var msets []*stream
4✔
1735
        if filter == _EMPTY_ {
7✔
1736
                msets = acc.streams()
3✔
1737
        } else {
4✔
1738
                msets = acc.filteredStreams(filter)
1✔
1739
        }
1✔
1740

1741
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
5✔
1742

1743
        scnt := len(msets)
4✔
1744
        if offset > scnt {
4✔
1745
                offset = scnt
×
1746
        }
×
1747

1748
        var missingNames []string
4✔
1749
        for _, mset := range msets[offset:] {
9✔
1750
                if mset.offlineReason != _EMPTY_ {
6✔
1751
                        if resp.Offline == nil {
2✔
1752
                                resp.Offline = make(map[string]string, 1)
1✔
1753
                        }
1✔
1754
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
1755
                        missingNames = append(missingNames, mset.getCfgName())
1✔
1756
                        continue
1✔
1757
                }
1758

1759
                config := mset.config()
4✔
1760
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
1761
                        Created:   mset.createdTime(),
4✔
1762
                        State:     mset.state(),
4✔
1763
                        Config:    config,
4✔
1764
                        Domain:    s.getOpts().JetStreamDomain,
4✔
1765
                        Mirror:    mset.mirrorInfo(),
4✔
1766
                        Sources:   mset.sourcesInfo(),
4✔
1767
                        TimeStamp: time.Now().UTC(),
4✔
1768
                })
4✔
1769
                if len(resp.Streams) >= JSApiListLimit {
4✔
1770
                        break
×
1771
                }
1772
        }
1773
        resp.Total = scnt
4✔
1774
        resp.Limit = JSApiListLimit
4✔
1775
        resp.Offset = offset
4✔
1776
        resp.Missing = missingNames
4✔
1777
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
1778
}
1779

1780
// Request for information about a stream.
1781
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
24,168✔
1782
        if c == nil || !s.JetStreamEnabled() {
24,175✔
1783
                return
7✔
1784
        }
7✔
1785
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
24,161✔
1786
        if err != nil {
24,166✔
1787
                s.Warnf(badAPIRequestT, msg)
5✔
1788
                return
5✔
1789
        }
5✔
1790

1791
        streamName := streamNameFromSubject(subject)
24,156✔
1792

24,156✔
1793
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
24,156✔
1794

24,156✔
1795
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
24,156✔
1796
        // Make sure the response type is for a create call.
24,156✔
1797
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
24,156✔
1798
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
1799
        }
×
1800
        if errorOnRequiredApiLevel(hdr) {
24,157✔
1801
                resp.Error = NewJSRequiredApiLevelError()
1✔
1802
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1803
                return
1✔
1804
        }
1✔
1805

1806
        var clusterWideConsCount int
24,155✔
1807

24,155✔
1808
        js, cc := s.getJetStreamCluster()
24,155✔
1809
        if js == nil {
24,155✔
1810
                return
×
1811
        }
×
1812
        // If we are in clustered mode we need to be the stream leader to proceed.
1813
        if cc != nil {
34,161✔
1814
                // Check to make sure the stream is assigned.
10,006✔
1815
                js.mu.RLock()
10,006✔
1816
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
10,006✔
1817
                var offline bool
10,006✔
1818
                if sa != nil {
18,802✔
1819
                        clusterWideConsCount = len(sa.consumers)
8,796✔
1820
                        offline = s.allPeersOffline(sa.Group)
8,796✔
1821
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
8,826✔
1822
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
1823
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
1824
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
1825
                                js.mu.RUnlock()
30✔
1826
                                return
30✔
1827
                        }
30✔
1828
                }
1829
                js.mu.RUnlock()
9,976✔
1830

9,976✔
1831
                if isLeader && sa == nil {
10,237✔
1832
                        // We can't find the stream, so mimic what would be the errors below.
261✔
1833
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
261✔
1834
                                if doErr {
×
1835
                                        resp.Error = NewJSNotEnabledForAccountError()
×
1836
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1837
                                }
×
1838
                                return
×
1839
                        }
1840
                        // No stream present.
1841
                        resp.Error = NewJSStreamNotFoundError()
261✔
1842
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
261✔
1843
                        return
261✔
1844
                } else if sa == nil {
10,664✔
1845
                        if js.isLeaderless() {
949✔
1846
                                resp.Error = NewJSClusterNotAvailError()
×
1847
                                // Delaying an error response gives the leader a chance to respond before us
×
1848
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1849
                        }
×
1850
                        return
949✔
1851
                } else if isLeader && offline {
8,768✔
1852
                        resp.Error = NewJSStreamOfflineError()
2✔
1853
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
1854
                        return
2✔
1855
                }
2✔
1856

1857
                // Check to see if we are a member of the group and if the group has no leader.
1858
                isLeaderless := js.isGroupLeaderless(sa.Group)
8,764✔
1859

8,764✔
1860
                // We have the stream assigned and a leader, so only the stream leader should answer.
8,764✔
1861
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
15,784✔
1862
                        if js.isLeaderless() {
7,020✔
1863
                                resp.Error = NewJSClusterNotAvailError()
×
1864
                                // Delaying an error response gives the leader a chance to respond before us
×
1865
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
1866
                                return
×
1867
                        }
×
1868

1869
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
1870
                        // while the new members work through the election and catchup process.
1871
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
1872
                        js.mu.RLock()
7,020✔
1873
                        rg := sa.Group
7,020✔
1874
                        var ourID string
7,020✔
1875
                        if cc.meta != nil {
14,040✔
1876
                                ourID = cc.meta.ID()
7,020✔
1877
                        }
7,020✔
1878
                        // We have seen cases where rg is nil at this point,
1879
                        // so check explicitly and bail if that is the case.
1880
                        bail := rg == nil || !rg.isMember(ourID)
7,020✔
1881
                        if !bail {
9,521✔
1882
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
2,501✔
1883
                                // Also, we have seen cases where rg.node is nil at this point,
2,501✔
1884
                                // so check explicitly and bail if that is the case.
2,501✔
1885
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
2,501✔
1886
                        }
2,501✔
1887
                        js.mu.RUnlock()
7,020✔
1888
                        if bail {
14,027✔
1889
                                return
7,007✔
1890
                        }
7,007✔
1891
                }
1892
        }
1893

1894
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
15,912✔
1895
                if doErr {
7✔
1896
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
1897
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1898
                }
1✔
1899
                return
6✔
1900
        }
1901

1902
        var details bool
15,900✔
1903
        var subjects string
15,900✔
1904
        var offset int
15,900✔
1905
        if isJSONObjectOrArray(msg) {
15,935✔
1906
                var req JSApiStreamInfoRequest
35✔
1907
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
36✔
1908
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1909
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1910
                        return
1✔
1911
                }
1✔
1912
                details, subjects = req.DeletedDetails, req.SubjectsFilter
34✔
1913
                offset = req.Offset
34✔
1914
        }
1915

1916
        mset, err := acc.lookupStream(streamName)
15,899✔
1917
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
15,899✔
1918
        if err != nil {
16,616✔
1919
                if cc != nil {
717✔
1920
                        // This could be inflight, pause for a short bit and try again.
×
1921
                        // This will not be inline, so ok.
×
1922
                        time.Sleep(10 * time.Millisecond)
×
1923
                        mset, err = acc.lookupStream(streamName)
×
1924
                }
×
1925
                // Check again.
1926
                if err != nil {
1,434✔
1927
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
717✔
1928
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
717✔
1929
                        return
717✔
1930
                }
717✔
1931
        }
1932

1933
        if mset.offlineReason != _EMPTY_ {
15,183✔
1934
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
1935
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
1936
                return
1✔
1937
        }
1✔
1938

1939
        config := mset.config()
15,181✔
1940
        resp.StreamInfo = &StreamInfo{
15,181✔
1941
                Created:    mset.createdTime(),
15,181✔
1942
                State:      mset.stateWithDetail(details),
15,181✔
1943
                Config:     *setDynamicStreamMetadata(&config),
15,181✔
1944
                Domain:     s.getOpts().JetStreamDomain,
15,181✔
1945
                Cluster:    js.clusterInfo(mset.raftGroup()),
15,181✔
1946
                Mirror:     mset.mirrorInfo(),
15,181✔
1947
                Sources:    mset.sourcesInfo(),
15,181✔
1948
                Alternates: js.streamAlternates(ci, config.Name),
15,181✔
1949
                TimeStamp:  time.Now().UTC(),
15,181✔
1950
        }
15,181✔
1951
        if clusterWideConsCount > 0 {
15,643✔
1952
                resp.StreamInfo.State.Consumers = clusterWideConsCount
462✔
1953
        }
462✔
1954

1955
        // Check if they have asked for subject details.
1956
        if subjects != _EMPTY_ {
15,213✔
1957
                st := mset.store.SubjectsTotals(subjects)
32✔
1958
                if lst := len(st); lst > 0 {
60✔
1959
                        // Common for both cases.
28✔
1960
                        resp.Offset = offset
28✔
1961
                        resp.Limit = JSMaxSubjectDetails
28✔
1962
                        resp.Total = lst
28✔
1963

28✔
1964
                        if offset == 0 && lst <= JSMaxSubjectDetails {
56✔
1965
                                resp.StreamInfo.State.Subjects = st
28✔
1966
                        } else {
28✔
1967
                                // Here we have to filter list due to offset or maximum constraints.
×
1968
                                subjs := make([]string, 0, len(st))
×
1969
                                for subj := range st {
×
1970
                                        subjs = append(subjs, subj)
×
1971
                                }
×
1972
                                // Sort it
1973
                                slices.Sort(subjs)
×
1974

×
1975
                                if offset > len(subjs) {
×
1976
                                        offset = len(subjs)
×
1977
                                }
×
1978

1979
                                end := offset + JSMaxSubjectDetails
×
1980
                                if end > len(subjs) {
×
1981
                                        end = len(subjs)
×
1982
                                }
×
1983
                                actualSize := end - offset
×
1984
                                var sd map[string]uint64
×
1985

×
1986
                                if actualSize > 0 {
×
1987
                                        sd = make(map[string]uint64, actualSize)
×
1988
                                        for _, ss := range subjs[offset:end] {
×
1989
                                                sd[ss] = st[ss]
×
1990
                                        }
×
1991
                                }
1992
                                resp.StreamInfo.State.Subjects = sd
×
1993
                        }
1994
                }
1995
        }
1996
        // Check for out of band catchups.
1997
        if mset.hasCatchupPeers() {
15,182✔
1998
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
1✔
1999
        }
1✔
2000

2001
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
15,181✔
2002
}
2003

2004
// Request to have a stream leader stepdown.
2005
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
247✔
2006
        if c == nil || !s.JetStreamEnabled() {
247✔
2007
                return
×
2008
        }
×
2009
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
247✔
2010
        if err != nil {
247✔
2011
                s.Warnf(badAPIRequestT, msg)
×
2012
                return
×
2013
        }
×
2014

2015
        // Have extra token for this one.
2016
        name := tokenAt(subject, 6)
247✔
2017

247✔
2018
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
247✔
2019
        if errorOnRequiredApiLevel(hdr) {
248✔
2020
                resp.Error = NewJSRequiredApiLevelError()
1✔
2021
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2022
                return
1✔
2023
        }
1✔
2024

2025
        // If we are not in clustered mode this is a failed request.
2026
        if !s.JetStreamIsClustered() {
246✔
2027
                resp.Error = NewJSClusterRequiredError()
×
2028
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2029
                return
×
2030
        }
×
2031

2032
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2033
        js, cc := s.getJetStreamCluster()
246✔
2034
        if js == nil || cc == nil {
246✔
2035
                return
×
2036
        }
×
2037
        if js.isLeaderless() {
246✔
2038
                resp.Error = NewJSClusterNotAvailError()
×
2039
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2040
                return
×
2041
        }
×
2042

2043
        js.mu.RLock()
246✔
2044
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
246✔
2045
        js.mu.RUnlock()
246✔
2046

246✔
2047
        if isLeader && sa == nil {
246✔
2048
                resp.Error = NewJSStreamNotFoundError()
×
2049
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2050
                return
×
2051
        } else if sa == nil {
247✔
2052
                return
1✔
2053
        }
1✔
2054

2055
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
245✔
2056
                if doErr {
×
2057
                        resp.Error = NewJSNotEnabledForAccountError()
×
2058
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2059
                }
×
2060
                return
×
2061
        }
2062

2063
        // Check to see if we are a member of the group and if the group has no leader.
2064
        if js.isGroupLeaderless(sa.Group) {
245✔
2065
                resp.Error = NewJSClusterNotAvailError()
×
2066
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2067
                return
×
2068
        }
×
2069

2070
        // We have the stream assigned and a leader, so only the stream leader should answer.
2071
        if !acc.JetStreamIsStreamLeader(name) {
437✔
2072
                return
192✔
2073
        }
192✔
2074

2075
        mset, err := acc.lookupStream(name)
53✔
2076
        if err != nil {
53✔
2077
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2078
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2079
                return
×
2080
        }
×
2081

2082
        if mset == nil {
53✔
2083
                resp.Success = true
×
2084
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2085
                return
×
2086
        }
×
2087

2088
        node := mset.raftNode()
53✔
2089
        if node == nil {
53✔
2090
                resp.Success = true
×
2091
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2092
                return
×
2093
        }
×
2094

2095
        var preferredLeader string
53✔
2096
        if isJSONObjectOrArray(msg) {
66✔
2097
                var req JSApiLeaderStepdownRequest
13✔
2098
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2099
                        resp.Error = NewJSInvalidJSONError(err)
×
2100
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2101
                        return
×
2102
                }
×
2103
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
18✔
2104
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2105
                        return
5✔
2106
                }
5✔
2107
        }
2108

2109
        // Call actual stepdown.
2110
        err = node.StepDown(preferredLeader)
48✔
2111
        if err != nil {
48✔
2112
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2113
        } else {
48✔
2114
                resp.Success = true
48✔
2115
        }
48✔
2116
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
48✔
2117
}
2118

2119
// Request to have a consumer leader stepdown.
2120
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
224✔
2121
        if c == nil || !s.JetStreamEnabled() {
224✔
2122
                return
×
2123
        }
×
2124
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
224✔
2125
        if err != nil {
224✔
2126
                s.Warnf(badAPIRequestT, msg)
×
2127
                return
×
2128
        }
×
2129

2130
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
224✔
2131
        if errorOnRequiredApiLevel(hdr) {
225✔
2132
                resp.Error = NewJSRequiredApiLevelError()
1✔
2133
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2134
                return
1✔
2135
        }
1✔
2136

2137
        // If we are not in clustered mode this is a failed request.
2138
        if !s.JetStreamIsClustered() {
223✔
2139
                resp.Error = NewJSClusterRequiredError()
×
2140
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2141
                return
×
2142
        }
×
2143

2144
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2145
        js, cc := s.getJetStreamCluster()
223✔
2146
        if js == nil || cc == nil {
223✔
2147
                return
×
2148
        }
×
2149
        if js.isLeaderless() {
223✔
2150
                resp.Error = NewJSClusterNotAvailError()
×
2151
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2152
                return
×
2153
        }
×
2154

2155
        // Have extra token for this one.
2156
        stream := tokenAt(subject, 6)
223✔
2157
        consumer := tokenAt(subject, 7)
223✔
2158

223✔
2159
        js.mu.RLock()
223✔
2160
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
223✔
2161
        js.mu.RUnlock()
223✔
2162

223✔
2163
        if isLeader && sa == nil {
223✔
2164
                resp.Error = NewJSStreamNotFoundError()
×
2165
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2166
                return
×
2167
        } else if sa == nil {
223✔
2168
                return
×
2169
        }
×
2170
        var ca *consumerAssignment
223✔
2171
        if sa.consumers != nil {
446✔
2172
                ca = sa.consumers[consumer]
223✔
2173
        }
223✔
2174
        if ca == nil {
223✔
2175
                resp.Error = NewJSConsumerNotFoundError()
×
2176
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2177
                return
×
2178
        }
×
2179
        // Check to see if we are a member of the group and if the group has no leader.
2180
        if js.isGroupLeaderless(ca.Group) {
223✔
2181
                resp.Error = NewJSClusterNotAvailError()
×
2182
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2183
                return
×
2184
        }
×
2185

2186
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
384✔
2187
                return
161✔
2188
        }
161✔
2189

2190
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
62✔
2191
                if doErr {
×
2192
                        resp.Error = NewJSNotEnabledForAccountError()
×
2193
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2194
                }
×
2195
                return
×
2196
        }
2197

2198
        mset, err := acc.lookupStream(stream)
62✔
2199
        if err != nil {
62✔
2200
                resp.Error = NewJSStreamNotFoundError()
×
2201
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2202
                return
×
2203
        }
×
2204
        o := mset.lookupConsumer(consumer)
62✔
2205
        if o == nil {
62✔
2206
                resp.Error = NewJSConsumerNotFoundError()
×
2207
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2208
                return
×
2209
        }
×
2210

2211
        n := o.raftNode()
62✔
2212
        if n == nil {
62✔
2213
                resp.Success = true
×
2214
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2215
                return
×
2216
        }
×
2217

2218
        var preferredLeader string
62✔
2219
        if isJSONObjectOrArray(msg) {
74✔
2220
                var req JSApiLeaderStepdownRequest
12✔
2221
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2222
                        resp.Error = NewJSInvalidJSONError(err)
×
2223
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2224
                        return
×
2225
                }
×
2226
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
17✔
2227
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2228
                        return
5✔
2229
                }
5✔
2230
        }
2231

2232
        // Call actual stepdown.
2233
        err = n.StepDown(preferredLeader)
57✔
2234
        if err != nil {
57✔
2235
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2236
        } else {
57✔
2237
                resp.Success = true
57✔
2238
        }
57✔
2239
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
57✔
2240
}
2241

2242
// Request to remove a peer from a clustered stream.
2243
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
70✔
2244
        if c == nil || !s.JetStreamEnabled() {
70✔
2245
                return
×
2246
        }
×
2247
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
70✔
2248
        if err != nil {
70✔
2249
                s.Warnf(badAPIRequestT, msg)
×
2250
                return
×
2251
        }
×
2252

2253
        // Have extra token for this one.
2254
        name := tokenAt(subject, 6)
70✔
2255

70✔
2256
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
70✔
2257
        if errorOnRequiredApiLevel(hdr) {
71✔
2258
                resp.Error = NewJSRequiredApiLevelError()
1✔
2259
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2260
                return
1✔
2261
        }
1✔
2262

2263
        // If we are not in clustered mode this is a failed request.
2264
        if !s.JetStreamIsClustered() {
69✔
2265
                resp.Error = NewJSClusterRequiredError()
×
2266
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2267
                return
×
2268
        }
×
2269

2270
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2271
        js, cc := s.getJetStreamCluster()
69✔
2272
        if js == nil || cc == nil {
69✔
2273
                return
×
2274
        }
×
2275
        if js.isLeaderless() {
69✔
2276
                resp.Error = NewJSClusterNotAvailError()
×
2277
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2278
                return
×
2279
        }
×
2280

2281
        js.mu.RLock()
69✔
2282
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
69✔
2283
        js.mu.RUnlock()
69✔
2284

69✔
2285
        // Make sure we are meta leader.
69✔
2286
        if !isLeader {
127✔
2287
                return
58✔
2288
        }
58✔
2289

2290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2291
                if doErr {
×
2292
                        resp.Error = NewJSNotEnabledForAccountError()
×
2293
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2294
                }
×
2295
                return
×
2296
        }
2297
        if isEmptyRequest(msg) {
11✔
2298
                resp.Error = NewJSBadRequestError()
×
2299
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2300
                return
×
2301
        }
×
2302

2303
        var req JSApiStreamRemovePeerRequest
11✔
2304
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2305
                resp.Error = NewJSInvalidJSONError(err)
×
2306
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2307
                return
×
2308
        }
×
2309
        if req.Peer == _EMPTY_ {
11✔
2310
                resp.Error = NewJSBadRequestError()
×
2311
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2312
                return
×
2313
        }
×
2314

2315
        if sa == nil {
11✔
2316
                // No stream present.
×
2317
                resp.Error = NewJSStreamNotFoundError()
×
2318
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2319
                return
×
2320
        }
×
2321

2322
        // Check to see if we are a member of the group and if the group has no leader.
2323
        // Peers here is a server name, convert to node name.
2324
        nodeName := getHash(req.Peer)
11✔
2325

11✔
2326
        js.mu.RLock()
11✔
2327
        rg := sa.Group
11✔
2328
        isMember := rg.isMember(nodeName)
11✔
2329
        js.mu.RUnlock()
11✔
2330

11✔
2331
        // Make sure we are a member.
11✔
2332
        if !isMember {
12✔
2333
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2334
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2335
                return
1✔
2336
        }
1✔
2337

2338
        // If we are here we have a valid peer member set for removal.
2339
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2340
                resp.Error = NewJSPeerRemapError()
2✔
2341
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2342
                return
2✔
2343
        }
2✔
2344

2345
        resp.Success = true
8✔
2346
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2347
}
2348

2349
// Request to have the metaleader remove a peer from the system.
2350
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
13✔
2351
        if c == nil || !s.JetStreamEnabled() {
13✔
2352
                return
×
2353
        }
×
2354

2355
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
13✔
2356
        if err != nil {
13✔
2357
                s.Warnf(badAPIRequestT, msg)
×
2358
                return
×
2359
        }
×
2360
        if acc != s.SystemAccount() {
13✔
2361
                return
×
2362
        }
×
2363

2364
        js, cc := s.getJetStreamCluster()
13✔
2365
        if js == nil || cc == nil {
13✔
2366
                return
×
2367
        }
×
2368

2369
        js.mu.RLock()
13✔
2370
        isLeader := cc.isLeader()
13✔
2371
        meta := cc.meta
13✔
2372
        js.mu.RUnlock()
13✔
2373

13✔
2374
        // Extra checks here but only leader is listening.
13✔
2375
        if !isLeader {
13✔
2376
                return
×
2377
        }
×
2378

2379
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
13✔
2380
        if errorOnRequiredApiLevel(hdr) {
13✔
2381
                resp.Error = NewJSRequiredApiLevelError()
×
2382
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2383
                return
×
2384
        }
×
2385

2386
        if isEmptyRequest(msg) {
13✔
2387
                resp.Error = NewJSBadRequestError()
×
2388
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2389
                return
×
2390
        }
×
2391

2392
        var req JSApiMetaServerRemoveRequest
13✔
2393
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2394
                resp.Error = NewJSInvalidJSONError(err)
×
2395
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2396
                return
×
2397
        }
×
2398

2399
        js.mu.Lock()
13✔
2400
        defer js.mu.Unlock()
13✔
2401

13✔
2402
        // Another peer-remove is already in progress, don't allow multiple concurrent changes.
13✔
2403
        if cc.peerRemoveReply != nil {
15✔
2404
                resp.Error = NewJSClusterServerMemberChangeInflightError()
2✔
2405
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2406
                return
2✔
2407
        }
2✔
2408

2409
        var found string
11✔
2410
        for _, p := range meta.Peers() {
49✔
2411
                // If Peer is specified, it takes precedence
38✔
2412
                if req.Peer != _EMPTY_ {
43✔
2413
                        if p.ID == req.Peer {
6✔
2414
                                found = req.Peer
1✔
2415
                                break
1✔
2416
                        }
2417
                        continue
4✔
2418
                }
2419
                si, ok := s.nodeToInfo.Load(p.ID)
33✔
2420
                if ok && si.(nodeInfo).name == req.Server {
39✔
2421
                        found = p.ID
6✔
2422
                        break
6✔
2423
                }
2424
        }
2425

2426
        if found == _EMPTY_ {
15✔
2427
                resp.Error = NewJSClusterServerNotMemberError()
4✔
2428
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2429
                return
4✔
2430
        }
4✔
2431

2432
        if err := meta.ProposeRemovePeer(found); err != nil {
7✔
2433
                if err == errMembershipChange {
×
2434
                        resp.Error = NewJSClusterServerMemberChangeInflightError()
×
2435
                } else {
×
2436
                        resp.Error = NewJSRaftGeneralError(err)
×
2437
                }
×
2438
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2439
                return
×
2440
        }
2441

2442
        if cc.peerRemoveReply == nil {
14✔
2443
                cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
7✔
2444
        }
7✔
2445
        // Only copy the request, the subject and reply are already copied.
2446
        cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
7✔
2447
}
2448

2449
func (s *Server) peerSetToNames(ps []string) []string {
175✔
2450
        names := make([]string, len(ps))
175✔
2451
        for i := 0; i < len(ps); i++ {
646✔
2452
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
471✔
2453
                        names[i] = ps[i]
×
2454
                } else {
471✔
2455
                        names[i] = si.(nodeInfo).name
471✔
2456
                }
471✔
2457
        }
2458
        return names
175✔
2459
}
2460

2461
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2462
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2463
        js.mu.RLock()
29✔
2464
        defer js.mu.RUnlock()
29✔
2465
        if cc := js.cluster; cc != nil {
58✔
2466
                for _, p := range cc.meta.Peers() {
186✔
2467
                        si, ok := s.nodeToInfo.Load(p.ID)
157✔
2468
                        if ok && si.(nodeInfo).name == serverName {
186✔
2469
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2470
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2471
                                                return p.ID
29✔
2472
                                        }
29✔
2473
                                }
2474
                        }
2475
                }
2476
        }
2477
        return _EMPTY_
×
2478
}
2479

2480
// Request to have the metaleader move a stream on a peer to another
2481
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2482
        if c == nil || !s.JetStreamEnabled() {
33✔
2483
                return
×
2484
        }
×
2485

2486
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2487
        if err != nil {
33✔
2488
                s.Warnf(badAPIRequestT, msg)
×
2489
                return
×
2490
        }
×
2491

2492
        js, cc := s.getJetStreamCluster()
33✔
2493
        if js == nil || cc == nil {
33✔
2494
                return
×
2495
        }
×
2496

2497
        // Extra checks here but only leader is listening.
2498
        js.mu.RLock()
33✔
2499
        isLeader := cc.isLeader()
33✔
2500
        js.mu.RUnlock()
33✔
2501

33✔
2502
        if !isLeader {
33✔
2503
                return
×
2504
        }
×
2505

2506
        accName := tokenAt(subject, 6)
33✔
2507
        streamName := tokenAt(subject, 7)
33✔
2508

33✔
2509
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2510
                return
×
2511
        }
×
2512

2513
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2514
        if errorOnRequiredApiLevel(hdr) {
33✔
2515
                resp.Error = NewJSRequiredApiLevelError()
×
2516
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2517
                return
×
2518
        }
×
2519

2520
        var req JSApiMetaServerStreamMoveRequest
33✔
2521
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2522
                resp.Error = NewJSInvalidJSONError(err)
×
2523
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2524
                return
×
2525
        }
×
2526

2527
        srcPeer := _EMPTY_
33✔
2528
        if req.Server != _EMPTY_ {
62✔
2529
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2530
        }
29✔
2531

2532
        targetAcc, ok := s.accounts.Load(accName)
33✔
2533
        if !ok {
33✔
2534
                resp.Error = NewJSNoAccountError()
×
2535
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2536
                return
×
2537
        }
×
2538

2539
        var streamFound bool
33✔
2540
        cfg := StreamConfig{}
33✔
2541
        currPeers := []string{}
33✔
2542
        currCluster := _EMPTY_
33✔
2543
        js.mu.Lock()
33✔
2544
        streams, ok := cc.streams[accName]
33✔
2545
        if ok {
66✔
2546
                sa, ok := streams[streamName]
33✔
2547
                if ok {
66✔
2548
                        cfg = *sa.Config.clone()
33✔
2549
                        streamFound = true
33✔
2550
                        currPeers = sa.Group.Peers
33✔
2551
                        currCluster = sa.Group.Cluster
33✔
2552
                }
33✔
2553
        }
2554
        js.mu.Unlock()
33✔
2555

33✔
2556
        if !streamFound {
33✔
2557
                resp.Error = NewJSStreamNotFoundError()
×
2558
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2559
                return
×
2560
        }
×
2561

2562
        // if server was picked, make sure src peer exists and move it to first position.
2563
        // removal will drop peers from the left
2564
        if req.Server != _EMPTY_ {
62✔
2565
                if srcPeer == _EMPTY_ {
29✔
2566
                        resp.Error = NewJSClusterServerNotMemberError()
×
2567
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2568
                        return
×
2569
                }
×
2570
                var peerFound bool
29✔
2571
                for i := 0; i < len(currPeers); i++ {
83✔
2572
                        if currPeers[i] == srcPeer {
83✔
2573
                                copy(currPeers[1:], currPeers[:i])
29✔
2574
                                currPeers[0] = srcPeer
29✔
2575
                                peerFound = true
29✔
2576
                                break
29✔
2577
                        }
2578
                }
2579
                if !peerFound {
29✔
2580
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2581
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2582
                        return
×
2583
                }
×
2584
        }
2585

2586
        // make sure client is scoped to requested account
2587
        ciNew := *(ci)
33✔
2588
        ciNew.Account = accName
33✔
2589

33✔
2590
        // backup placement such that peers can be looked up with modified tag list
33✔
2591
        var origPlacement *Placement
33✔
2592
        if cfg.Placement != nil {
33✔
2593
                tmp := *cfg.Placement
×
2594
                origPlacement = &tmp
×
2595
        }
×
2596

2597
        if len(req.Tags) > 0 {
60✔
2598
                if cfg.Placement == nil {
54✔
2599
                        cfg.Placement = &Placement{}
27✔
2600
                }
27✔
2601
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2602
        }
2603

2604
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2605
        if len(peers) <= cfg.Replicas {
35✔
2606
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2607
                peers = nil
2✔
2608

2✔
2609
                clusters := map[string]struct{}{}
2✔
2610
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2611
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2612
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2613
                        }
8✔
2614
                        return true
16✔
2615
                })
2616
                errs := &selectPeerError{}
2✔
2617
                errs.accumulate(e)
2✔
2618
                for cluster := range clusters {
4✔
2619
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2620
                        if len(newPeers) >= cfg.Replicas {
4✔
2621
                                peers = append([]string{}, currPeers...)
2✔
2622
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2623
                                break
2✔
2624
                        }
2625
                        errs.accumulate(e)
×
2626
                }
2627
                if peers == nil {
2✔
2628
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2629
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2630
                        return
×
2631
                }
×
2632
        }
2633

2634
        cfg.Placement = origPlacement
33✔
2635

33✔
2636
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2637
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2638

33✔
2639
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2640
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2641
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2642
}
2643

2644
// Request to have the metaleader move a stream on a peer to another
2645
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
5✔
2646
        if c == nil || !s.JetStreamEnabled() {
5✔
2647
                return
×
2648
        }
×
2649

2650
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
5✔
2651
        if err != nil {
5✔
2652
                s.Warnf(badAPIRequestT, msg)
×
2653
                return
×
2654
        }
×
2655

2656
        js, cc := s.getJetStreamCluster()
5✔
2657
        if js == nil || cc == nil {
5✔
2658
                return
×
2659
        }
×
2660

2661
        // Extra checks here but only leader is listening.
2662
        js.mu.RLock()
5✔
2663
        isLeader := cc.isLeader()
5✔
2664
        js.mu.RUnlock()
5✔
2665

5✔
2666
        if !isLeader {
5✔
2667
                return
×
2668
        }
×
2669

2670
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
5✔
2671
        if errorOnRequiredApiLevel(hdr) {
5✔
2672
                resp.Error = NewJSRequiredApiLevelError()
×
2673
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2674
                return
×
2675
        }
×
2676

2677
        accName := tokenAt(subject, 6)
5✔
2678
        streamName := tokenAt(subject, 7)
5✔
2679

5✔
2680
        if acc.GetName() != accName && acc != s.SystemAccount() {
5✔
2681
                return
×
2682
        }
×
2683

2684
        targetAcc, ok := s.accounts.Load(accName)
5✔
2685
        if !ok {
5✔
2686
                resp.Error = NewJSNoAccountError()
×
2687
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2688
                return
×
2689
        }
×
2690

2691
        streamFound := false
5✔
2692
        cfg := StreamConfig{}
5✔
2693
        currPeers := []string{}
5✔
2694
        js.mu.Lock()
5✔
2695
        streams, ok := cc.streams[accName]
5✔
2696
        if ok {
10✔
2697
                sa, ok := streams[streamName]
5✔
2698
                if ok {
10✔
2699
                        cfg = *sa.Config.clone()
5✔
2700
                        streamFound = true
5✔
2701
                        currPeers = sa.Group.Peers
5✔
2702
                }
5✔
2703
        }
2704
        js.mu.Unlock()
5✔
2705

5✔
2706
        if !streamFound {
5✔
2707
                resp.Error = NewJSStreamNotFoundError()
×
2708
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2709
                return
×
2710
        }
×
2711

2712
        if len(currPeers) <= cfg.Replicas {
6✔
2713
                resp.Error = NewJSStreamMoveNotInProgressError()
1✔
2714
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2715
                return
1✔
2716
        }
1✔
2717

2718
        // make sure client is scoped to requested account
2719
        ciNew := *(ci)
4✔
2720
        ciNew.Account = accName
4✔
2721

4✔
2722
        peers := currPeers[:cfg.Replicas]
4✔
2723

4✔
2724
        // Remove placement in case tags don't match
4✔
2725
        // This can happen if the move was initiated by modifying the tags.
4✔
2726
        // This is an account operation.
4✔
2727
        // This can NOT happen when the move was initiated by the system account.
4✔
2728
        // There move honors the original tag list.
4✔
2729
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2730
        FOR_TAGCHECK:
1✔
2731
                for _, peer := range peers {
2✔
2732
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2733
                        if !ok {
1✔
2734
                                // can't verify tags, do the safe thing and error
×
2735
                                resp.Error = NewJSStreamGeneralError(
×
2736
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2737
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2738
                                return
×
2739
                        }
×
2740
                        nodeTags := si.(nodeInfo).tags
1✔
2741
                        for _, tag := range cfg.Placement.Tags {
2✔
2742
                                if !nodeTags.Contains(tag) {
2✔
2743
                                        // clear placement as tags don't match
1✔
2744
                                        cfg.Placement = nil
1✔
2745
                                        break FOR_TAGCHECK
1✔
2746
                                }
2747
                        }
2748

2749
                }
2750
        }
2751

2752
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2753
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2754

4✔
2755
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2756
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2757
}
2758

2759
// Request to have an account purged
2760
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
2761
        if c == nil || !s.JetStreamEnabled() {
6✔
2762
                return
×
2763
        }
×
2764

2765
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6✔
2766
        if err != nil {
6✔
2767
                s.Warnf(badAPIRequestT, msg)
×
2768
                return
×
2769
        }
×
2770
        if acc != s.SystemAccount() {
6✔
2771
                return
×
2772
        }
×
2773

2774
        js := s.getJetStream()
6✔
2775
        if js == nil {
6✔
2776
                return
×
2777
        }
×
2778

2779
        accName := tokenAt(subject, 5)
6✔
2780

6✔
2781
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
2782
        if errorOnRequiredApiLevel(hdr) {
6✔
2783
                resp.Error = NewJSRequiredApiLevelError()
×
2784
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2785
                return
×
2786
        }
×
2787

2788
        if !s.JetStreamIsClustered() {
8✔
2789
                var streams []*stream
2✔
2790
                var ac *Account
2✔
2791
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2792
                        streams = ac.streams()
1✔
2793
                }
1✔
2794

2795
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2796
                        accName, len(streams), ac != nil)
2✔
2797

2✔
2798
                for _, mset := range streams {
3✔
2799
                        err := mset.delete()
1✔
2800
                        if err != nil {
1✔
2801
                                resp.Error = NewJSStreamDeleteError(err)
×
2802
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2803
                                return
×
2804
                        }
×
2805
                }
2806
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2807
                        resp.Error = NewJSStreamGeneralError(err)
×
2808
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2809
                        return
×
2810
                }
×
2811
                resp.Initiated = true
2✔
2812
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2813
                return
2✔
2814
        }
2815

2816
        _, cc := s.getJetStreamCluster()
4✔
2817

4✔
2818
        js.mu.RLock()
4✔
2819
        isLeader := cc.isLeader()
4✔
2820
        meta := cc.meta
4✔
2821
        js.mu.RUnlock()
4✔
2822

4✔
2823
        if !isLeader {
4✔
2824
                return
×
2825
        }
×
2826

2827
        if js.isMetaRecovering() {
4✔
2828
                // While in recovery mode, the data structures are not fully initialized
×
2829
                resp.Error = NewJSClusterNotAvailError()
×
2830
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2831
                return
×
2832
        }
×
2833

2834
        js.mu.RLock()
4✔
2835
        ns, nc := 0, 0
4✔
2836
        streams, hasAccount := cc.streams[accName]
4✔
2837
        for _, osa := range streams {
12✔
2838
                for _, oca := range osa.consumers {
20✔
2839
                        oca.deleted = true
12✔
2840
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
12✔
2841
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2842
                        nc++
12✔
2843
                }
12✔
2844
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
8✔
2845
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2846
                ns++
8✔
2847
        }
2848
        js.mu.RUnlock()
4✔
2849

4✔
2850
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2851

4✔
2852
        resp.Initiated = true
4✔
2853
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2854
}
2855

2856
// Request to have the meta leader stepdown.
2857
// These will only be received by the meta leader, so less checking needed.
2858
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
25✔
2859
        if c == nil || !s.JetStreamEnabled() {
25✔
2860
                return
×
2861
        }
×
2862

2863
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
25✔
2864
        if err != nil {
25✔
2865
                s.Warnf(badAPIRequestT, msg)
×
2866
                return
×
2867
        }
×
2868

2869
        // This should only be coming from the System Account.
2870
        if acc != s.SystemAccount() {
26✔
2871
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2872
                return
1✔
2873
        }
1✔
2874

2875
        js, cc := s.getJetStreamCluster()
24✔
2876
        if js == nil || cc == nil {
24✔
2877
                return
×
2878
        }
×
2879

2880
        // Extra checks here but only leader is listening.
2881
        js.mu.RLock()
24✔
2882
        isLeader := cc.isLeader()
24✔
2883
        meta := cc.meta
24✔
2884
        js.mu.RUnlock()
24✔
2885

24✔
2886
        if !isLeader {
24✔
2887
                return
×
2888
        }
×
2889

2890
        var preferredLeader string
24✔
2891
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
24✔
2892
        if errorOnRequiredApiLevel(hdr) {
24✔
2893
                resp.Error = NewJSRequiredApiLevelError()
×
2894
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2895
                return
×
2896
        }
×
2897

2898
        if isJSONObjectOrArray(msg) {
39✔
2899
                var req JSApiLeaderStepdownRequest
15✔
2900
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
15✔
2901
                        resp.Error = NewJSInvalidJSONError(err)
×
2902
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2903
                        return
×
2904
                }
×
2905
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
21✔
2906
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2907
                        return
6✔
2908
                }
6✔
2909
        }
2910

2911
        // Call actual stepdown.
2912
        err = meta.StepDown(preferredLeader)
18✔
2913
        if err != nil {
18✔
2914
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2915
        } else {
18✔
2916
                resp.Success = true
18✔
2917
        }
18✔
2918
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
18✔
2919
}
2920

2921
// Check if given []bytes is a JSON Object or Array.
2922
// Technically, valid JSON can also be a plain string or number, but for our use case,
2923
// we care only for JSON objects or arrays which starts with `[` or `{`.
2924
// This function does not have to ensure valid JSON in its entirety. It is used merely
2925
// to hint the codepath if it should attempt to parse the request as JSON or not.
2926
func isJSONObjectOrArray(req []byte) bool {
16,790✔
2927
        // Skip leading JSON whitespace (space, tab, newline, carriage return)
16,790✔
2928
        i := 0
16,790✔
2929
        for i < len(req) && (req[i] == ' ' || req[i] == '\t' || req[i] == '\n' || req[i] == '\r') {
16,802✔
2930
                i++
12✔
2931
        }
12✔
2932
        // Check for empty input after trimming
2933
        if i >= len(req) {
33,003✔
2934
                return false
16,213✔
2935
        }
16,213✔
2936
        // Check if the first non-whitespace character is '{' or '['
2937
        return req[i] == '{' || req[i] == '['
577✔
2938
}
2939

2940
func isEmptyRequest(req []byte) bool {
48,218✔
2941
        if len(req) == 0 {
94,157✔
2942
                return true
45,939✔
2943
        }
45,939✔
2944
        if bytes.Equal(req, []byte("{}")) {
2,280✔
2945
                return true
1✔
2946
        }
1✔
2947
        // If we are here we didn't get our simple match, but still could be valid.
2948
        var v any
2,278✔
2949
        if err := json.Unmarshal(req, &v); err != nil {
2,278✔
2950
                return false
×
2951
        }
×
2952
        vm, ok := v.(map[string]any)
2,278✔
2953
        if !ok {
2,278✔
2954
                return false
×
2955
        }
×
2956
        return len(vm) == 0
2,278✔
2957
}
2958

2959
// getStepDownPreferredPlacement attempts to work out what the best placement is
2960
// for a stepdown request. The preferred server name always takes precedence, but
2961
// if not specified, the placement will be used to filter by cluster. The caller
2962
// should check for return API errors and return those to the requestor if needed.
2963
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
40✔
2964
        if placement == nil {
42✔
2965
                return _EMPTY_, nil
2✔
2966
        }
2✔
2967
        var preferredLeader string
38✔
2968
        if placement.Preferred != _EMPTY_ {
56✔
2969
                for _, p := range group.Peers() {
71✔
2970
                        si, ok := s.nodeToInfo.Load(p.ID)
53✔
2971
                        if !ok || si == nil {
53✔
2972
                                continue
×
2973
                        }
2974
                        if si.(nodeInfo).name == placement.Preferred {
67✔
2975
                                preferredLeader = p.ID
14✔
2976
                                break
14✔
2977
                        }
2978
                }
2979
                if preferredLeader == group.ID() {
22✔
2980
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
2981
                }
4✔
2982
                if preferredLeader == _EMPTY_ {
18✔
2983
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
2984
                }
4✔
2985
        } else {
20✔
2986
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
2987
                ourID := group.ID()
20✔
2988
                for _, p := range group.Peers() {
116✔
2989
                        if p == nil {
96✔
2990
                                continue // ... shouldn't happen.
×
2991
                        }
2992
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
2993
                        if !ok || si == nil {
96✔
2994
                                continue
×
2995
                        }
2996
                        ni := si.(nodeInfo)
96✔
2997
                        if ni.offline || p.ID == ourID {
116✔
2998
                                continue
20✔
2999
                        }
3000
                        possiblePeers[p] = ni
76✔
3001
                }
3002
                // If cluster is specified, filter out anything not matching the cluster name.
3003
                if placement.Cluster != _EMPTY_ {
31✔
3004
                        for p, si := range possiblePeers {
51✔
3005
                                if si.cluster != placement.Cluster {
66✔
3006
                                        delete(possiblePeers, p)
26✔
3007
                                }
26✔
3008
                        }
3009
                }
3010
                // If tags are specified, filter out anything not matching all supplied tags.
3011
                if len(placement.Tags) > 0 {
32✔
3012
                        for p, si := range possiblePeers {
55✔
3013
                                matchesAll := true
43✔
3014
                                for _, tag := range placement.Tags {
93✔
3015
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3016
                                                break
32✔
3017
                                        }
3018
                                }
3019
                                if !matchesAll {
75✔
3020
                                        delete(possiblePeers, p)
32✔
3021
                                }
32✔
3022
                        }
3023
                }
3024
                // If there are no possible peers, return an error.
3025
                if len(possiblePeers) == 0 {
28✔
3026
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3027
                }
8✔
3028
                // Take advantage of random map iteration order to select the preferred.
3029
                for p := range possiblePeers {
24✔
3030
                        preferredLeader = p.ID
12✔
3031
                        break
12✔
3032
                }
3033
        }
3034
        return preferredLeader, nil
22✔
3035
}
3036

3037
// Request to delete a stream.
3038
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
552✔
3039
        if c == nil || !s.JetStreamEnabled() {
552✔
3040
                return
×
3041
        }
×
3042
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
552✔
3043
        if err != nil {
552✔
3044
                s.Warnf(badAPIRequestT, msg)
×
3045
                return
×
3046
        }
×
3047

3048
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
552✔
3049
        if errorOnRequiredApiLevel(hdr) {
553✔
3050
                resp.Error = NewJSRequiredApiLevelError()
1✔
3051
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3052
                return
1✔
3053
        }
1✔
3054

3055
        // Determine if we should proceed here when we are in clustered mode.
3056
        if s.JetStreamIsClustered() {
1,058✔
3057
                js, cc := s.getJetStreamCluster()
507✔
3058
                if js == nil || cc == nil {
507✔
3059
                        return
×
3060
                }
×
3061
                if js.isLeaderless() {
508✔
3062
                        resp.Error = NewJSClusterNotAvailError()
1✔
3063
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3064
                        return
1✔
3065
                }
1✔
3066
                // Make sure we are meta leader.
3067
                if !s.JetStreamIsLeader() {
900✔
3068
                        return
394✔
3069
                }
394✔
3070
        }
3071

3072
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
156✔
3073
                if doErr {
×
3074
                        resp.Error = NewJSNotEnabledForAccountError()
×
3075
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3076
                }
×
3077
                return
×
3078
        }
3079

3080
        if !isEmptyRequest(msg) {
157✔
3081
                resp.Error = NewJSNotEmptyRequestError()
1✔
3082
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3083
                return
1✔
3084
        }
1✔
3085
        stream := streamNameFromSubject(subject)
155✔
3086

155✔
3087
        // Clustered.
155✔
3088
        if s.JetStreamIsClustered() {
267✔
3089
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
112✔
3090
                return
112✔
3091
        }
112✔
3092

3093
        mset, err := acc.lookupStream(stream)
43✔
3094
        if err != nil {
48✔
3095
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3096
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3097
                return
5✔
3098
        }
5✔
3099

3100
        if err := mset.delete(); err != nil {
38✔
3101
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3102
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3103
                return
×
3104
        }
×
3105
        resp.Success = true
38✔
3106
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
38✔
3107
}
3108

3109
// Request to delete a message.
3110
// This expects a stream sequence number as the msg body.
3111
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,111✔
3112
        if c == nil || !s.JetStreamEnabled() {
1,116✔
3113
                return
5✔
3114
        }
5✔
3115
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,106✔
3116
        if err != nil {
1,106✔
3117
                s.Warnf(badAPIRequestT, msg)
×
3118
                return
×
3119
        }
×
3120

3121
        stream := tokenAt(subject, 6)
1,106✔
3122

1,106✔
3123
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
1,106✔
3124
        if errorOnRequiredApiLevel(hdr) {
1,107✔
3125
                resp.Error = NewJSRequiredApiLevelError()
1✔
3126
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3127
                return
1✔
3128
        }
1✔
3129

3130
        // If we are in clustered mode we need to be the stream leader to proceed.
3131
        if s.JetStreamIsClustered() {
1,495✔
3132
                // Check to make sure the stream is assigned.
390✔
3133
                js, cc := s.getJetStreamCluster()
390✔
3134
                if js == nil || cc == nil {
396✔
3135
                        return
6✔
3136
                }
6✔
3137
                if js.isLeaderless() {
385✔
3138
                        resp.Error = NewJSClusterNotAvailError()
1✔
3139
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3140
                        return
1✔
3141
                }
1✔
3142

3143
                js.mu.RLock()
383✔
3144
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
383✔
3145
                js.mu.RUnlock()
383✔
3146

383✔
3147
                if isLeader && sa == nil {
383✔
3148
                        // We can't find the stream, so mimic what would be the errors below.
×
3149
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3150
                                if doErr {
×
3151
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3152
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3153
                                }
×
3154
                                return
×
3155
                        }
3156
                        // No stream present.
3157
                        resp.Error = NewJSStreamNotFoundError()
×
3158
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3159
                        return
×
3160
                } else if sa == nil {
383✔
3161
                        return
×
3162
                }
×
3163

3164
                // Check to see if we are a member of the group and if the group has no leader.
3165
                if js.isGroupLeaderless(sa.Group) {
383✔
3166
                        resp.Error = NewJSClusterNotAvailError()
×
3167
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3168
                        return
×
3169
                }
×
3170

3171
                // We have the stream assigned and a leader, so only the stream leader should answer.
3172
                if !acc.JetStreamIsStreamLeader(stream) {
634✔
3173
                        return
251✔
3174
                }
251✔
3175
        }
3176

3177
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
985✔
3178
                if doErr {
274✔
3179
                        resp.Error = NewJSNotEnabledForAccountError()
136✔
3180
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
136✔
3181
                }
136✔
3182
                return
138✔
3183
        }
3184
        if isEmptyRequest(msg) {
709✔
3185
                resp.Error = NewJSBadRequestError()
×
3186
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3187
                return
×
3188
        }
×
3189
        var req JSApiMsgDeleteRequest
709✔
3190
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
709✔
3191
                resp.Error = NewJSInvalidJSONError(err)
×
3192
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3193
                return
×
3194
        }
×
3195

3196
        mset, err := acc.lookupStream(stream)
709✔
3197
        if err != nil {
713✔
3198
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
3199
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3200
                return
4✔
3201
        }
4✔
3202
        if mset.cfg.Sealed {
707✔
3203
                resp.Error = NewJSStreamSealedError()
2✔
3204
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3205
                return
2✔
3206
        }
2✔
3207
        if mset.cfg.DenyDelete {
704✔
3208
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3209
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3210
                return
1✔
3211
        }
1✔
3212

3213
        if s.JetStreamIsClustered() {
832✔
3214
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
130✔
3215
                return
130✔
3216
        }
130✔
3217

3218
        var removed bool
572✔
3219
        if req.NoErase {
1,142✔
3220
                removed, err = mset.removeMsg(req.Seq)
570✔
3221
        } else {
572✔
3222
                removed, err = mset.eraseMsg(req.Seq)
2✔
3223
        }
2✔
3224
        if err != nil {
572✔
3225
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3226
        } else if !removed {
572✔
3227
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3228
        } else {
572✔
3229
                resp.Success = true
572✔
3230
        }
572✔
3231
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
572✔
3232
}
3233

3234
// Request to get a raw stream message.
3235
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2,455✔
3236
        if c == nil || !s.JetStreamEnabled() {
2,455✔
3237
                return
×
3238
        }
×
3239
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2,455✔
3240
        if err != nil {
2,455✔
3241
                s.Warnf(badAPIRequestT, msg)
×
3242
                return
×
3243
        }
×
3244

3245
        stream := tokenAt(subject, 6)
2,455✔
3246

2,455✔
3247
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
2,455✔
3248
        if errorOnRequiredApiLevel(hdr) {
2,456✔
3249
                resp.Error = NewJSRequiredApiLevelError()
1✔
3250
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3251
                return
1✔
3252
        }
1✔
3253

3254
        // If we are in clustered mode we need to be the stream leader to proceed.
3255
        if s.JetStreamIsClustered() {
3,863✔
3256
                // Check to make sure the stream is assigned.
1,409✔
3257
                js, cc := s.getJetStreamCluster()
1,409✔
3258
                if js == nil || cc == nil {
1,409✔
3259
                        return
×
3260
                }
×
3261
                if js.isLeaderless() {
1,409✔
3262
                        resp.Error = NewJSClusterNotAvailError()
×
3263
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3264
                        return
×
3265
                }
×
3266

3267
                js.mu.RLock()
1,409✔
3268
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
1,409✔
3269
                js.mu.RUnlock()
1,409✔
3270

1,409✔
3271
                if isLeader && sa == nil {
1,409✔
3272
                        // We can't find the stream, so mimic what would be the errors below.
×
3273
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3274
                                if doErr {
×
3275
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3276
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3277
                                }
×
3278
                                return
×
3279
                        }
3280
                        // No stream present.
3281
                        resp.Error = NewJSStreamNotFoundError()
×
3282
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3283
                        return
×
3284
                } else if sa == nil {
1,409✔
3285
                        return
×
3286
                }
×
3287

3288
                // Check to see if we are a member of the group and if the group has no leader.
3289
                if js.isGroupLeaderless(sa.Group) {
1,409✔
3290
                        resp.Error = NewJSClusterNotAvailError()
×
3291
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3292
                        return
×
3293
                }
×
3294

3295
                // We have the stream assigned and a leader, so only the stream leader should answer.
3296
                if !acc.JetStreamIsStreamLeader(stream) {
2,356✔
3297
                        return
947✔
3298
                }
947✔
3299
        }
3300

3301
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,510✔
3302
                if doErr {
3✔
3303
                        resp.Error = NewJSNotEnabledForAccountError()
×
3304
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3305
                }
×
3306
                return
3✔
3307
        }
3308
        if isEmptyRequest(msg) {
1,504✔
3309
                resp.Error = NewJSBadRequestError()
×
3310
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3311
                return
×
3312
        }
×
3313
        var req JSApiMsgGetRequest
1,504✔
3314
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,504✔
3315
                resp.Error = NewJSInvalidJSONError(err)
×
3316
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3317
                return
×
3318
        }
×
3319

3320
        // This version does not support batch.
3321
        if req.Batch > 0 || req.MaxBytes > 0 {
1,505✔
3322
                resp.Error = NewJSBadRequestError()
1✔
3323
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3324
                return
1✔
3325
        }
1✔
3326

3327
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3328
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3329
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,503✔
3330
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,503✔
3331
                (req.Seq > 0 && req.StartTime != nil) ||
1,503✔
3332
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,503✔
3333
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,507✔
3334
                resp.Error = NewJSBadRequestError()
4✔
3335
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3336
                return
4✔
3337
        }
4✔
3338

3339
        mset, err := acc.lookupStream(stream)
1,499✔
3340
        if err != nil {
1,499✔
3341
                resp.Error = NewJSStreamNotFoundError()
×
3342
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3343
                return
×
3344
        }
×
3345
        if mset.offlineReason != _EMPTY_ {
1,499✔
3346
                // Just let the request time out.
×
3347
                return
×
3348
        }
×
3349

3350
        var svp StoreMsg
1,499✔
3351
        var sm *StoreMsg
1,499✔
3352

1,499✔
3353
        // Ensure this read request is isolated and doesn't interleave with writes.
1,499✔
3354
        mset.mu.RLock()
1,499✔
3355
        defer mset.mu.RUnlock()
1,499✔
3356

1,499✔
3357
        // If AsOfTime is set, perform this first to get the sequence.
1,499✔
3358
        var seq uint64
1,499✔
3359
        if req.StartTime != nil {
1,505✔
3360
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3361
        } else {
1,499✔
3362
                seq = req.Seq
1,493✔
3363
        }
1,493✔
3364

3365
        if seq > 0 && req.NextFor == _EMPTY_ {
1,944✔
3366
                sm, err = mset.store.LoadMsg(seq, &svp)
445✔
3367
        } else if req.NextFor != _EMPTY_ {
1,601✔
3368
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3369
        } else {
1,054✔
3370
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
952✔
3371
        }
952✔
3372
        if err != nil {
2,279✔
3373
                resp.Error = NewJSNoMessageFoundError()
780✔
3374
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
780✔
3375
                return
780✔
3376
        }
780✔
3377
        resp.Message = &StoredMsg{
719✔
3378
                Subject:  sm.subj,
719✔
3379
                Sequence: sm.seq,
719✔
3380
                Data:     sm.msg,
719✔
3381
                Time:     time.Unix(0, sm.ts).UTC(),
719✔
3382
        }
719✔
3383
        if !req.NoHeaders {
1,437✔
3384
                resp.Message.Header = sm.hdr
718✔
3385
        }
718✔
3386

3387
        // Don't send response through API layer for this call.
3388
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
719✔
3389
}
3390

3391
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3392
        if c == nil || !s.JetStreamEnabled() {
28✔
3393
                return
×
3394
        }
×
3395

3396
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3397
        if err != nil {
28✔
3398
                s.Warnf(badAPIRequestT, msg)
×
3399
                return
×
3400
        }
×
3401

3402
        stream := streamNameFromSubject(subject)
28✔
3403
        consumer := consumerNameFromSubject(subject)
28✔
3404

28✔
3405
        var req JSApiConsumerUnpinRequest
28✔
3406
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3407
        if errorOnRequiredApiLevel(hdr) {
29✔
3408
                resp.Error = NewJSRequiredApiLevelError()
1✔
3409
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3410
                return
1✔
3411
        }
1✔
3412

3413
        if err := json.Unmarshal(msg, &req); err != nil {
27✔
3414
                resp.Error = NewJSInvalidJSONError(err)
×
3415
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3416
                return
×
3417
        }
×
3418

3419
        if req.Group == _EMPTY_ {
31✔
3420
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
4✔
3421
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3422
                return
4✔
3423
        }
4✔
3424

3425
        if !validGroupName.MatchString(req.Group) {
27✔
3426
                resp.Error = NewJSConsumerInvalidGroupNameError()
4✔
3427
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3428
                return
4✔
3429
        }
4✔
3430
        if s.JetStreamIsClustered() {
31✔
3431
                // Check to make sure the stream is assigned.
12✔
3432
                js, cc := s.getJetStreamCluster()
12✔
3433
                if js == nil || cc == nil {
12✔
3434
                        return
×
3435
                }
×
3436

3437
                // First check if the stream and consumer is there.
3438
                js.mu.RLock()
12✔
3439
                sa := js.streamAssignment(acc.Name, stream)
12✔
3440
                if sa == nil {
15✔
3441
                        js.mu.RUnlock()
3✔
3442
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3443
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3444
                        return
3✔
3445
                }
3✔
3446
                if sa.unsupported != nil {
9✔
3447
                        js.mu.RUnlock()
×
3448
                        // Just let the request time out.
×
3449
                        return
×
3450
                }
×
3451

3452
                ca, ok := sa.consumers[consumer]
9✔
3453
                if !ok || ca == nil {
12✔
3454
                        js.mu.RUnlock()
3✔
3455
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3456
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3457
                        return
3✔
3458
                }
3✔
3459
                if ca.unsupported != nil {
6✔
3460
                        js.mu.RUnlock()
×
3461
                        // Just let the request time out.
×
3462
                        return
×
3463
                }
×
3464
                js.mu.RUnlock()
6✔
3465

6✔
3466
                // Then check if we are the leader.
6✔
3467
                mset, err := acc.lookupStream(stream)
6✔
3468
                if err != nil {
6✔
3469
                        return
×
3470
                }
×
3471

3472
                o := mset.lookupConsumer(consumer)
6✔
3473
                if o == nil {
6✔
3474
                        return
×
3475
                }
×
3476
                if !o.isLeader() {
10✔
3477
                        return
4✔
3478
                }
4✔
3479
        }
3480

3481
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3482
                if doErr {
×
3483
                        resp.Error = NewJSNotEnabledForAccountError()
×
3484
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3485
                }
×
3486
                return
×
3487
        }
3488

3489
        mset, err := acc.lookupStream(stream)
9✔
3490
        if err != nil {
10✔
3491
                resp.Error = NewJSStreamNotFoundError()
1✔
3492
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3493
                return
1✔
3494
        }
1✔
3495
        if mset.offlineReason != _EMPTY_ {
8✔
3496
                // Just let the request time out.
×
3497
                return
×
3498
        }
×
3499
        o := mset.lookupConsumer(consumer)
8✔
3500
        if o == nil {
9✔
3501
                resp.Error = NewJSConsumerNotFoundError()
1✔
3502
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3503
                return
1✔
3504
        }
1✔
3505
        if o.offlineReason != _EMPTY_ {
7✔
3506
                // Just let the request time out.
×
3507
                return
×
3508
        }
×
3509

3510
        var foundPriority bool
7✔
3511
        for _, group := range o.config().PriorityGroups {
14✔
3512
                if group == req.Group {
12✔
3513
                        foundPriority = true
5✔
3514
                        break
5✔
3515
                }
3516
        }
3517
        if !foundPriority {
9✔
3518
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3519
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3520
                return
2✔
3521
        }
2✔
3522

3523
        o.mu.Lock()
5✔
3524
        o.currentPinId = _EMPTY_
5✔
3525
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3526
        o.mu.Unlock()
5✔
3527
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3528
}
3529

3530
// Request to purge a stream.
3531
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
136✔
3532
        if c == nil || !s.JetStreamEnabled() {
136✔
3533
                return
×
3534
        }
×
3535
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
136✔
3536
        if err != nil {
136✔
3537
                s.Warnf(badAPIRequestT, msg)
×
3538
                return
×
3539
        }
×
3540

3541
        stream := streamNameFromSubject(subject)
136✔
3542

136✔
3543
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
136✔
3544
        if errorOnRequiredApiLevel(hdr) {
137✔
3545
                resp.Error = NewJSRequiredApiLevelError()
1✔
3546
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3547
                return
1✔
3548
        }
1✔
3549

3550
        // If we are in clustered mode we need to be the stream leader to proceed.
3551
        if s.JetStreamIsClustered() {
236✔
3552
                // Check to make sure the stream is assigned.
101✔
3553
                js, cc := s.getJetStreamCluster()
101✔
3554
                if js == nil || cc == nil {
101✔
3555
                        return
×
3556
                }
×
3557

3558
                js.mu.RLock()
101✔
3559
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
101✔
3560
                js.mu.RUnlock()
101✔
3561

101✔
3562
                if isLeader && sa == nil {
101✔
3563
                        // We can't find the stream, so mimic what would be the errors below.
×
3564
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3565
                                if doErr {
×
3566
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3567
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3568
                                }
×
3569
                                return
×
3570
                        }
3571
                        // No stream present.
3572
                        resp.Error = NewJSStreamNotFoundError()
×
3573
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3574
                        return
×
3575
                } else if sa == nil {
102✔
3576
                        if js.isLeaderless() {
1✔
3577
                                resp.Error = NewJSClusterNotAvailError()
×
3578
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3579
                        }
×
3580
                        return
1✔
3581
                }
3582

3583
                // Check to see if we are a member of the group and if the group has no leader.
3584
                if js.isGroupLeaderless(sa.Group) {
101✔
3585
                        resp.Error = NewJSClusterNotAvailError()
1✔
3586
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3587
                        return
1✔
3588
                }
1✔
3589

3590
                // We have the stream assigned and a leader, so only the stream leader should answer.
3591
                if !acc.JetStreamIsStreamLeader(stream) {
166✔
3592
                        if js.isLeaderless() {
67✔
3593
                                resp.Error = NewJSClusterNotAvailError()
×
3594
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3595
                        }
×
3596
                        return
67✔
3597
                }
3598
        }
3599

3600
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
66✔
3601
                if doErr {
×
3602
                        resp.Error = NewJSNotEnabledForAccountError()
×
3603
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3604
                }
×
3605
                return
×
3606
        }
3607

3608
        var purgeRequest *JSApiStreamPurgeRequest
66✔
3609
        if isJSONObjectOrArray(msg) {
100✔
3610
                var req JSApiStreamPurgeRequest
34✔
3611
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3612
                        resp.Error = NewJSInvalidJSONError(err)
×
3613
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3614
                        return
×
3615
                }
×
3616
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3617
                        resp.Error = NewJSBadRequestError()
×
3618
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3619
                        return
×
3620
                }
×
3621
                purgeRequest = &req
34✔
3622
        }
3623

3624
        mset, err := acc.lookupStream(stream)
66✔
3625
        if err != nil {
66✔
3626
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3627
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3628
                return
×
3629
        }
×
3630
        if mset.cfg.Sealed {
68✔
3631
                resp.Error = NewJSStreamSealedError()
2✔
3632
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3633
                return
2✔
3634
        }
2✔
3635
        if mset.cfg.DenyPurge {
65✔
3636
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3637
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3638
                return
1✔
3639
        }
1✔
3640

3641
        if s.JetStreamIsClustered() {
93✔
3642
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
30✔
3643
                return
30✔
3644
        }
30✔
3645

3646
        purged, err := mset.purge(purgeRequest)
33✔
3647
        if err != nil {
33✔
3648
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3649
        } else {
33✔
3650
                resp.Purged = purged
33✔
3651
                resp.Success = true
33✔
3652
        }
33✔
3653
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
33✔
3654
}
3655

3656
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,326✔
3657
        var replicas int
1,326✔
3658
        if cfg != nil {
2,652✔
3659
                replicas = cfg.Replicas
1,326✔
3660
        }
1,326✔
3661
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,326✔
3662
        if apiErr != nil {
1,326✔
3663
                return apiErr
×
3664
        }
×
3665
        jsa.js.mu.RLock()
1,326✔
3666
        defer jsa.js.mu.RUnlock()
1,326✔
3667
        jsa.mu.RLock()
1,326✔
3668
        defer jsa.mu.RUnlock()
1,326✔
3669
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,329✔
3670
                return NewJSMaximumStreamsLimitError()
3✔
3671
        }
3✔
3672
        reserved := jsa.tieredReservation(tier, cfg)
1,323✔
3673
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,324✔
3674
                return NewJSStreamLimitsError(err, Unless(err))
1✔
3675
        }
1✔
3676
        return nil
1,322✔
3677
}
3678

3679
// Request to restore a stream.
3680
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
50✔
3681
        if c == nil || !s.JetStreamIsLeader() {
74✔
3682
                return
24✔
3683
        }
24✔
3684
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
26✔
3685
        if err != nil {
26✔
3686
                s.Warnf(badAPIRequestT, msg)
×
3687
                return
×
3688
        }
×
3689

3690
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
26✔
3691
        if errorOnRequiredApiLevel(hdr) {
27✔
3692
                resp.Error = NewJSRequiredApiLevelError()
1✔
3693
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3694
                return
1✔
3695
        }
1✔
3696
        if !acc.JetStreamEnabled() {
25✔
3697
                resp.Error = NewJSNotEnabledForAccountError()
×
3698
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3699
                return
×
3700
        }
×
3701
        if isEmptyRequest(msg) {
26✔
3702
                resp.Error = NewJSBadRequestError()
1✔
3703
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3704
                return
1✔
3705
        }
1✔
3706

3707
        var req JSApiStreamRestoreRequest
24✔
3708
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3709
                resp.Error = NewJSInvalidJSONError(err)
×
3710
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3711
                return
×
3712
        }
×
3713

3714
        stream := streamNameFromSubject(subject)
24✔
3715

24✔
3716
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3717
                req.Config.Name = stream
×
3718
        }
×
3719

3720
        // check stream config at the start of the restore process, not at the end
3721
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3722
        if apiErr != nil {
26✔
3723
                resp.Error = apiErr
2✔
3724
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3725
                return
2✔
3726
        }
2✔
3727

3728
        if s.JetStreamIsClustered() {
33✔
3729
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
3730
                return
11✔
3731
        }
11✔
3732

3733
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
3734
                resp.Error = err
2✔
3735
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3736
                return
2✔
3737
        }
2✔
3738

3739
        if _, err := acc.lookupStream(stream); err == nil {
10✔
3740
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
3741
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3742
                return
1✔
3743
        }
1✔
3744

3745
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3746
                if doErr {
×
3747
                        resp.Error = NewJSNotEnabledForAccountError()
×
3748
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3749
                }
×
3750
                return
×
3751
        }
3752

3753
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
3754
}
3755

3756
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3757
        js := s.getJetStream()
16✔
3758

16✔
3759
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3760

16✔
3761
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
3762
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
29✔
3763
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
13✔
3764
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
3765
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3766
                        return nil
×
3767
                }
×
3768
        }
3769

3770
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
3771
        if err != nil {
16✔
3772
                resp.Error = NewJSTempStorageFailedError()
×
3773
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3774
                return nil
×
3775
        }
×
3776

3777
        streamName := cfg.Name
16✔
3778
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3779

16✔
3780
        start := time.Now().UTC()
16✔
3781
        domain := s.getOpts().JetStreamDomain
16✔
3782
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3783
                TypedEvent: TypedEvent{
16✔
3784
                        Type: JSRestoreCreateAdvisoryType,
16✔
3785
                        ID:   nuid.Next(),
16✔
3786
                        Time: start,
16✔
3787
                },
16✔
3788
                Stream: streamName,
16✔
3789
                Client: ci.forAdvisory(),
16✔
3790
                Domain: domain,
16✔
3791
        })
16✔
3792

16✔
3793
        // Create our internal subscription to accept the snapshot.
16✔
3794
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3795

16✔
3796
        type result struct {
16✔
3797
                err   error
16✔
3798
                reply string
16✔
3799
        }
16✔
3800

16✔
3801
        // For signaling to upper layers.
16✔
3802
        resultCh := make(chan result, 1)
16✔
3803
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
3804

16✔
3805
        var total int
16✔
3806

16✔
3807
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
3808
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
112✔
3809
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
96✔
3810
                if reply == _EMPTY_ {
97✔
3811
                        sub.client.processUnsub(sub.sid)
1✔
3812
                        resultCh <- result{
1✔
3813
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
3814
                                reply,
1✔
3815
                        }
1✔
3816
                        return
1✔
3817
                }
1✔
3818
                // Account client messages have \r\n on end. This is an error.
3819
                if len(msg) < LEN_CR_LF {
95✔
3820
                        sub.client.processUnsub(sub.sid)
×
3821
                        resultCh <- result{
×
3822
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
3823
                                reply,
×
3824
                        }
×
3825
                        return
×
3826
                }
×
3827
                // Adjust.
3828
                msg = msg[:len(msg)-LEN_CR_LF]
95✔
3829

95✔
3830
                // This means we are complete with our transfer from the client.
95✔
3831
                if len(msg) == 0 {
110✔
3832
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
3833
                        resultCh <- result{err, reply}
15✔
3834
                        return
15✔
3835
                }
15✔
3836

3837
                // We track total and check on server limits.
3838
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
3839
                total += len(msg)
80✔
3840
                if js.wouldExceedLimits(FileStorage, total) {
80✔
3841
                        s.resourcesExceededError(FileStorage)
×
3842
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
3843
                        return
×
3844
                }
×
3845

3846
                // Append chunk to temp file. Mark as issue if we encounter an error.
3847
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
80✔
3848
                        resultCh <- result{err, reply}
×
3849
                        if reply != _EMPTY_ {
×
3850
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
3851
                        }
×
3852
                        return
×
3853
                }
3854

3855
                activeQ.push(len(msg))
80✔
3856

80✔
3857
                s.sendInternalAccountMsg(acc, reply, nil)
80✔
3858
        }
3859

3860
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3861
        if err != nil {
16✔
3862
                tfile.Close()
×
3863
                os.Remove(tfile.Name())
×
3864
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3865
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3866
                return nil
×
3867
        }
×
3868

3869
        // Mark the subject so the end user knows where to send the snapshot chunks.
3870
        resp.DeliverSubject = restoreSubj
16✔
3871
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3872

16✔
3873
        doneCh := make(chan error, 1)
16✔
3874

16✔
3875
        // Monitor the progress from another Go routine.
16✔
3876
        s.startGoRoutine(func() {
32✔
3877
                defer s.grWG.Done()
16✔
3878
                defer func() {
32✔
3879
                        tfile.Close()
16✔
3880
                        os.Remove(tfile.Name())
16✔
3881
                        sub.client.processUnsub(sub.sid)
16✔
3882
                        activeQ.unregister()
16✔
3883
                }()
16✔
3884

3885
                const activityInterval = 5 * time.Second
16✔
3886
                notActive := time.NewTimer(activityInterval)
16✔
3887
                defer notActive.Stop()
16✔
3888

16✔
3889
                total := 0
16✔
3890
                for {
112✔
3891
                        select {
96✔
3892
                        case result := <-resultCh:
16✔
3893
                                err := result.err
16✔
3894
                                var mset *stream
16✔
3895

16✔
3896
                                // If we staged properly go ahead and do restore now.
16✔
3897
                                if err == nil {
31✔
3898
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
3899
                                        tfile.Seek(0, 0)
15✔
3900
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
3901
                                } else {
16✔
3902
                                        errStr := err.Error()
1✔
3903
                                        tmp := []rune(errStr)
1✔
3904
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
3905
                                        s.Warnf(errStr)
1✔
3906
                                }
1✔
3907

3908
                                end := time.Now().UTC()
16✔
3909

16✔
3910
                                // TODO(rip) - Should this have the error code in it??
16✔
3911
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
3912
                                        TypedEvent: TypedEvent{
16✔
3913
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
3914
                                                ID:   nuid.Next(),
16✔
3915
                                                Time: end,
16✔
3916
                                        },
16✔
3917
                                        Stream: streamName,
16✔
3918
                                        Start:  start,
16✔
3919
                                        End:    end,
16✔
3920
                                        Bytes:  int64(total),
16✔
3921
                                        Client: ci.forAdvisory(),
16✔
3922
                                        Domain: domain,
16✔
3923
                                })
16✔
3924

16✔
3925
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
3926

16✔
3927
                                if err != nil {
20✔
3928
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
3929
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
3930
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
3931
                                } else {
16✔
3932
                                        msetCfg := mset.config()
12✔
3933
                                        resp.StreamInfo = &StreamInfo{
12✔
3934
                                                Created:   mset.createdTime(),
12✔
3935
                                                State:     mset.state(),
12✔
3936
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
3937
                                                TimeStamp: time.Now().UTC(),
12✔
3938
                                        }
12✔
3939
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
3940
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
3941
                                }
12✔
3942

3943
                                // On the last EOF, send back the stream info or error status.
3944
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
3945
                                // Signal to the upper layers.
16✔
3946
                                doneCh <- err
16✔
3947
                                return
16✔
3948
                        case <-activeQ.ch:
80✔
3949
                                if n, ok := activeQ.popOne(); ok {
160✔
3950
                                        total += n
80✔
3951
                                        notActive.Reset(activityInterval)
80✔
3952
                                }
80✔
3953
                        case <-notActive.C:
×
3954
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
3955
                                doneCh <- err
×
3956
                                return
×
3957
                        }
3958
                }
3959
        })
3960

3961
        return doneCh
16✔
3962
}
3963

3964
// Process a snapshot request.
3965
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
3966
        if c == nil || !s.JetStreamEnabled() {
29✔
3967
                return
×
3968
        }
×
3969
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
3970
        if err != nil {
29✔
3971
                s.Warnf(badAPIRequestT, msg)
×
3972
                return
×
3973
        }
×
3974

3975
        smsg := string(msg)
29✔
3976
        stream := streamNameFromSubject(subject)
29✔
3977

29✔
3978
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
3979
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
3980
                return
15✔
3981
        }
15✔
3982

3983
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
3984
        if errorOnRequiredApiLevel(hdr) {
15✔
3985
                resp.Error = NewJSRequiredApiLevelError()
1✔
3986
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3987
                return
1✔
3988
        }
1✔
3989
        if !acc.JetStreamEnabled() {
13✔
3990
                resp.Error = NewJSNotEnabledForAccountError()
×
3991
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
3992
                return
×
3993
        }
×
3994
        if isEmptyRequest(msg) {
14✔
3995
                resp.Error = NewJSBadRequestError()
1✔
3996
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
3997
                return
1✔
3998
        }
1✔
3999

4000
        mset, err := acc.lookupStream(stream)
12✔
4001
        if err != nil {
13✔
4002
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4003
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4004
                return
1✔
4005
        }
1✔
4006

4007
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4008
                if doErr {
×
4009
                        resp.Error = NewJSNotEnabledForAccountError()
×
4010
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4011
                }
×
4012
                return
×
4013
        }
4014

4015
        var req JSApiStreamSnapshotRequest
11✔
4016
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4017
                resp.Error = NewJSInvalidJSONError(err)
×
4018
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4019
                return
×
4020
        }
×
4021
        if !IsValidSubject(req.DeliverSubject) {
12✔
4022
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4023
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4024
                return
1✔
4025
        }
1✔
4026

4027
        // We will do the snapshot in a go routine as well since check msgs may
4028
        // stall this go routine.
4029
        go func() {
20✔
4030
                if req.CheckMsgs {
12✔
4031
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4032
                } else {
10✔
4033
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4034
                }
8✔
4035

4036
                start := time.Now().UTC()
10✔
4037

10✔
4038
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4039
                if err != nil {
10✔
4040
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4041
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4042
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4043
                        return
×
4044
                }
×
4045

4046
                config := mset.config()
10✔
4047
                resp.State = &sr.State
10✔
4048
                resp.Config = &config
10✔
4049

10✔
4050
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4051

10✔
4052
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4053
                        TypedEvent: TypedEvent{
10✔
4054
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4055
                                ID:   nuid.Next(),
10✔
4056
                                Time: time.Now().UTC(),
10✔
4057
                        },
10✔
4058
                        Stream: mset.name(),
10✔
4059
                        State:  sr.State,
10✔
4060
                        Client: ci.forAdvisory(),
10✔
4061
                        Domain: s.getOpts().JetStreamDomain,
10✔
4062
                })
10✔
4063

10✔
4064
                // Now do the real streaming.
10✔
4065
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4066

10✔
4067
                end := time.Now().UTC()
10✔
4068

10✔
4069
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4070
                        TypedEvent: TypedEvent{
10✔
4071
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4072
                                ID:   nuid.Next(),
10✔
4073
                                Time: end,
10✔
4074
                        },
10✔
4075
                        Stream: mset.name(),
10✔
4076
                        Start:  start,
10✔
4077
                        End:    end,
10✔
4078
                        Client: ci.forAdvisory(),
10✔
4079
                        Domain: s.getOpts().JetStreamDomain,
10✔
4080
                })
10✔
4081

10✔
4082
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4083
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4084
                        mset.jsa.account.Name,
10✔
4085
                        mset.name(),
10✔
4086
                        end.Sub(start))
10✔
4087
        }()
4088
}
4089

4090
// Default chunk size for now.
4091
const defaultSnapshotChunkSize = 128 * 1024
4092
const defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MB
4093

4094
// streamSnapshot will stream out our snapshot to the reply subject.
4095
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4096
        chunkSize := req.ChunkSize
10✔
4097
        if chunkSize == 0 {
12✔
4098
                chunkSize = defaultSnapshotChunkSize
2✔
4099
        }
2✔
4100
        // Setup for the chunk stream.
4101
        reply := req.DeliverSubject
10✔
4102
        r := sr.Reader
10✔
4103
        defer r.Close()
10✔
4104

10✔
4105
        // Check interest for the snapshot deliver subject.
10✔
4106
        inch := make(chan bool, 1)
10✔
4107
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4108
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4109
        hasInterest := <-inch
10✔
4110
        if !hasInterest {
15✔
4111
                // Allow 2 seconds or so for interest to show up.
5✔
4112
                select {
5✔
4113
                case <-inch:
4✔
4114
                case <-time.After(2 * time.Second):
1✔
4115
                }
4116
        }
4117

4118
        // Create our ack flow handler.
4119
        // This is very simple for now.
4120
        ackSize := defaultSnapshotWindowSize / chunkSize
10✔
4121
        if ackSize < 8 {
10✔
4122
                ackSize = 8
×
4123
        } else if ackSize > 8*1024 {
16✔
4124
                ackSize = 8 * 1024
6✔
4125
        }
6✔
4126
        acks := make(chan struct{}, ackSize)
10✔
4127
        acks <- struct{}{}
10✔
4128

10✔
4129
        // Track bytes outstanding.
10✔
4130
        var out int32
10✔
4131

10✔
4132
        // We will place sequence number and size of chunk sent in the reply.
10✔
4133
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4134
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
30✔
4135
                cs, _ := strconv.Atoi(tokenAt(subject, 6))
20✔
4136
                // This is very crude and simple, but ok for now.
20✔
4137
                // This only matters when sending multiple chunks.
20✔
4138
                if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
40✔
4139
                        select {
20✔
4140
                        case acks <- struct{}{}:
20✔
4141
                        default:
×
4142
                        }
4143
                }
4144
        })
4145
        defer mset.unsubscribe(ackSub)
10✔
4146

10✔
4147
        // TODO(dlc) - Add in NATS-Chunked-Sequence header
10✔
4148
        var hdr []byte
10✔
4149
        for index := 1; ; index++ {
109✔
4150
                chunk := make([]byte, chunkSize)
99✔
4151
                n, err := r.Read(chunk)
99✔
4152
                chunk = chunk[:n]
99✔
4153
                if err != nil {
109✔
4154
                        if n > 0 {
10✔
4155
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
×
4156
                        }
×
4157
                        break
10✔
4158
                }
4159

4160
                // Wait on acks for flow control if past our window size.
4161
                // Wait up to 10ms for now if no acks received.
4162
                if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
89✔
4163
                        select {
×
4164
                        case <-acks:
×
4165
                                // ok to proceed.
4166
                        case <-inch:
×
4167
                                // Lost interest
×
4168
                                hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4169
                                goto done
×
4170
                        case <-time.After(2 * time.Second):
×
4171
                                hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4172
                                goto done
×
4173
                        }
4174
                }
4175
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
89✔
4176
                if hdr == nil {
99✔
4177
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
10✔
4178
                }
10✔
4179
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
89✔
4180
                atomic.AddInt32(&out, int32(len(chunk)))
89✔
4181
        }
4182

4183
        if err := <-sr.errCh; err != _EMPTY_ {
10✔
4184
                hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4185
        }
×
4186

4187
done:
4188
        // Send last EOF
4189
        // TODO(dlc) - place hash in header
4190
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4191
}
4192

4193
// For determining consumer request type.
4194
type ccReqType uint8
4195

4196
const (
4197
        ccNew = iota
4198
        ccLegacyEphemeral
4199
        ccLegacyDurable
4200
)
4201

4202
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4203
// filtered subjects can be at the tail end.
4204
// Assumes stream and consumer names are single tokens.
4205
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
10,938✔
4206
        if c == nil || !s.JetStreamEnabled() {
10,938✔
4207
                return
×
4208
        }
×
4209

4210
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
10,938✔
4211
        if err != nil {
10,939✔
4212
                s.Warnf(badAPIRequestT, msg)
1✔
4213
                return
1✔
4214
        }
1✔
4215

4216
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
10,937✔
4217
        if errorOnRequiredApiLevel(hdr) {
10,940✔
4218
                resp.Error = NewJSRequiredApiLevelError()
3✔
4219
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4220
                return
3✔
4221
        }
3✔
4222

4223
        var req CreateConsumerRequest
10,934✔
4224
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10,935✔
4225
                resp.Error = NewJSInvalidJSONError(err)
1✔
4226
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4227
                return
1✔
4228
        }
1✔
4229

4230
        var js *jetStream
10,933✔
4231
        isClustered := s.JetStreamIsClustered()
10,933✔
4232

10,933✔
4233
        // Determine if we should proceed here when we are in clustered mode.
10,933✔
4234
        if isClustered {
20,871✔
4235
                if req.Config.Direct {
10,357✔
4236
                        // Check to see if we have this stream and are the stream leader.
419✔
4237
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
740✔
4238
                                return
321✔
4239
                        }
321✔
4240
                } else {
9,519✔
4241
                        var cc *jetStreamCluster
9,519✔
4242
                        js, cc = s.getJetStreamCluster()
9,519✔
4243
                        if js == nil || cc == nil {
9,519✔
4244
                                return
×
4245
                        }
×
4246
                        if js.isLeaderless() {
9,519✔
4247
                                resp.Error = NewJSClusterNotAvailError()
×
4248
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4249
                                return
×
4250
                        }
×
4251
                        // Make sure we are meta leader.
4252
                        if !s.JetStreamIsLeader() {
15,952✔
4253
                                return
6,433✔
4254
                        }
6,433✔
4255
                }
4256
        }
4257

4258
        var streamName, consumerName, filteredSubject string
4,179✔
4259
        var rt ccReqType
4,179✔
4260

4,179✔
4261
        if n := numTokens(subject); n < 5 {
4,179✔
4262
                s.Warnf(badAPIRequestT, msg)
×
4263
                return
×
4264
        } else if n == 5 {
4,976✔
4265
                // Legacy ephemeral.
797✔
4266
                rt = ccLegacyEphemeral
797✔
4267
                streamName = streamNameFromSubject(subject)
797✔
4268
        } else {
4,179✔
4269
                // New style and durable legacy.
3,382✔
4270
                if tokenAt(subject, 4) == "DURABLE" {
3,641✔
4271
                        rt = ccLegacyDurable
259✔
4272
                        if n != 7 {
259✔
4273
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4274
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4275
                                return
×
4276
                        }
×
4277
                        streamName = tokenAt(subject, 6)
259✔
4278
                        consumerName = tokenAt(subject, 7)
259✔
4279
                } else {
3,123✔
4280
                        streamName = streamNameFromSubject(subject)
3,123✔
4281
                        consumerName = consumerNameFromSubject(subject)
3,123✔
4282
                        // New has optional filtered subject as part of main subject..
3,123✔
4283
                        if n > 6 {
5,774✔
4284
                                tokens := strings.Split(subject, tsep)
2,651✔
4285
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,651✔
4286
                        }
2,651✔
4287
                }
4288
        }
4289

4290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,182✔
4291
                if doErr {
4✔
4292
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4293
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4294
                }
1✔
4295
                return
3✔
4296
        }
4297

4298
        if streamName != req.Stream {
4,177✔
4299
                resp.Error = NewJSStreamMismatchError()
1✔
4300
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4301
                return
1✔
4302
        }
1✔
4303

4304
        if consumerName != _EMPTY_ {
7,555✔
4305
                // Check for path like separators in the name.
3,380✔
4306
                if strings.ContainsAny(consumerName, `\/`) {
3,384✔
4307
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4308
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4309
                        return
4✔
4310
                }
4✔
4311
        }
4312

4313
        // Should we expect a durable name
4314
        if rt == ccLegacyDurable {
4,429✔
4315
                if numTokens(subject) < 7 {
258✔
4316
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4317
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4318
                        return
×
4319
                }
×
4320
                // Now check on requirements for durable request.
4321
                if req.Config.Durable == _EMPTY_ {
259✔
4322
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4323
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4324
                        return
1✔
4325
                }
1✔
4326
                if consumerName != req.Config.Durable {
257✔
4327
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4328
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4329
                        return
×
4330
                }
×
4331
        }
4332
        // If new style and durable set make sure they match.
4333
        if rt == ccNew {
7,288✔
4334
                if req.Config.Durable != _EMPTY_ {
5,730✔
4335
                        if consumerName != req.Config.Durable {
2,612✔
4336
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4337
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4338
                                return
×
4339
                        }
×
4340
                }
4341
                // New style ephemeral so we need to honor the name.
4342
                req.Config.Name = consumerName
3,118✔
4343
        }
4344
        // Check for legacy ephemeral mis-configuration.
4345
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,173✔
4346
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4347
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4348
                return
3✔
4349
        }
3✔
4350

4351
        // in case of multiple filters provided, error if new API is used.
4352
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,168✔
4353
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4354
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4355
                return
1✔
4356
        }
1✔
4357

4358
        // Check for a filter subject.
4359
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,168✔
4360
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4361
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4362
                return
2✔
4363
        }
2✔
4364

4365
        if isClustered && !req.Config.Direct {
7,248✔
4366
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,084✔
4367
                return
3,084✔
4368
        }
3,084✔
4369

4370
        // If we are here we are single server mode.
4371
        if req.Config.Replicas > 1 {
1,080✔
4372
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4373
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4374
                return
×
4375
        }
×
4376

4377
        stream, err := acc.lookupStream(req.Stream)
1,080✔
4378
        if err != nil {
1,084✔
4379
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4380
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4381
                return
4✔
4382
        }
4✔
4383
        if stream.offlineReason != _EMPTY_ {
1,076✔
4384
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4385
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4386
                return
×
4387
        }
×
4388

4389
        if o := stream.lookupConsumer(consumerName); o != nil {
1,124✔
4390
                if o.offlineReason != _EMPTY_ {
48✔
4391
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4392
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4393
                        return
×
4394
                }
×
4395
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4396
                // it back to whatever the current configured value is.
4397
                o.mu.RLock()
48✔
4398
                req.Config.PauseUntil = o.cfg.PauseUntil
48✔
4399
                o.mu.RUnlock()
48✔
4400
        }
4401

4402
        // Initialize/update asset version metadata.
4403
        setStaticConsumerMetadata(&req.Config)
1,076✔
4404

1,076✔
4405
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,076✔
4406

1,076✔
4407
        if err != nil {
1,129✔
4408
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
53✔
4409
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4410
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4411
                        err = errConsumerStoreFailed
×
4412
                }
×
4413
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
53✔
4414
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
53✔
4415
                return
53✔
4416
        }
4417
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
1,023✔
4418
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,023✔
4419

1,023✔
4420
        o.mu.RLock()
1,023✔
4421
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
1,027✔
4422
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4423
        }
4✔
4424
        o.mu.RUnlock()
1,023✔
4425
}
4426

4427
// Request for the list of all consumer names.
4428
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
35✔
4429
        if c == nil || !s.JetStreamEnabled() {
35✔
4430
                return
×
4431
        }
×
4432
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
35✔
4433
        if err != nil {
35✔
4434
                s.Warnf(badAPIRequestT, msg)
×
4435
                return
×
4436
        }
×
4437

4438
        var resp = JSApiConsumerNamesResponse{
35✔
4439
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
35✔
4440
                Consumers:   []string{},
35✔
4441
        }
35✔
4442
        if errorOnRequiredApiLevel(hdr) {
36✔
4443
                resp.Error = NewJSRequiredApiLevelError()
1✔
4444
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4445
                return
1✔
4446
        }
1✔
4447

4448
        // Determine if we should proceed here when we are in clustered mode.
4449
        if s.JetStreamIsClustered() {
64✔
4450
                js, cc := s.getJetStreamCluster()
30✔
4451
                if js == nil || cc == nil {
30✔
4452
                        return
×
4453
                }
×
4454
                if js.isLeaderless() {
30✔
4455
                        resp.Error = NewJSClusterNotAvailError()
×
4456
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4457
                        return
×
4458
                }
×
4459
                // Make sure we are meta leader.
4460
                if !s.JetStreamIsLeader() {
50✔
4461
                        return
20✔
4462
                }
20✔
4463
        }
4464

4465
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
14✔
4466
                if doErr {
×
4467
                        resp.Error = NewJSNotEnabledForAccountError()
×
4468
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4469
                }
×
4470
                return
×
4471
        }
4472

4473
        var offset int
14✔
4474
        if isJSONObjectOrArray(msg) {
22✔
4475
                var req JSApiConsumersRequest
8✔
4476
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
8✔
4477
                        resp.Error = NewJSInvalidJSONError(err)
×
4478
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4479
                        return
×
4480
                }
×
4481
                offset = req.Offset
8✔
4482
        }
4483

4484
        streamName := streamNameFromSubject(subject)
14✔
4485
        var numConsumers int
14✔
4486

14✔
4487
        if s.JetStreamIsClustered() {
24✔
4488
                js, cc := s.getJetStreamCluster()
10✔
4489
                if js == nil || cc == nil {
10✔
4490
                        // TODO(dlc) - Debug or Warn?
×
4491
                        return
×
4492
                }
×
4493
                js.mu.RLock()
10✔
4494
                sas := cc.streams[acc.Name]
10✔
4495
                if sas == nil {
10✔
4496
                        js.mu.RUnlock()
×
4497
                        resp.Error = NewJSStreamNotFoundError()
×
4498
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4499
                        return
×
4500
                }
×
4501
                sa := sas[streamName]
10✔
4502
                if sa == nil || sa.err != nil {
10✔
4503
                        js.mu.RUnlock()
×
4504
                        resp.Error = NewJSStreamNotFoundError()
×
4505
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4506
                        return
×
4507
                }
×
4508
                for consumer := range sa.consumers {
23✔
4509
                        resp.Consumers = append(resp.Consumers, consumer)
13✔
4510
                }
13✔
4511
                if len(resp.Consumers) > 1 {
14✔
4512
                        slices.Sort(resp.Consumers)
4✔
4513
                }
4✔
4514
                numConsumers = len(resp.Consumers)
10✔
4515
                if offset > numConsumers {
10✔
4516
                        offset = numConsumers
×
4517
                }
×
4518
                resp.Consumers = resp.Consumers[offset:]
10✔
4519
                if len(resp.Consumers) > JSApiNamesLimit {
10✔
4520
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4521
                }
×
4522
                js.mu.RUnlock()
10✔
4523

4524
        } else {
4✔
4525
                mset, err := acc.lookupStream(streamName)
4✔
4526
                if err != nil {
4✔
4527
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4528
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4529
                        return
×
4530
                }
×
4531

4532
                obs := mset.getPublicConsumers()
4✔
4533
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
4✔
4534

4535
                numConsumers = len(obs)
4✔
4536
                if offset > numConsumers {
4✔
4537
                        offset = numConsumers
×
4538
                }
×
4539

4540
                for _, o := range obs[offset:] {
7✔
4541
                        resp.Consumers = append(resp.Consumers, o.String())
3✔
4542
                        if len(resp.Consumers) >= JSApiNamesLimit {
3✔
4543
                                break
×
4544
                        }
4545
                }
4546
        }
4547
        resp.Total = numConsumers
14✔
4548
        resp.Limit = JSApiNamesLimit
14✔
4549
        resp.Offset = offset
14✔
4550
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
4551
}
4552

4553
// Request for the list of all detailed consumer information.
4554
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
90✔
4555
        if c == nil || !s.JetStreamEnabled() {
90✔
4556
                return
×
4557
        }
×
4558

4559
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
90✔
4560
        if err != nil {
90✔
4561
                s.Warnf(badAPIRequestT, msg)
×
4562
                return
×
4563
        }
×
4564

4565
        var resp = JSApiConsumerListResponse{
90✔
4566
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
90✔
4567
                Consumers:   []*ConsumerInfo{},
90✔
4568
        }
90✔
4569
        if errorOnRequiredApiLevel(hdr) {
91✔
4570
                resp.Error = NewJSRequiredApiLevelError()
1✔
4571
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4572
                return
1✔
4573
        }
1✔
4574

4575
        // Determine if we should proceed here when we are in clustered mode.
4576
        if s.JetStreamIsClustered() {
173✔
4577
                js, cc := s.getJetStreamCluster()
84✔
4578
                if js == nil || cc == nil {
84✔
4579
                        return
×
4580
                }
×
4581
                if js.isLeaderless() {
85✔
4582
                        resp.Error = NewJSClusterNotAvailError()
1✔
4583
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4584
                        return
1✔
4585
                }
1✔
4586
                // Make sure we are meta leader.
4587
                if !s.JetStreamIsLeader() {
145✔
4588
                        return
62✔
4589
                }
62✔
4590
        }
4591

4592
        if errorOnRequiredApiLevel(hdr) {
26✔
4593
                resp.Error = NewJSClusterNotAvailError()
×
4594
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4595
                return
×
4596
        }
×
4597

4598
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
26✔
4599
                if doErr {
×
4600
                        resp.Error = NewJSNotEnabledForAccountError()
×
4601
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4602
                }
×
4603
                return
×
4604
        }
4605

4606
        var offset int
26✔
4607
        if isJSONObjectOrArray(msg) {
37✔
4608
                var req JSApiConsumersRequest
11✔
4609
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
4610
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4611
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4612
                        return
1✔
4613
                }
1✔
4614
                offset = req.Offset
10✔
4615
        }
4616

4617
        streamName := streamNameFromSubject(subject)
25✔
4618

25✔
4619
        // Clustered mode will invoke a scatter and gather.
25✔
4620
        if s.JetStreamIsClustered() {
46✔
4621
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
21✔
4622
                msg = copyBytes(msg)
21✔
4623
                s.startGoRoutine(func() {
42✔
4624
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
21✔
4625
                })
21✔
4626
                return
21✔
4627
        }
4628

4629
        mset, err := acc.lookupStream(streamName)
4✔
4630
        if err != nil {
4✔
4631
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4632
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4633
                return
×
4634
        }
×
4635

4636
        obs := mset.getPublicConsumers()
4✔
4637
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
4✔
4638

4639
        ocnt := len(obs)
4✔
4640
        if offset > ocnt {
4✔
4641
                offset = ocnt
×
4642
        }
×
4643

4644
        var missingNames []string
4✔
4645
        for _, o := range obs[offset:] {
7✔
4646
                if o.offlineReason != _EMPTY_ {
4✔
4647
                        if resp.Offline == nil {
2✔
4648
                                resp.Offline = make(map[string]string, 1)
1✔
4649
                        }
1✔
4650
                        resp.Offline[o.name] = o.offlineReason
1✔
4651
                        missingNames = append(missingNames, o.name)
1✔
4652
                        continue
1✔
4653
                }
4654
                if cinfo := o.info(); cinfo != nil {
4✔
4655
                        resp.Consumers = append(resp.Consumers, cinfo)
2✔
4656
                }
2✔
4657
                if len(resp.Consumers) >= JSApiListLimit {
2✔
4658
                        break
×
4659
                }
4660
        }
4661
        resp.Total = ocnt
4✔
4662
        resp.Limit = JSApiListLimit
4✔
4663
        resp.Offset = offset
4✔
4664
        resp.Missing = missingNames
4✔
4665
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
4666
}
4667

4668
// Request for information about an consumer.
4669
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
43,081✔
4670
        if c == nil || !s.JetStreamEnabled() {
43,081✔
4671
                return
×
4672
        }
×
4673
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
43,081✔
4674
        if err != nil {
43,081✔
4675
                s.Warnf(badAPIRequestT, msg)
×
4676
                return
×
4677
        }
×
4678

4679
        streamName := streamNameFromSubject(subject)
43,081✔
4680
        consumerName := consumerNameFromSubject(subject)
43,081✔
4681

43,081✔
4682
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
43,081✔
4683
        if errorOnRequiredApiLevel(hdr) {
43,082✔
4684
                resp.Error = NewJSRequiredApiLevelError()
1✔
4685
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4686
                return
1✔
4687
        }
1✔
4688

4689
        if !isEmptyRequest(msg) {
43,081✔
4690
                resp.Error = NewJSNotEmptyRequestError()
1✔
4691
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4692
                return
1✔
4693
        }
1✔
4694

4695
        // If we are in clustered mode we need to be the consumer leader to proceed.
4696
        if s.JetStreamIsClustered() {
85,664✔
4697
                // Check to make sure the consumer is assigned.
42,585✔
4698
                js, cc := s.getJetStreamCluster()
42,585✔
4699
                if js == nil || cc == nil {
42,585✔
4700
                        return
×
4701
                }
×
4702

4703
                js.mu.RLock()
42,585✔
4704
                meta := cc.meta
42,585✔
4705
                js.mu.RUnlock()
42,585✔
4706

42,585✔
4707
                if meta == nil {
42,585✔
4708
                        return
×
4709
                }
×
4710

4711
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4712
                ourID := meta.ID()
42,585✔
4713
                groupLeaderless := meta.Leaderless()
42,585✔
4714
                groupCreated := meta.Created()
42,585✔
4715

42,585✔
4716
                js.mu.RLock()
42,585✔
4717
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
42,585✔
4718
                var rg *raftGroup
42,585✔
4719
                var offline, isMember bool
42,585✔
4720
                if ca != nil {
47,662✔
4721
                        if rg = ca.Group; rg != nil {
10,154✔
4722
                                offline = s.allPeersOffline(rg)
5,077✔
4723
                                isMember = rg.isMember(ourID)
5,077✔
4724
                        }
5,077✔
4725
                        if ca.unsupported != nil && isMember {
5,095✔
4726
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
4727
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
4728
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
4729
                                js.mu.RUnlock()
18✔
4730
                                return
18✔
4731
                        }
18✔
4732
                }
4733
                // Capture consumer leader here.
4734
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
42,567✔
4735
                // Also capture if we think there is no meta leader.
42,567✔
4736
                var isLeaderLess bool
42,567✔
4737
                if !isLeader {
71,150✔
4738
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
28,583✔
4739
                }
28,583✔
4740
                js.mu.RUnlock()
42,567✔
4741

42,567✔
4742
                if isLeader && ca == nil {
54,981✔
4743
                        // We can't find the consumer, so mimic what would be the errors below.
12,414✔
4744
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,414✔
4745
                                if doErr {
×
4746
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4747
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4748
                                }
×
4749
                                return
×
4750
                        }
4751
                        if sa == nil {
22,417✔
4752
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4753
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4754
                                return
10,003✔
4755
                        }
10,003✔
4756
                        // If we are here the consumer is not present.
4757
                        resp.Error = NewJSConsumerNotFoundError()
2,411✔
4758
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,411✔
4759
                        return
2,411✔
4760
                } else if ca == nil {
55,247✔
4761
                        if isLeaderLess {
25,096✔
4762
                                resp.Error = NewJSClusterNotAvailError()
2✔
4763
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4764
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4765
                        }
2✔
4766
                        return
25,094✔
4767
                } else if isLeader && offline {
5,061✔
4768
                        resp.Error = NewJSConsumerOfflineError()
2✔
4769
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4770
                        return
2✔
4771
                }
2✔
4772

4773
                // Check to see if we are a member of the group and if the group has no leader.
4774
                if isMember && js.isGroupLeaderless(ca.Group) {
5,058✔
4775
                        resp.Error = NewJSClusterNotAvailError()
1✔
4776
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4777
                        return
1✔
4778
                }
1✔
4779

4780
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4781
                if !isConsumerLeader {
8,554✔
4782
                        if isLeaderLess {
3,498✔
4783
                                resp.Error = NewJSClusterNotAvailError()
×
4784
                                // Delaying an error response gives the leader a chance to respond before us
×
4785
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4786
                                return
×
4787
                        }
×
4788

4789
                        var node RaftNode
3,498✔
4790
                        var leaderNotPartOfGroup bool
3,498✔
4791

3,498✔
4792
                        // We have a consumer assignment.
3,498✔
4793
                        if isMember {
6,245✔
4794
                                js.mu.RLock()
2,747✔
4795
                                if rg != nil && rg.node != nil {
5,494✔
4796
                                        node = rg.node
2,747✔
4797
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,747✔
4798
                                                leaderNotPartOfGroup = true
×
4799
                                        }
×
4800
                                }
4801
                                js.mu.RUnlock()
2,747✔
4802
                        }
4803

4804
                        // Check if we should ignore all together.
4805
                        if node == nil {
4,249✔
4806
                                // We have been assigned but have not created a node yet. If we are a member return
751✔
4807
                                // our config and defaults for state and no cluster info.
751✔
4808
                                if isMember {
751✔
4809
                                        // Since we access consumerAssignment, need js lock.
×
4810
                                        js.mu.RLock()
×
4811
                                        resp.ConsumerInfo = &ConsumerInfo{
×
4812
                                                Stream:    ca.Stream,
×
4813
                                                Name:      ca.Name,
×
4814
                                                Created:   ca.Created,
×
4815
                                                Config:    setDynamicConsumerMetadata(ca.Config),
×
4816
                                                TimeStamp: time.Now().UTC(),
×
4817
                                        }
×
4818
                                        b := s.jsonResponse(resp)
×
4819
                                        js.mu.RUnlock()
×
4820
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
×
4821
                                }
×
4822
                                return
751✔
4823
                        }
4824
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
4825
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
5,485✔
4826
                                if leaderNotPartOfGroup {
2,738✔
4827
                                        resp.Error = NewJSConsumerOfflineError()
×
4828
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4829
                                }
×
4830
                                return
2,738✔
4831
                        }
4832
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
4833
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
4834
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
4835
                }
4836
        }
4837

4838
        if !acc.JetStreamEnabled() {
2,061✔
4839
                resp.Error = NewJSNotEnabledForAccountError()
×
4840
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4841
                return
×
4842
        }
×
4843

4844
        mset, err := acc.lookupStream(streamName)
2,061✔
4845
        if err != nil {
2,061✔
4846
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4847
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4848
                return
×
4849
        }
×
4850

4851
        obs := mset.lookupConsumer(consumerName)
2,061✔
4852
        if obs == nil {
2,240✔
4853
                resp.Error = NewJSConsumerNotFoundError()
179✔
4854
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
179✔
4855
                return
179✔
4856
        }
179✔
4857

4858
        if obs.offlineReason != _EMPTY_ {
1,883✔
4859
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
4860
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
4861
                return
1✔
4862
        }
1✔
4863

4864
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,882✔
4865
                // This consumer returned nil which means it's closed. Respond with not found.
1✔
4866
                resp.Error = NewJSConsumerNotFoundError()
1✔
4867
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4868
                return
1✔
4869
        }
1✔
4870
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,880✔
4871
}
4872

4873
// Request to delete an Consumer.
4874
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,380✔
4875
        if c == nil || !s.JetStreamEnabled() {
7,382✔
4876
                return
2✔
4877
        }
2✔
4878
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,378✔
4879
        if err != nil {
7,378✔
4880
                s.Warnf(badAPIRequestT, msg)
×
4881
                return
×
4882
        }
×
4883

4884
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,378✔
4885
        if errorOnRequiredApiLevel(hdr) {
7,379✔
4886
                resp.Error = NewJSRequiredApiLevelError()
1✔
4887
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4888
                return
1✔
4889
        }
1✔
4890

4891
        // Determine if we should proceed here when we are in clustered mode.
4892
        if s.JetStreamIsClustered() {
14,314✔
4893
                js, cc := s.getJetStreamCluster()
6,937✔
4894
                if js == nil || cc == nil {
6,938✔
4895
                        return
1✔
4896
                }
1✔
4897
                if js.isLeaderless() {
6,937✔
4898
                        resp.Error = NewJSClusterNotAvailError()
1✔
4899
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4900
                        return
1✔
4901
                }
1✔
4902
                // Make sure we are meta leader.
4903
                if !s.JetStreamIsLeader() {
11,538✔
4904
                        return
4,603✔
4905
                }
4,603✔
4906
        }
4907

4908
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,837✔
4909
                if doErr {
128✔
4910
                        resp.Error = NewJSNotEnabledForAccountError()
63✔
4911
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
63✔
4912
                }
63✔
4913
                return
65✔
4914
        }
4915
        if !isEmptyRequest(msg) {
2,708✔
4916
                resp.Error = NewJSNotEmptyRequestError()
1✔
4917
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4918
                return
1✔
4919
        }
1✔
4920
        stream := streamNameFromSubject(subject)
2,706✔
4921
        consumer := consumerNameFromSubject(subject)
2,706✔
4922

2,706✔
4923
        if s.JetStreamIsClustered() {
5,038✔
4924
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,332✔
4925
                return
2,332✔
4926
        }
2,332✔
4927

4928
        mset, err := acc.lookupStream(stream)
374✔
4929
        if err != nil {
374✔
4930
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4931
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4932
                return
×
4933
        }
×
4934

4935
        obs := mset.lookupConsumer(consumer)
374✔
4936
        if obs == nil {
530✔
4937
                resp.Error = NewJSConsumerNotFoundError()
156✔
4938
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
156✔
4939
                return
156✔
4940
        }
156✔
4941
        if err := obs.delete(); err != nil {
218✔
4942
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
4943
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4944
                return
×
4945
        }
×
4946
        resp.Success = true
218✔
4947
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
218✔
4948
}
4949

4950
// Request to pause or unpause a Consumer.
4951
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
64✔
4952
        if c == nil || !s.JetStreamEnabled() {
64✔
4953
                return
×
4954
        }
×
4955
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
64✔
4956
        if err != nil {
64✔
4957
                s.Warnf(badAPIRequestT, msg)
×
4958
                return
×
4959
        }
×
4960

4961
        var req JSApiConsumerPauseRequest
64✔
4962
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
64✔
4963
        if errorOnRequiredApiLevel(hdr) {
65✔
4964
                resp.Error = NewJSRequiredApiLevelError()
1✔
4965
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4966
                return
1✔
4967
        }
1✔
4968

4969
        if isJSONObjectOrArray(msg) {
118✔
4970
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
4971
                        resp.Error = NewJSInvalidJSONError(err)
×
4972
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4973
                        return
×
4974
                }
×
4975
        }
4976

4977
        // Determine if we should proceed here when we are in clustered mode.
4978
        isClustered := s.JetStreamIsClustered()
63✔
4979
        js, cc := s.getJetStreamCluster()
63✔
4980
        if isClustered {
117✔
4981
                if js == nil || cc == nil {
54✔
4982
                        return
×
4983
                }
×
4984
                if js.isLeaderless() {
54✔
4985
                        resp.Error = NewJSClusterNotAvailError()
×
4986
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4987
                        return
×
4988
                }
×
4989
                // Make sure we are meta leader.
4990
                if !s.JetStreamIsLeader() {
94✔
4991
                        return
40✔
4992
                }
40✔
4993
        }
4994

4995
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
4996
                if doErr {
×
4997
                        resp.Error = NewJSNotEnabledForAccountError()
×
4998
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4999
                }
×
5000
                return
×
5001
        }
5002

5003
        stream := streamNameFromSubject(subject)
23✔
5004
        consumer := consumerNameFromSubject(subject)
23✔
5005

23✔
5006
        if isClustered {
37✔
5007
                js.mu.RLock()
14✔
5008
                sa := js.streamAssignment(acc.Name, stream)
14✔
5009
                if sa == nil {
14✔
5010
                        js.mu.RUnlock()
×
5011
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5012
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5013
                        return
×
5014
                }
×
5015
                if sa.unsupported != nil {
14✔
5016
                        js.mu.RUnlock()
×
5017
                        // Just let the request time out.
×
5018
                        return
×
5019
                }
×
5020

5021
                ca, ok := sa.consumers[consumer]
14✔
5022
                if !ok || ca == nil {
14✔
5023
                        js.mu.RUnlock()
×
5024
                        resp.Error = NewJSConsumerNotFoundError()
×
5025
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5026
                        return
×
5027
                }
×
5028
                if ca.unsupported != nil {
14✔
5029
                        js.mu.RUnlock()
×
5030
                        // Just let the request time out.
×
5031
                        return
×
5032
                }
×
5033

5034
                nca := *ca
14✔
5035
                ncfg := *ca.Config
14✔
5036
                nca.Config = &ncfg
14✔
5037
                meta := cc.meta
14✔
5038
                js.mu.RUnlock()
14✔
5039
                pauseUTC := req.PauseUntil.UTC()
14✔
5040
                if !pauseUTC.IsZero() {
24✔
5041
                        nca.Config.PauseUntil = &pauseUTC
10✔
5042
                } else {
14✔
5043
                        nca.Config.PauseUntil = nil
4✔
5044
                }
4✔
5045

5046
                // Update asset version metadata due to updating pause/resume.
5047
                // Only PauseUntil is updated above, so reuse config for both.
5048
                setStaticConsumerMetadata(nca.Config)
14✔
5049

14✔
5050
                eca := encodeAddConsumerAssignment(&nca)
14✔
5051
                meta.Propose(eca)
14✔
5052

14✔
5053
                resp.PauseUntil = pauseUTC
14✔
5054
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5055
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5056
                }
10✔
5057
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5058
                return
14✔
5059
        }
5060

5061
        mset, err := acc.lookupStream(stream)
9✔
5062
        if err != nil {
9✔
5063
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5064
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5065
                return
×
5066
        }
×
5067
        if mset.offlineReason != _EMPTY_ {
9✔
5068
                // Just let the request time out.
×
5069
                return
×
5070
        }
×
5071

5072
        obs := mset.lookupConsumer(consumer)
9✔
5073
        if obs == nil {
9✔
5074
                resp.Error = NewJSConsumerNotFoundError()
×
5075
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5076
                return
×
5077
        }
×
5078
        if obs.offlineReason != _EMPTY_ {
9✔
5079
                // Just let the request time out.
×
5080
                return
×
5081
        }
×
5082

5083
        ncfg := obs.cfg
9✔
5084
        pauseUTC := req.PauseUntil.UTC()
9✔
5085
        if !pauseUTC.IsZero() {
14✔
5086
                ncfg.PauseUntil = &pauseUTC
5✔
5087
        } else {
9✔
5088
                ncfg.PauseUntil = nil
4✔
5089
        }
4✔
5090

5091
        // Update asset version metadata due to updating pause/resume.
5092
        setStaticConsumerMetadata(&ncfg)
9✔
5093

9✔
5094
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5095
                // The only type of error that should be returned here is from o.store,
×
5096
                // so use a store failed error type.
×
5097
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5098
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5099
                return
×
5100
        }
×
5101

5102
        resp.PauseUntil = pauseUTC
9✔
5103
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5104
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5105
        }
5✔
5106
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5107
}
5108

5109
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5110
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
65,427✔
5111
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
65,427✔
5112
                TypedEvent: TypedEvent{
65,427✔
5113
                        Type: JSAPIAuditType,
65,427✔
5114
                        ID:   nuid.Next(),
65,427✔
5115
                        Time: time.Now().UTC(),
65,427✔
5116
                },
65,427✔
5117
                Server:   s.Name(),
65,427✔
5118
                Client:   ci.forAdvisory(),
65,427✔
5119
                Subject:  subject,
65,427✔
5120
                Request:  request,
65,427✔
5121
                Response: response,
65,427✔
5122
                Domain:   s.getOpts().JetStreamDomain,
65,427✔
5123
        })
65,427✔
5124
}
65,427✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc