• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 20326380621

17 Dec 2025 03:32PM UTC coverage: 84.522% (-0.05%) from 84.574%
20326380621

push

github

web-flow
NRG: Fix single node election (#7642)

This commit fixes single node election: previously, a single node would
simply store its vote, and never check if it already reached a majority.
So it would never transition to the leader state.

Signed-off-by: Daniele Sciascia <daniele@nats.io>

73985 of 87533 relevant lines covered (84.52%)

339454.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.4
/server/jetstream_api.go
1
// Copyright 2020-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31
        "unicode"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiStreamCreate is the endpoint to create new streams.
51
        // Will return JSON response.
52
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
53
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
54

55
        // JSApiStreamUpdate is the endpoint to update existing streams.
56
        // Will return JSON response.
57
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
58
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
59

60
        // JSApiStreams is the endpoint to list all stream names for this account.
61
        // Will return JSON response.
62
        JSApiStreams = "$JS.API.STREAM.NAMES"
63
        // JSApiStreamList is the endpoint that will return all detailed stream information
64
        JSApiStreamList = "$JS.API.STREAM.LIST"
65

66
        // JSApiStreamInfo is for obtaining general information about a named stream.
67
        // Will return JSON response.
68
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
69
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
70

71
        // JSApiStreamDelete is the endpoint to delete streams.
72
        // Will return JSON response.
73
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
74
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
75

76
        // JSApiStreamPurge is the endpoint to purge streams.
77
        // Will return JSON response.
78
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
79
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
80

81
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
82
        // Will return a stream of chunks with a nil chunk as EOF to
83
        // the deliver subject. Caller should respond to each chunk
84
        // with a nil body response for ack flow.
85
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
86
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
87

88
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
89
        // Caller should respond to each chunk with a nil body response.
90
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
91
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
92

93
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
94
        // Will return JSON response.
95
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
96
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
97

98
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
99
        // Will return JSON response.
100
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
101
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
102

103
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
104
        // Will return the message similar to how a consumer receives the message, no JSON processing.
105
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
106
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
107
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
108

109
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
110
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
111
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
112
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
113

114
        // jsDirectGetPre
115
        jsDirectGetPre = "$JS.API.DIRECT.GET"
116

117
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
118
        // This was also the legacy endpoint for ephemeral consumers.
119
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
120
        // Will return JSON response.
121
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
122
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
123
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
124
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
125

126
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
127
        // You need to include the stream and consumer name in the subject.
128
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
129
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
130

131
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
132
        // Will return JSON response.
133
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
134
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
135

136
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
137
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
138
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
139

140
        // JSApiConsumerInfo is for obtaining general information about a consumer.
141
        // Will return JSON response.
142
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
143
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
144

145
        // JSApiConsumerDelete is the endpoint to delete consumers.
146
        // Will return JSON response.
147
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
148
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
149

150
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
151
        // Will return JSON response.
152
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
153
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
154

155
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157

158
        // JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
159
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
160
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
161

162
        // jsRequestNextPre
163
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
164

165
        // For snapshots and restores. The ack will have additional tokens.
166
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
167
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
168

169
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
170
        // Will return JSON response.
171
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
172
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
173

174
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
175
        // Will return JSON response.
176
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
177
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
178

179
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
180
        // Will return JSON response.
181
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
182
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
183

184
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
185
        // Only works from system account.
186
        // Will return JSON response.
187
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
188

189
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
190
        // Only works from system account.
191
        // Will return JSON response.
192
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
193

194
        // JSApiAccountPurge is the endpoint to purge the js content of an account
195
        // Only works from system account.
196
        // Will return JSON response.
197
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
198
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
199

200
        // JSApiServerStreamMove is the endpoint to move streams off a server
201
        // Only works from system account.
202
        // Will return JSON response.
203
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
204
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
205

206
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
207
        // Only works from system account.
208
        // Will return JSON response.
209
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
210
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
211

212
        // The prefix for system level account API.
213
        jsAPIAccountPre = "$JS.API.ACCOUNT."
214

215
        // jsAckT is the template for the ack message stream coming back from a consumer
216
        // when they ACK/NAK, etc a message.
217
        jsAckT      = "$JS.ACK.%s.%s"
218
        jsAckPre    = "$JS.ACK."
219
        jsAckPreLen = len(jsAckPre)
220

221
        // jsFlowControl is for flow control subjects.
222
        jsFlowControlPre = "$JS.FC."
223
        // jsFlowControl is for FC responses.
224
        jsFlowControl = "$JS.FC.%s.%s.*"
225

226
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
227
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
228

229
        // JSMetricPrefix is a prefix for all JetStream metrics.
230
        JSMetricPrefix = "$JS.EVENT.METRIC"
231

232
        // JSMetricConsumerAckPre is a metric containing ack latency.
233
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
234

235
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
236
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
237

238
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
239
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
240

241
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
242
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
243

244
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
245
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
246

247
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
248
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
249

250
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
251
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
252

253
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
254
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
255

256
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
257
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
258

259
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
260
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
261

262
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
263
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
264

265
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
266
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
267

268
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
269
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
270

271
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
272
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
273

274
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was start.
275
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
276

277
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
278
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
279

280
        // JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
281
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
282

283
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
284
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
285

286
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
287
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
288

289
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
290
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
291

292
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
293
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
294

295
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
296
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
297

298
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
299
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
300

301
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
302
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
303

304
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
305
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
306

307
        // JSAuditAdvisory is a notification about JetStream API access.
308
        // FIXME - Add in details about who..
309
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
310
)
311

312
// Headers used in $JS.API.> requests.
313
const (
314
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
315
        JSRequiredApiLevel = "Nats-Required-Api-Level"
316
)
317

318
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
319
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
320

321
func generateJSMappingTable(domain string) map[string]string {
790✔
322
        mappings := map[string]string{}
790✔
323
        // This set of mappings is very very very ugly.
790✔
324
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
790✔
325
        // For optics $KV and $OBJ where made to be independent subject spaces.
790✔
326
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
790✔
327
        // This is very unfortunate!!!
790✔
328
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
790✔
329
        // Especially since the actual API for say KV, does use stream create from JS.
790✔
330
        // To avoid overlaps KV and OBJ views append the prefix to their API.
790✔
331
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
790✔
332
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
790✔
333
        for srcMappingSuffix, to := range map[string]string{
790✔
334
                "INFO":       JSApiAccountInfo,
790✔
335
                "STREAM.>":   "$JS.API.STREAM.>",
790✔
336
                "CONSUMER.>": "$JS.API.CONSUMER.>",
790✔
337
                "DIRECT.>":   "$JS.API.DIRECT.>",
790✔
338
                "META.>":     "$JS.API.META.>",
790✔
339
                "SERVER.>":   "$JS.API.SERVER.>",
790✔
340
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
790✔
341
                "$KV.>":      "$KV.>",
790✔
342
                "$OBJ.>":     "$OBJ.>",
790✔
343
        } {
7,900✔
344
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
7,110✔
345
        }
7,110✔
346
        return mappings
790✔
347
}
348

349
// JSMaxDescription is the maximum description length for streams and consumers.
350
const JSMaxDescriptionLen = 4 * 1024
351

352
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
353
// It's calculated by summing length of all keys and values.
354
const JSMaxMetadataLen = 128 * 1024
355

356
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
357
// Picked 255 as it seems to be a widely used file name limit
358
const JSMaxNameLen = 255
359

360
// JSDefaultRequestQueueLimit is the default number of entries that we will
361
// put on the global request queue before we react.
362
const JSDefaultRequestQueueLimit = 10_000
363

364
// Responses for API calls.
365

366
// ApiResponse is a standard response from the JetStream JSON API
367
type ApiResponse struct {
368
        Type  string    `json:"type"`
369
        Error *ApiError `json:"error,omitempty"`
370
}
371

372
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
373

374
// When passing back to the clients generalize store failures.
375
var (
376
        errStreamStoreFailed   = errors.New("error creating store for stream")
377
        errConsumerStoreFailed = errors.New("error creating store for consumer")
378
)
379

380
// ToError checks if the response has a error and if it does converts it to an error avoiding
381
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
382
func (r *ApiResponse) ToError() error {
3,492✔
383
        if r.Error == nil {
5,415✔
384
                return nil
1,923✔
385
        }
1,923✔
386

387
        return r.Error
1,569✔
388
}
389

390
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
391

392
// ApiPaged includes variables used to create paged responses from the JSON API
393
type ApiPaged struct {
394
        Total  int `json:"total"`
395
        Offset int `json:"offset"`
396
        Limit  int `json:"limit"`
397
}
398

399
// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
400
type ApiPagedRequest struct {
401
        Offset int `json:"offset"`
402
}
403

404
// JSApiAccountInfoResponse reports back information on jetstream for this account.
405
type JSApiAccountInfoResponse struct {
406
        ApiResponse
407
        *JetStreamAccountStats
408
}
409

410
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
411

412
// JSApiStreamCreateResponse stream creation.
413
type JSApiStreamCreateResponse struct {
414
        ApiResponse
415
        *StreamInfo
416
        DidCreate bool `json:"did_create,omitempty"`
417
}
418

419
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
420

421
// JSApiStreamDeleteResponse stream removal.
422
type JSApiStreamDeleteResponse struct {
423
        ApiResponse
424
        Success bool `json:"success,omitempty"`
425
}
426

427
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
428

429
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
430
const JSMaxSubjectDetails = 100_000
431

432
type JSApiStreamInfoRequest struct {
433
        ApiPagedRequest
434
        DeletedDetails bool   `json:"deleted_details,omitempty"`
435
        SubjectsFilter string `json:"subjects_filter,omitempty"`
436
}
437

438
type JSApiStreamInfoResponse struct {
439
        ApiResponse
440
        ApiPaged
441
        *StreamInfo
442
}
443

444
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
445

446
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
447
// TODO(dlc) - with header or request support could request chunked response.
448
const JSApiNamesLimit = 1024
449
const JSApiListLimit = 256
450

451
type JSApiStreamNamesRequest struct {
452
        ApiPagedRequest
453
        // These are filters that can be applied to the list.
454
        Subject string `json:"subject,omitempty"`
455
}
456

457
// JSApiStreamNamesResponse list of streams.
458
// A nil request is valid and means all streams.
459
type JSApiStreamNamesResponse struct {
460
        ApiResponse
461
        ApiPaged
462
        Streams []string `json:"streams"`
463
}
464

465
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
466

467
type JSApiStreamListRequest struct {
468
        ApiPagedRequest
469
        // These are filters that can be applied to the list.
470
        Subject string `json:"subject,omitempty"`
471
}
472

473
// JSApiStreamListResponse list of detailed stream information.
474
// A nil request is valid and means all streams.
475
type JSApiStreamListResponse struct {
476
        ApiResponse
477
        ApiPaged
478
        Streams []*StreamInfo     `json:"streams"`
479
        Missing []string          `json:"missing,omitempty"`
480
        Offline map[string]string `json:"offline,omitempty"`
481
}
482

483
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
484

485
// JSApiStreamPurgeRequest is optional request information to the purge API.
486
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
487
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
488
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
489
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
490
type JSApiStreamPurgeRequest struct {
491
        // Purge up to but not including sequence.
492
        Sequence uint64 `json:"seq,omitempty"`
493
        // Subject to match against messages for the purge command.
494
        Subject string `json:"filter,omitempty"`
495
        // Number of messages to keep.
496
        Keep uint64 `json:"keep,omitempty"`
497
}
498

499
type JSApiStreamPurgeResponse struct {
500
        ApiResponse
501
        Success bool   `json:"success,omitempty"`
502
        Purged  uint64 `json:"purged"`
503
}
504

505
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
506

507
type JSApiConsumerUnpinRequest struct {
508
        Group string `json:"group"`
509
}
510

511
type JSApiConsumerUnpinResponse struct {
512
        ApiResponse
513
}
514

515
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
516

517
// JSApiStreamUpdateResponse for updating a stream.
518
type JSApiStreamUpdateResponse struct {
519
        ApiResponse
520
        *StreamInfo
521
}
522

523
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
524

525
// JSApiMsgDeleteRequest delete message request.
526
type JSApiMsgDeleteRequest struct {
527
        Seq     uint64 `json:"seq"`
528
        NoErase bool   `json:"no_erase,omitempty"`
529
}
530

531
type JSApiMsgDeleteResponse struct {
532
        ApiResponse
533
        Success bool `json:"success,omitempty"`
534
}
535

536
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
537

538
type JSApiStreamSnapshotRequest struct {
539
        // Subject to deliver the chunks to for the snapshot.
540
        DeliverSubject string `json:"deliver_subject"`
541
        // Do not include consumers in the snapshot.
542
        NoConsumers bool `json:"no_consumers,omitempty"`
543
        // Optional chunk size preference.
544
        // Best to just let server select.
545
        ChunkSize int `json:"chunk_size,omitempty"`
546
        // Check all message's checksums prior to snapshot.
547
        CheckMsgs bool `json:"jsck,omitempty"`
548
}
549

550
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
551
type JSApiStreamSnapshotResponse struct {
552
        ApiResponse
553
        // Configuration of the given stream.
554
        Config *StreamConfig `json:"config,omitempty"`
555
        // Current State for the given stream.
556
        State *StreamState `json:"state,omitempty"`
557
}
558

559
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
560

561
// JSApiStreamRestoreRequest is the required restore request.
562
type JSApiStreamRestoreRequest struct {
563
        // Configuration of the given stream.
564
        Config StreamConfig `json:"config"`
565
        // Current State for the given stream.
566
        State StreamState `json:"state"`
567
}
568

569
// JSApiStreamRestoreResponse is the direct response to the restore request.
570
type JSApiStreamRestoreResponse struct {
571
        ApiResponse
572
        // Subject to deliver the chunks to for the snapshot restore.
573
        DeliverSubject string `json:"deliver_subject"`
574
}
575

576
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
577

578
// JSApiStreamRemovePeerRequest is the required remove peer request.
579
type JSApiStreamRemovePeerRequest struct {
580
        // Server name of the peer to be removed.
581
        Peer string `json:"peer"`
582
}
583

584
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
585
type JSApiStreamRemovePeerResponse struct {
586
        ApiResponse
587
        Success bool `json:"success,omitempty"`
588
}
589

590
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
591

592
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
593
type JSApiStreamLeaderStepDownResponse struct {
594
        ApiResponse
595
        Success bool `json:"success,omitempty"`
596
}
597

598
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
599

600
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
601
type JSApiConsumerLeaderStepDownResponse struct {
602
        ApiResponse
603
        Success bool `json:"success,omitempty"`
604
}
605

606
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
607

608
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
609
type JSApiLeaderStepdownRequest struct {
610
        Placement *Placement `json:"placement,omitempty"`
611
}
612

613
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
614
type JSApiLeaderStepDownResponse struct {
615
        ApiResponse
616
        Success bool `json:"success,omitempty"`
617
}
618

619
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
620

621
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
622
type JSApiMetaServerRemoveRequest struct {
623
        // Server name of the peer to be removed.
624
        Server string `json:"peer"`
625
        // Peer ID of the peer to be removed. If specified this is used
626
        // instead of the server name.
627
        Peer string `json:"peer_id,omitempty"`
628
}
629

630
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
631
type JSApiMetaServerRemoveResponse struct {
632
        ApiResponse
633
        Success bool `json:"success,omitempty"`
634
}
635

636
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
637

638
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
639
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
640
type JSApiMetaServerStreamMoveRequest struct {
641
        // Server name of the peer to be evacuated.
642
        Server string `json:"server,omitempty"`
643
        // Cluster the server is in
644
        Cluster string `json:"cluster,omitempty"`
645
        // Domain the sever is in
646
        Domain string `json:"domain,omitempty"`
647
        // Ephemeral placement tags for the move
648
        Tags []string `json:"tags,omitempty"`
649
}
650

651
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
652

653
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
654
type JSApiAccountPurgeResponse struct {
655
        ApiResponse
656
        Initiated bool `json:"initiated,omitempty"`
657
}
658

659
// JSApiMsgGetRequest get a message request.
660
type JSApiMsgGetRequest struct {
661
        Seq     uint64 `json:"seq,omitempty"`
662
        LastFor string `json:"last_by_subj,omitempty"`
663
        NextFor string `json:"next_by_subj,omitempty"`
664

665
        // Batch support. Used to request more than one msg at a time.
666
        // Can be used with simple starting seq, but also NextFor with wildcards.
667
        Batch int `json:"batch,omitempty"`
668
        // This will make sure we limit how much data we blast out. If not set we will
669
        // inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
670
        MaxBytes int `json:"max_bytes,omitempty"`
671
        // Return messages as of this start time.
672
        StartTime *time.Time `json:"start_time,omitempty"`
673

674
        // Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
675
        MultiLastFor []string `json:"multi_last,omitempty"`
676
        // Only return messages up to this sequence. If not set, will be last sequence for the stream.
677
        UpToSeq uint64 `json:"up_to_seq,omitempty"`
678
        // Only return messages up to this time.
679
        UpToTime *time.Time `json:"up_to_time,omitempty"`
680
        // Only return the message payload, excluding headers if present.
681
        NoHeaders bool `json:"no_hdr,omitempty"`
682
}
683

684
type JSApiMsgGetResponse struct {
685
        ApiResponse
686
        Message *StoredMsg `json:"message,omitempty"`
687
}
688

689
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
690

691
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
692
const JSWaitQueueDefaultMax = 512
693

694
type JSApiConsumerCreateResponse struct {
695
        ApiResponse
696
        *ConsumerInfo
697
}
698

699
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
700

701
type JSApiConsumerDeleteResponse struct {
702
        ApiResponse
703
        Success bool `json:"success,omitempty"`
704
}
705

706
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
707

708
type JSApiConsumerPauseRequest struct {
709
        PauseUntil time.Time `json:"pause_until,omitempty"`
710
}
711

712
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
713

714
type JSApiConsumerPauseResponse struct {
715
        ApiResponse
716
        Paused         bool          `json:"paused"`
717
        PauseUntil     time.Time     `json:"pause_until"`
718
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
719
}
720

721
type JSApiConsumerInfoResponse struct {
722
        ApiResponse
723
        *ConsumerInfo
724
}
725

726
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
727

728
type JSApiConsumersRequest struct {
729
        ApiPagedRequest
730
}
731

732
type JSApiConsumerNamesResponse struct {
733
        ApiResponse
734
        ApiPaged
735
        Consumers []string `json:"consumers"`
736
}
737

738
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
739

740
type JSApiConsumerListResponse struct {
741
        ApiResponse
742
        ApiPaged
743
        Consumers []*ConsumerInfo   `json:"consumers"`
744
        Missing   []string          `json:"missing,omitempty"`
745
        Offline   map[string]string `json:"offline,omitempty"`
746
}
747

748
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
749

750
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
751
type JSApiConsumerGetNextRequest struct {
752
        Expires   time.Duration `json:"expires,omitempty"`
753
        Batch     int           `json:"batch,omitempty"`
754
        MaxBytes  int           `json:"max_bytes,omitempty"`
755
        NoWait    bool          `json:"no_wait,omitempty"`
756
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
757
        PriorityGroup
758
}
759

760
// Structure that holds state for a JetStream API request that is processed
761
// in a separate long-lived go routine. This is to avoid blocking connections.
762
type jsAPIRoutedReq struct {
763
        jsub    *subscription
764
        sub     *subscription
765
        acc     *Account
766
        subject string
767
        reply   string
768
        msg     []byte
769
        pa      pubArg
770
}
771

772
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
138,307✔
773
        // Ignore system level directives meta stepdown and peer remove requests here.
138,307✔
774
        if subject == JSApiLeaderStepDown ||
138,307✔
775
                subject == JSApiRemoveServer ||
138,307✔
776
                strings.HasPrefix(subject, jsAPIAccountPre) {
138,786✔
777
                return
479✔
778
        }
479✔
779
        // No lock needed, those are immutable.
780
        s, rr := js.srv, js.apiSubs.Match(subject)
137,828✔
781

137,828✔
782
        hdr, msg := c.msgParts(rmsg)
137,828✔
783
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
137,835✔
784
                // Check if this is the system account. We will let these through for the account info only.
7✔
785
                sacc := s.SystemAccount()
7✔
786
                if sacc != acc {
7✔
787
                        return
×
788
                }
×
789
                if subject != JSApiAccountInfo {
11✔
790
                        // Only respond from the initial server entry to the NATS system.
4✔
791
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
792
                                var resp = ApiResponse{
2✔
793
                                        Type:  JSApiSystemResponseType,
2✔
794
                                        Error: NewJSNotEnabledForAccountError(),
2✔
795
                                }
2✔
796
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
797
                        }
2✔
798
                        return
4✔
799
                }
800
        }
801

802
        // Short circuit for no interest.
803
        if len(rr.psubs)+len(rr.qsubs) == 0 {
157,808✔
804
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
19,984✔
805
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
806
                        var resp = ApiResponse{
×
807
                                Type:  JSApiSystemResponseType,
×
808
                                Error: NewJSBadRequestError(),
×
809
                        }
×
810
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
811
                }
×
812
                return
19,984✔
813
        }
814

815
        // We should only have psubs and only 1 per result.
816
        if len(rr.psubs) != 1 {
117,840✔
817
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
818
                if c.kind == CLIENT || c.kind == LEAF {
×
819
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
820
                        var resp = ApiResponse{
×
821
                                Type:  JSApiSystemResponseType,
×
822
                                Error: NewJSBadRequestError(),
×
823
                        }
×
824
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
825
                }
×
826
                return
×
827
        }
828
        jsub := rr.psubs[0]
117,840✔
829

117,840✔
830
        // We need to make sure not to block. We will send the request to a long-lived
117,840✔
831
        // pool of go routines.
117,840✔
832

117,840✔
833
        // Increment inflight. Do this before queueing.
117,840✔
834
        atomic.AddInt64(&js.apiInflight, 1)
117,840✔
835

117,840✔
836
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
117,840✔
837
        // header from the msg body. No other references are needed.
117,840✔
838
        // Check pending and warn if getting backed up.
117,840✔
839
        pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
117,840✔
840
        limit := atomic.LoadInt64(&js.queueLimit)
117,840✔
841
        if pending >= int(limit) {
118,130✔
842
                s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
290✔
843
                drained := int64(s.jsAPIRoutedReqs.drain())
290✔
844
                atomic.AddInt64(&js.apiInflight, -drained)
290✔
845

290✔
846
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
290✔
847
                        TypedEvent: TypedEvent{
290✔
848
                                Type: JSAPILimitReachedAdvisoryType,
290✔
849
                                ID:   nuid.Next(),
290✔
850
                                Time: time.Now().UTC(),
290✔
851
                        },
290✔
852
                        Server:  s.Name(),
290✔
853
                        Domain:  js.config.Domain,
290✔
854
                        Dropped: drained,
290✔
855
                })
290✔
856
        }
290✔
857
}
858

859
func (s *Server) processJSAPIRoutedRequests() {
16,792✔
860
        defer s.grWG.Done()
16,792✔
861

16,792✔
862
        s.mu.RLock()
16,792✔
863
        queue := s.jsAPIRoutedReqs
16,792✔
864
        client := &client{srv: s, kind: JETSTREAM}
16,792✔
865
        s.mu.RUnlock()
16,792✔
866

16,792✔
867
        js := s.getJetStream()
16,792✔
868

16,792✔
869
        for {
112,352✔
870
                select {
95,560✔
871
                case <-queue.ch:
78,768✔
872
                        // Only pop one item at a time here, otherwise if the system is recovering
78,768✔
873
                        // from queue buildup, then one worker will pull off all the tasks and the
78,768✔
874
                        // others will be starved of work.
78,768✔
875
                        for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
196,321✔
876
                                client.pa = r.pa
117,553✔
877
                                start := time.Now()
117,553✔
878
                                r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
117,553✔
879
                                if dur := time.Since(start); dur >= readLoopReportThreshold {
117,558✔
880
                                        s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
5✔
881
                                }
5✔
882
                                atomic.AddInt64(&js.apiInflight, -1)
117,553✔
883
                        }
884
                case <-s.quitCh:
16,776✔
885
                        return
16,776✔
886
                }
887
        }
888
}
889

890
func (s *Server) setJetStreamExportSubs() error {
4,198✔
891
        js := s.getJetStream()
4,198✔
892
        if js == nil {
4,198✔
893
                return NewJSNotEnabledError()
×
894
        }
×
895

896
        // Start the go routine that will process API requests received by the
897
        // subscription below when they are coming from routes, etc..
898
        const maxProcs = 16
4,198✔
899
        mp := runtime.GOMAXPROCS(0)
4,198✔
900
        // Cap at 16 max for now on larger core setups.
4,198✔
901
        if mp > maxProcs {
4,198✔
902
                mp = maxProcs
×
903
        }
×
904
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
4,198✔
905
        for i := 0; i < mp; i++ {
20,990✔
906
                s.startGoRoutine(s.processJSAPIRoutedRequests)
16,792✔
907
        }
16,792✔
908

909
        // This is the catch all now for all JetStream API calls.
910
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,198✔
911
                return err
×
912
        }
×
913

914
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,198✔
915
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
916
                return err
×
917
        }
×
918

919
        // API handles themselves.
920
        pairs := []struct {
4,198✔
921
                subject string
4,198✔
922
                handler msgHandler
4,198✔
923
        }{
4,198✔
924
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,198✔
925
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,198✔
926
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,198✔
927
                {JSApiStreams, s.jsStreamNamesRequest},
4,198✔
928
                {JSApiStreamList, s.jsStreamListRequest},
4,198✔
929
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,198✔
930
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,198✔
931
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,198✔
932
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,198✔
933
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,198✔
934
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,198✔
935
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,198✔
936
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,198✔
937
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,198✔
938
                {JSApiMsgGet, s.jsMsgGetRequest},
4,198✔
939
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,198✔
940
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,198✔
941
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,198✔
942
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,198✔
943
                {JSApiConsumerList, s.jsConsumerListRequest},
4,198✔
944
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,198✔
945
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,198✔
946
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,198✔
947
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,198✔
948
        }
4,198✔
949

4,198✔
950
        js.mu.Lock()
4,198✔
951
        defer js.mu.Unlock()
4,198✔
952

4,198✔
953
        for _, p := range pairs {
104,950✔
954
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
100,752✔
955
                if err := js.apiSubs.Insert(sub); err != nil {
100,752✔
956
                        return err
×
957
                }
×
958
        }
959

960
        return nil
4,198✔
961
}
962

963
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
38,907✔
964
        acc.trackAPI()
38,907✔
965
        if reply != _EMPTY_ {
70,061✔
966
                s.sendInternalAccountMsg(nil, reply, response)
31,154✔
967
        }
31,154✔
968
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
38,907✔
969
}
970

971
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
28,611✔
972
        acc.trackAPIErr()
28,611✔
973
        if reply != _EMPTY_ {
43,594✔
974
                s.sendInternalAccountMsg(nil, reply, response)
14,983✔
975
        }
14,983✔
976
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
28,611✔
977
}
978

979
const errRespDelay = 500 * time.Millisecond
980

981
type delayedAPIResponse struct {
982
        ci       *ClientInfo
983
        acc      *Account
984
        subject  string
985
        reply    string
986
        request  string
987
        hdr      []byte
988
        response string
989
        rg       *raftGroup
990
        deadline time.Time
991
        noJs     bool
992
        next     *delayedAPIResponse
993
}
994

995
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
996
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
112✔
997
        // Check if list empty.
112✔
998
        if *head == nil {
210✔
999
                *head, *tail = r, r
98✔
1000
                return
98✔
1001
        }
98✔
1002
        // Check if it should be added at the end, which is if after or equal to the tail.
1003
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1004
                (*tail).next, *tail = r, r
12✔
1005
                return
12✔
1006
        }
12✔
1007
        // Find its spot in the list.
1008
        var prev *delayedAPIResponse
2✔
1009
        for c := *head; c != nil; c = c.next {
6✔
1010
                // We insert only if we are stricly before the current `c`.
4✔
1011
                if r.deadline.Before(c.deadline) {
6✔
1012
                        r.next = c
2✔
1013
                        if prev != nil {
3✔
1014
                                prev.next = r
1✔
1015
                        } else {
2✔
1016
                                *head = r
1✔
1017
                        }
1✔
1018
                        return
2✔
1019
                }
1020
                prev = c
2✔
1021
        }
1022
}
1023

1024
func (s *Server) delayedAPIResponder() {
6,703✔
1025
        defer s.grWG.Done()
6,703✔
1026
        var (
6,703✔
1027
                head, tail *delayedAPIResponse // Linked list.
6,703✔
1028
                r          *delayedAPIResponse // Updated by calling next().
6,703✔
1029
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
6,703✔
1030
                tm         = time.NewTimer(time.Hour)
6,703✔
1031
        )
6,703✔
1032
        next := func() {
6,904✔
1033
                r, rqch = nil, nil
201✔
1034
                // Check that JetStream is still on. Do not exit the go routine
201✔
1035
                // since JS can be enabled/disabled. The go routine will exit
201✔
1036
                // only if server is shutdown.
201✔
1037
                js := s.getJetStream()
201✔
1038
                if js == nil {
202✔
1039
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1040
                        head, tail = nil, nil
1✔
1041
                        s.delayedAPIResponses.drain()
1✔
1042
                        // Fall back into next "if" that resets timer.
1✔
1043
                }
1✔
1044
                // If there are no delayed messages then delay the timer for
1045
                // a while.
1046
                if head == nil {
299✔
1047
                        tm.Reset(time.Hour)
98✔
1048
                        return
98✔
1049
                }
98✔
1050
                // Get the first expected message and then reset the timer.
1051
                r = head
103✔
1052
                js.mu.RLock()
103✔
1053
                if r.rg != nil && r.rg.node != nil {
104✔
1054
                        // If there's an attached Raft group to the delayed response
1✔
1055
                        // then pull out the quit channel, so that we don't bother
1✔
1056
                        // sending responses for entities which are now no longer
1✔
1057
                        // running.
1✔
1058
                        rqch = r.rg.node.QuitC()
1✔
1059
                }
1✔
1060
                js.mu.RUnlock()
103✔
1061
                tm.Reset(time.Until(r.deadline))
103✔
1062
        }
1063
        pop := func() {
6,805✔
1064
                if head == nil {
102✔
1065
                        return
×
1066
                }
×
1067
                head = head.next
102✔
1068
                if head == nil {
199✔
1069
                        tail = nil
97✔
1070
                }
97✔
1071
        }
1072
        for {
13,620✔
1073
                select {
6,917✔
1074
                case <-s.delayedAPIResponses.ch:
112✔
1075
                        v, ok := s.delayedAPIResponses.popOne()
112✔
1076
                        if !ok {
112✔
1077
                                continue
×
1078
                        }
1079
                        // Add it to the list, and if ends up being the head, set things up.
1080
                        addDelayedResponse(&head, &tail, v)
112✔
1081
                        if v == head {
211✔
1082
                                next()
99✔
1083
                        }
99✔
1084
                case <-s.quitCh:
6,698✔
1085
                        return
6,698✔
1086
                case <-rqch:
1✔
1087
                        // If we were the head, drop and setup things for next.
1✔
1088
                        if r != nil && r == head {
2✔
1089
                                pop()
1✔
1090
                        }
1✔
1091
                        next()
1✔
1092
                case <-tm.C:
101✔
1093
                        if r != nil {
202✔
1094
                                // If it's not a JS API error, send it as a raw response without additional API/audit tracking.
101✔
1095
                                if r.noJs {
137✔
1096
                                        s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
36✔
1097
                                } else {
101✔
1098
                                        s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
65✔
1099
                                }
65✔
1100
                                pop()
101✔
1101
                        }
1102
                        next()
101✔
1103
                }
1104
        }
1105
}
1106

1107
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
76✔
1108
        s.delayedAPIResponses.push(&delayedAPIResponse{
76✔
1109
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
76✔
1110
        })
76✔
1111
}
76✔
1112

1113
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1114
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1115
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1116
        })
36✔
1117
}
36✔
1118

1119
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
108,364✔
1120
        hdr, msg = c.msgParts(raw)
108,364✔
1121
        var ci ClientInfo
108,364✔
1122

108,364✔
1123
        if len(hdr) > 0 {
216,646✔
1124
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
108,282✔
1125
                        return nil, nil, nil, nil, err
×
1126
                }
×
1127
        }
1128

1129
        if ci.Service != _EMPTY_ {
108,426✔
1130
                acc, _ = s.LookupAccount(ci.Service)
62✔
1131
        } else if ci.Account != _EMPTY_ {
216,584✔
1132
                acc, _ = s.LookupAccount(ci.Account)
108,220✔
1133
        } else {
108,302✔
1134
                // Direct $SYS access.
82✔
1135
                acc = c.acc
82✔
1136
                if acc == nil {
86✔
1137
                        acc = s.SystemAccount()
4✔
1138
                }
4✔
1139
        }
1140
        if acc == nil {
108,374✔
1141
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1142
        }
10✔
1143
        return &ci, acc, hdr, msg, nil
108,354✔
1144
}
1145

1146
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
18,542✔
1147
        decoder := json.NewDecoder(bytes.NewReader(msg))
18,542✔
1148
        decoder.DisallowUnknownFields()
18,542✔
1149

18,542✔
1150
        for {
55,620✔
1151
                if err := decoder.Decode(v); err != nil {
55,620✔
1152
                        if err == io.EOF {
37,078✔
1153
                                return nil
18,536✔
1154
                        }
18,536✔
1155

1156
                        var syntaxErr *json.SyntaxError
6✔
1157
                        if errors.As(err, &syntaxErr) {
6✔
1158
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1159
                        }
×
1160

1161
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1162

6✔
1163
                        if s.JetStreamConfig().Strict {
12✔
1164
                                return err
6✔
1165
                        }
6✔
1166

1167
                        return json.Unmarshal(msg, v)
×
1168
                }
1169
        }
1170
}
1171

1172
func (a *Account) trackAPI() {
38,907✔
1173
        a.mu.RLock()
38,907✔
1174
        jsa := a.js
38,907✔
1175
        a.mu.RUnlock()
38,907✔
1176
        if jsa != nil {
77,724✔
1177
                jsa.usageMu.Lock()
38,817✔
1178
                jsa.usageApi++
38,817✔
1179
                jsa.apiTotal++
38,817✔
1180
                jsa.sendClusterUsageUpdate()
38,817✔
1181
                atomic.AddInt64(&jsa.js.apiTotal, 1)
38,817✔
1182
                jsa.usageMu.Unlock()
38,817✔
1183
        }
38,817✔
1184
}
1185

1186
func (a *Account) trackAPIErr() {
28,611✔
1187
        a.mu.RLock()
28,611✔
1188
        jsa := a.js
28,611✔
1189
        a.mu.RUnlock()
28,611✔
1190
        if jsa != nil {
57,000✔
1191
                jsa.usageMu.Lock()
28,389✔
1192
                jsa.usageApi++
28,389✔
1193
                jsa.apiTotal++
28,389✔
1194
                jsa.usageErr++
28,389✔
1195
                jsa.apiErrors++
28,389✔
1196
                jsa.sendClusterUsageUpdate()
28,389✔
1197
                atomic.AddInt64(&jsa.js.apiTotal, 1)
28,389✔
1198
                atomic.AddInt64(&jsa.js.apiErrors, 1)
28,389✔
1199
                jsa.usageMu.Unlock()
28,389✔
1200
        }
28,389✔
1201
}
1202

1203
const badAPIRequestT = "Malformed JetStream API Request: %q"
1204

1205
// Helper function to check on JetStream being enabled but also on status of leafnodes
1206
// If the local account is not enabled but does have leafnode connectivity we will not
1207
// want to error immediately and let the other side decide.
1208
func (a *Account) checkJetStream() (enabled, shouldError bool) {
45,559✔
1209
        a.mu.RLock()
45,559✔
1210
        defer a.mu.RUnlock()
45,559✔
1211
        return a.js != nil, a.nleafs+a.nrleafs == 0
45,559✔
1212
}
45,559✔
1213

1214
// Request for current usage and limits for this account.
1215
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
457✔
1216
        if c == nil || !s.JetStreamEnabled() {
457✔
1217
                return
×
1218
        }
×
1219

1220
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
457✔
1221
        if err != nil {
458✔
1222
                s.Warnf(badAPIRequestT, msg)
1✔
1223
                return
1✔
1224
        }
1✔
1225

1226
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
456✔
1227
        if errorOnRequiredApiLevel(hdr) {
457✔
1228
                resp.Error = NewJSRequiredApiLevelError()
1✔
1229
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1230
                return
1✔
1231
        }
1✔
1232

1233
        // Determine if we should proceed here when we are in clustered mode.
1234
        if s.JetStreamIsClustered() {
836✔
1235
                js, cc := s.getJetStreamCluster()
381✔
1236
                if js == nil || cc == nil {
381✔
1237
                        return
×
1238
                }
×
1239
                if js.isLeaderless() {
382✔
1240
                        resp.Error = NewJSClusterNotAvailError()
1✔
1241
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1242
                        return
1✔
1243
                }
1✔
1244
                // Make sure we are meta leader.
1245
                if !s.JetStreamIsLeader() {
643✔
1246
                        return
263✔
1247
                }
263✔
1248
        }
1249

1250
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
197✔
1251
                if !doErr {
7✔
1252
                        return
1✔
1253
                }
1✔
1254
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1255
        } else {
185✔
1256
                stats := acc.JetStreamUsage()
185✔
1257
                resp.JetStreamAccountStats = &stats
185✔
1258
        }
185✔
1259
        b, err := json.Marshal(resp)
190✔
1260
        if err != nil {
190✔
1261
                return
×
1262
        }
×
1263

1264
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
190✔
1265
}
1266

1267
// Helpers for token extraction.
1268
func streamNameFromSubject(subject string) string {
82,260✔
1269
        return tokenAt(subject, 5)
82,260✔
1270
}
82,260✔
1271

1272
func consumerNameFromSubject(subject string) string {
48,025✔
1273
        return tokenAt(subject, 6)
48,025✔
1274
}
48,025✔
1275

1276
func (s *Server) jsonResponse(v any) string {
68,121✔
1277
        b, err := json.Marshal(v)
68,121✔
1278
        if err != nil {
68,121✔
1279
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1280
                return ""
×
1281
        }
×
1282
        return string(b)
68,121✔
1283
}
1284

1285
// Read lock must be held
1286
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,569✔
1287
        reservation := int64(0)
3,569✔
1288
        if tier == _EMPTY_ {
7,116✔
1289
                for _, sa := range jsa.streams {
12,108✔
1290
                        if sa.cfg.MaxBytes > 0 {
8,588✔
1291
                                if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name {
27✔
1292
                                        reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
×
1293
                                }
×
1294
                        }
1295
                }
1296
        } else {
22✔
1297
                for _, sa := range jsa.streams {
37✔
1298
                        if sa.cfg.Replicas == cfg.Replicas {
29✔
1299
                                if sa.cfg.MaxBytes > 0 {
19✔
1300
                                        if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name {
10✔
1301
                                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
5✔
1302
                                        }
5✔
1303
                                }
1304
                        }
1305
                }
1306
        }
1307
        return reservation
3,569✔
1308
}
1309

1310
// Request to create a stream.
1311
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
11,210✔
1312
        if c == nil || !s.JetStreamEnabled() {
11,478✔
1313
                return
268✔
1314
        }
268✔
1315
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
10,942✔
1316
        if err != nil {
10,945✔
1317
                s.Warnf(badAPIRequestT, msg)
3✔
1318
                return
3✔
1319
        }
3✔
1320

1321
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
10,939✔
1322
        if errorOnRequiredApiLevel(hdr) {
10,940✔
1323
                resp.Error = NewJSRequiredApiLevelError()
1✔
1324
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1325
                return
1✔
1326
        }
1✔
1327

1328
        // Determine if we should proceed here when we are in clustered mode.
1329
        if s.JetStreamIsClustered() {
20,525✔
1330
                js, cc := s.getJetStreamCluster()
9,587✔
1331
                if js == nil || cc == nil {
9,587✔
1332
                        return
×
1333
                }
×
1334
                if js.isLeaderless() {
9,588✔
1335
                        resp.Error = NewJSClusterNotAvailError()
1✔
1336
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1337
                        return
1✔
1338
                }
1✔
1339
                // Make sure we are meta leader.
1340
                if !s.JetStreamIsLeader() {
16,164✔
1341
                        return
6,578✔
1342
                }
6,578✔
1343
        }
1344

1345
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,367✔
1346
                if doErr {
8✔
1347
                        resp.Error = NewJSNotEnabledForAccountError()
×
1348
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1349
                }
×
1350
                return
8✔
1351
        }
1352

1353
        var cfg StreamConfigRequest
4,351✔
1354
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
4,352✔
1355
                resp.Error = NewJSInvalidJSONError(err)
1✔
1356
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1357
                return
1✔
1358
        }
1✔
1359

1360
        // Initialize asset version metadata.
1361
        setStaticStreamMetadata(&cfg.StreamConfig)
4,350✔
1362

4,350✔
1363
        streamName := streamNameFromSubject(subject)
4,350✔
1364
        if streamName != cfg.Name {
4,351✔
1365
                resp.Error = NewJSStreamMismatchError()
1✔
1366
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1367
                return
1✔
1368
        }
1✔
1369

1370
        // Check for path like separators in the name.
1371
        if strings.ContainsAny(streamName, `\/`) {
4,351✔
1372
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1373
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1374
                return
2✔
1375
        }
2✔
1376

1377
        // Can't create a stream with a sealed state.
1378
        if cfg.Sealed {
4,349✔
1379
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1380
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1381
                return
2✔
1382
        }
2✔
1383

1384
        // If we are told to do mirror direct but are not mirroring, error.
1385
        if cfg.MirrorDirect && cfg.Mirror == nil {
4,345✔
1386
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1387
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1388
                return
×
1389
        }
×
1390

1391
        // Hand off to cluster for processing.
1392
        if s.JetStreamIsClustered() {
7,351✔
1393
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
3,006✔
1394
                return
3,006✔
1395
        }
3,006✔
1396

1397
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,341✔
1398
                resp.Error = err
2✔
1399
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1400
                return
2✔
1401
        }
2✔
1402

1403
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,337✔
1404
        if err != nil {
1,388✔
1405
                if IsNatsErr(err, JSStreamStoreFailedF) {
51✔
1406
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1407
                        err = errStreamStoreFailed
×
1408
                }
×
1409
                resp.Error = NewJSStreamCreateError(err, Unless(err))
51✔
1410
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
51✔
1411
                return
51✔
1412
        }
1413
        msetCfg := mset.config()
1,286✔
1414
        resp.StreamInfo = &StreamInfo{
1,286✔
1415
                Created:   mset.createdTime(),
1,286✔
1416
                State:     mset.state(),
1,286✔
1417
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,286✔
1418
                TimeStamp: time.Now().UTC(),
1,286✔
1419
                Mirror:    mset.mirrorInfo(),
1,286✔
1420
                Sources:   mset.sourcesInfo(),
1,286✔
1421
        }
1,286✔
1422
        resp.DidCreate = true
1,286✔
1423
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,286✔
1424
}
1425

1426
// Request to update a stream.
1427
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
829✔
1428
        if c == nil || !s.JetStreamEnabled() {
829✔
1429
                return
×
1430
        }
×
1431

1432
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
829✔
1433
        if err != nil {
829✔
1434
                s.Warnf(badAPIRequestT, msg)
×
1435
                return
×
1436
        }
×
1437

1438
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
829✔
1439
        if errorOnRequiredApiLevel(hdr) {
830✔
1440
                resp.Error = NewJSRequiredApiLevelError()
1✔
1441
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1442
                return
1✔
1443
        }
1✔
1444

1445
        // Determine if we should proceed here when we are in clustered mode.
1446
        if s.JetStreamIsClustered() {
1,562✔
1447
                js, cc := s.getJetStreamCluster()
734✔
1448
                if js == nil || cc == nil {
734✔
1449
                        return
×
1450
                }
×
1451
                if js.isLeaderless() {
736✔
1452
                        resp.Error = NewJSClusterNotAvailError()
2✔
1453
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1454
                        return
2✔
1455
                }
2✔
1456
                // Make sure we are meta leader.
1457
                if !s.JetStreamIsLeader() {
1,290✔
1458
                        return
558✔
1459
                }
558✔
1460
        }
1461

1462
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
268✔
1463
                if doErr {
×
1464
                        resp.Error = NewJSNotEnabledForAccountError()
×
1465
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1466
                }
×
1467
                return
×
1468
        }
1469
        var ncfg StreamConfigRequest
268✔
1470
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
269✔
1471
                resp.Error = NewJSInvalidJSONError(err)
1✔
1472
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1473
                return
1✔
1474
        }
1✔
1475

1476
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
267✔
1477
        if apiErr != nil {
286✔
1478
                resp.Error = apiErr
19✔
1479
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
19✔
1480
                return
19✔
1481
        }
19✔
1482

1483
        streamName := streamNameFromSubject(subject)
248✔
1484
        if streamName != cfg.Name {
249✔
1485
                resp.Error = NewJSStreamMismatchError()
1✔
1486
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1487
                return
1✔
1488
        }
1✔
1489

1490
        // Handle clustered version here.
1491
        if s.JetStreamIsClustered() {
416✔
1492
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
169✔
1493
                return
169✔
1494
        }
169✔
1495

1496
        mset, err := acc.lookupStream(streamName)
78✔
1497
        if err != nil {
82✔
1498
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1499
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1500
                return
4✔
1501
        }
4✔
1502
        if mset.offlineReason != _EMPTY_ {
74✔
1503
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1504
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1505
                return
×
1506
        }
×
1507

1508
        // Update asset version metadata.
1509
        setStaticStreamMetadata(&cfg)
74✔
1510

74✔
1511
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
86✔
1512
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
12✔
1513
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
12✔
1514
                return
12✔
1515
        }
12✔
1516

1517
        msetCfg := mset.config()
62✔
1518
        resp.StreamInfo = &StreamInfo{
62✔
1519
                Created:   mset.createdTime(),
62✔
1520
                State:     mset.state(),
62✔
1521
                Config:    *setDynamicStreamMetadata(&msetCfg),
62✔
1522
                Domain:    s.getOpts().JetStreamDomain,
62✔
1523
                Mirror:    mset.mirrorInfo(),
62✔
1524
                Sources:   mset.sourcesInfo(),
62✔
1525
                TimeStamp: time.Now().UTC(),
62✔
1526
        }
62✔
1527
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
62✔
1528
}
1529

1530
// Request for the list of all stream names.
1531
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,307✔
1532
        if c == nil || !s.JetStreamEnabled() {
1,307✔
1533
                return
×
1534
        }
×
1535
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,307✔
1536
        if err != nil {
1,307✔
1537
                s.Warnf(badAPIRequestT, msg)
×
1538
                return
×
1539
        }
×
1540

1541
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,307✔
1542
        if errorOnRequiredApiLevel(hdr) {
1,308✔
1543
                resp.Error = NewJSRequiredApiLevelError()
1✔
1544
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1545
                return
1✔
1546
        }
1✔
1547

1548
        // Determine if we should proceed here when we are in clustered mode.
1549
        if s.JetStreamIsClustered() {
2,340✔
1550
                js, cc := s.getJetStreamCluster()
1,034✔
1551
                if js == nil || cc == nil {
1,034✔
1552
                        return
×
1553
                }
×
1554
                if js.isLeaderless() {
1,034✔
1555
                        resp.Error = NewJSClusterNotAvailError()
×
1556
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1557
                        return
×
1558
                }
×
1559
                // Make sure we are meta leader.
1560
                if !s.JetStreamIsLeader() {
1,790✔
1561
                        return
756✔
1562
                }
756✔
1563
        }
1564

1565
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
551✔
1566
                if doErr {
1✔
1567
                        resp.Error = NewJSNotEnabledForAccountError()
×
1568
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1569
                }
×
1570
                return
1✔
1571
        }
1572

1573
        var offset int
549✔
1574
        var filter string
549✔
1575

549✔
1576
        if isJSONObjectOrArray(msg) {
919✔
1577
                var req JSApiStreamNamesRequest
370✔
1578
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
370✔
1579
                        resp.Error = NewJSInvalidJSONError(err)
×
1580
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1581
                        return
×
1582
                }
×
1583
                offset = req.Offset
370✔
1584
                if req.Subject != _EMPTY_ {
716✔
1585
                        filter = req.Subject
346✔
1586
                }
346✔
1587
        }
1588

1589
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1590
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1591
        var numStreams int
549✔
1592
        if s.JetStreamIsClustered() {
827✔
1593
                js, cc := s.getJetStreamCluster()
278✔
1594
                if js == nil || cc == nil {
278✔
1595
                        // TODO(dlc) - Debug or Warn?
×
1596
                        return
×
1597
                }
×
1598
                js.mu.RLock()
278✔
1599
                for stream, sa := range cc.streams[acc.Name] {
604✔
1600
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
326✔
1601
                                continue
×
1602
                        }
1603
                        if filter != _EMPTY_ {
609✔
1604
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
283✔
1605
                                if len(sa.Config.Subjects) == 0 {
285✔
1606
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1607
                                                resp.Streams = append(resp.Streams, stream)
×
1608
                                        }
×
1609
                                } else {
281✔
1610
                                        for _, subj := range sa.Config.Subjects {
562✔
1611
                                                if SubjectsCollide(filter, subj) {
507✔
1612
                                                        resp.Streams = append(resp.Streams, stream)
226✔
1613
                                                        break
226✔
1614
                                                }
1615
                                        }
1616
                                }
1617
                        } else {
43✔
1618
                                resp.Streams = append(resp.Streams, stream)
43✔
1619
                        }
43✔
1620
                }
1621
                js.mu.RUnlock()
278✔
1622
                if len(resp.Streams) > 1 {
280✔
1623
                        slices.Sort(resp.Streams)
2✔
1624
                }
2✔
1625
                numStreams = len(resp.Streams)
278✔
1626
                if offset > numStreams {
278✔
1627
                        offset = numStreams
×
1628
                }
×
1629
                if offset > 0 {
278✔
1630
                        resp.Streams = resp.Streams[offset:]
×
1631
                }
×
1632
                if len(resp.Streams) > JSApiNamesLimit {
278✔
1633
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1634
                }
×
1635
        } else {
271✔
1636
                msets := acc.filteredStreams(filter)
271✔
1637
                // Since we page results order matters.
271✔
1638
                if len(msets) > 1 {
277✔
1639
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
21✔
1640
                }
1641

1642
                numStreams = len(msets)
271✔
1643
                if offset > numStreams {
271✔
1644
                        offset = numStreams
×
1645
                }
×
1646

1647
                for _, mset := range msets[offset:] {
552✔
1648
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
281✔
1649
                        if len(resp.Streams) >= JSApiNamesLimit {
281✔
1650
                                break
×
1651
                        }
1652
                }
1653
        }
1654
        resp.Total = numStreams
549✔
1655
        resp.Limit = JSApiNamesLimit
549✔
1656
        resp.Offset = offset
549✔
1657

549✔
1658
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
549✔
1659
}
1660

1661
// Request for the list of all detailed stream info.
1662
// TODO(dlc) - combine with above long term
1663
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
1664
        if c == nil || !s.JetStreamEnabled() {
67✔
1665
                return
×
1666
        }
×
1667
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
1668
        if err != nil {
67✔
1669
                s.Warnf(badAPIRequestT, msg)
×
1670
                return
×
1671
        }
×
1672

1673
        var resp = JSApiStreamListResponse{
67✔
1674
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
67✔
1675
                Streams:     []*StreamInfo{},
67✔
1676
        }
67✔
1677
        if errorOnRequiredApiLevel(hdr) {
68✔
1678
                resp.Error = NewJSRequiredApiLevelError()
1✔
1679
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1680
                return
1✔
1681
        }
1✔
1682

1683
        // Determine if we should proceed here when we are in clustered mode.
1684
        if s.JetStreamIsClustered() {
127✔
1685
                js, cc := s.getJetStreamCluster()
61✔
1686
                if js == nil || cc == nil {
61✔
1687
                        return
×
1688
                }
×
1689
                if js.isLeaderless() {
62✔
1690
                        resp.Error = NewJSClusterNotAvailError()
1✔
1691
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1692
                        return
1✔
1693
                }
1✔
1694
                // Make sure we are meta leader.
1695
                if !s.JetStreamIsLeader() {
103✔
1696
                        return
43✔
1697
                }
43✔
1698
        }
1699

1700
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
22✔
1701
                if doErr {
×
1702
                        resp.Error = NewJSNotEnabledForAccountError()
×
1703
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1704
                }
×
1705
                return
×
1706
        }
1707

1708
        var offset int
22✔
1709
        var filter string
22✔
1710

22✔
1711
        if isJSONObjectOrArray(msg) {
33✔
1712
                var req JSApiStreamListRequest
11✔
1713
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
1714
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1715
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1716
                        return
1✔
1717
                }
1✔
1718
                offset = req.Offset
10✔
1719
                if req.Subject != _EMPTY_ {
12✔
1720
                        filter = req.Subject
2✔
1721
                }
2✔
1722
        }
1723

1724
        // Clustered mode will invoke a scatter and gather.
1725
        if s.JetStreamIsClustered() {
38✔
1726
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
17✔
1727
                msg = copyBytes(msg)
17✔
1728
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
34✔
1729
                return
17✔
1730
        }
1731

1732
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1733
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1734
        var msets []*stream
4✔
1735
        if filter == _EMPTY_ {
7✔
1736
                msets = acc.streams()
3✔
1737
        } else {
4✔
1738
                msets = acc.filteredStreams(filter)
1✔
1739
        }
1✔
1740

1741
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
5✔
1742

1743
        scnt := len(msets)
4✔
1744
        if offset > scnt {
4✔
1745
                offset = scnt
×
1746
        }
×
1747

1748
        var missingNames []string
4✔
1749
        for _, mset := range msets[offset:] {
9✔
1750
                if mset.offlineReason != _EMPTY_ {
6✔
1751
                        if resp.Offline == nil {
2✔
1752
                                resp.Offline = make(map[string]string, 1)
1✔
1753
                        }
1✔
1754
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
1755
                        missingNames = append(missingNames, mset.getCfgName())
1✔
1756
                        continue
1✔
1757
                }
1758

1759
                config := mset.config()
4✔
1760
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
1761
                        Created:   mset.createdTime(),
4✔
1762
                        State:     mset.state(),
4✔
1763
                        Config:    config,
4✔
1764
                        Domain:    s.getOpts().JetStreamDomain,
4✔
1765
                        Mirror:    mset.mirrorInfo(),
4✔
1766
                        Sources:   mset.sourcesInfo(),
4✔
1767
                        TimeStamp: time.Now().UTC(),
4✔
1768
                })
4✔
1769
                if len(resp.Streams) >= JSApiListLimit {
4✔
1770
                        break
×
1771
                }
1772
        }
1773
        resp.Total = scnt
4✔
1774
        resp.Limit = JSApiListLimit
4✔
1775
        resp.Offset = offset
4✔
1776
        resp.Missing = missingNames
4✔
1777
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
1778
}
1779

1780
// Request for information about a stream.
1781
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
28,043✔
1782
        if c == nil || !s.JetStreamEnabled() {
28,050✔
1783
                return
7✔
1784
        }
7✔
1785
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28,036✔
1786
        if err != nil {
28,041✔
1787
                s.Warnf(badAPIRequestT, msg)
5✔
1788
                return
5✔
1789
        }
5✔
1790

1791
        streamName := streamNameFromSubject(subject)
28,031✔
1792

28,031✔
1793
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
28,031✔
1794

28,031✔
1795
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
28,031✔
1796
        // Make sure the response type is for a create call.
28,031✔
1797
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
28,031✔
1798
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
1799
        }
×
1800
        if errorOnRequiredApiLevel(hdr) {
28,032✔
1801
                resp.Error = NewJSRequiredApiLevelError()
1✔
1802
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1803
                return
1✔
1804
        }
1✔
1805

1806
        var clusterWideConsCount int
28,030✔
1807

28,030✔
1808
        js, cc := s.getJetStreamCluster()
28,030✔
1809
        if js == nil {
28,030✔
1810
                return
×
1811
        }
×
1812
        // If we are in clustered mode we need to be the stream leader to proceed.
1813
        if cc != nil {
40,818✔
1814
                // Check to make sure the stream is assigned.
12,788✔
1815
                js.mu.RLock()
12,788✔
1816
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
12,788✔
1817
                var offline bool
12,788✔
1818
                if sa != nil {
24,368✔
1819
                        clusterWideConsCount = len(sa.consumers)
11,580✔
1820
                        offline = s.allPeersOffline(sa.Group)
11,580✔
1821
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
11,610✔
1822
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
1823
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
1824
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
1825
                                js.mu.RUnlock()
30✔
1826
                                return
30✔
1827
                        }
30✔
1828
                }
1829
                js.mu.RUnlock()
12,758✔
1830

12,758✔
1831
                if isLeader && sa == nil {
13,018✔
1832
                        // We can't find the stream, so mimic what would be the errors below.
260✔
1833
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
260✔
1834
                                if doErr {
×
1835
                                        resp.Error = NewJSNotEnabledForAccountError()
×
1836
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1837
                                }
×
1838
                                return
×
1839
                        }
1840
                        // No stream present.
1841
                        resp.Error = NewJSStreamNotFoundError()
260✔
1842
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
260✔
1843
                        return
260✔
1844
                } else if sa == nil {
13,446✔
1845
                        if js.isLeaderless() {
948✔
1846
                                resp.Error = NewJSClusterNotAvailError()
×
1847
                                // Delaying an error response gives the leader a chance to respond before us
×
1848
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1849
                        }
×
1850
                        return
948✔
1851
                } else if isLeader && offline {
11,552✔
1852
                        resp.Error = NewJSStreamOfflineError()
2✔
1853
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
1854
                        return
2✔
1855
                }
2✔
1856

1857
                // Check to see if we are a member of the group and if the group has no leader.
1858
                isLeaderless := js.isGroupLeaderless(sa.Group)
11,548✔
1859

11,548✔
1860
                // We have the stream assigned and a leader, so only the stream leader should answer.
11,548✔
1861
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
20,897✔
1862
                        if js.isLeaderless() {
9,349✔
1863
                                resp.Error = NewJSClusterNotAvailError()
×
1864
                                // Delaying an error response gives the leader a chance to respond before us
×
1865
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
1866
                                return
×
1867
                        }
×
1868

1869
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
1870
                        // while the new members work through the election and catchup process.
1871
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
1872
                        js.mu.RLock()
9,349✔
1873
                        rg := sa.Group
9,349✔
1874
                        var ourID string
9,349✔
1875
                        if cc.meta != nil {
18,698✔
1876
                                ourID = cc.meta.ID()
9,349✔
1877
                        }
9,349✔
1878
                        // We have seen cases where rg is nil at this point,
1879
                        // so check explicitly and bail if that is the case.
1880
                        bail := rg == nil || !rg.isMember(ourID)
9,349✔
1881
                        if !bail {
12,652✔
1882
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
3,303✔
1883
                                // Also, we have seen cases where rg.node is nil at this point,
3,303✔
1884
                                // so check explicitly and bail if that is the case.
3,303✔
1885
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
3,303✔
1886
                        }
3,303✔
1887
                        js.mu.RUnlock()
9,349✔
1888
                        if bail {
18,684✔
1889
                                return
9,335✔
1890
                        }
9,335✔
1891
                }
1892
        }
1893

1894
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
17,461✔
1895
                if doErr {
7✔
1896
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
1897
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1898
                }
1✔
1899
                return
6✔
1900
        }
1901

1902
        var details bool
17,449✔
1903
        var subjects string
17,449✔
1904
        var offset int
17,449✔
1905
        if isJSONObjectOrArray(msg) {
17,484✔
1906
                var req JSApiStreamInfoRequest
35✔
1907
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
36✔
1908
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1909
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1910
                        return
1✔
1911
                }
1✔
1912
                details, subjects = req.DeletedDetails, req.SubjectsFilter
34✔
1913
                offset = req.Offset
34✔
1914
        }
1915

1916
        mset, err := acc.lookupStream(streamName)
17,448✔
1917
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
17,448✔
1918
        if err != nil {
18,186✔
1919
                if cc != nil {
739✔
1920
                        // This could be inflight, pause for a short bit and try again.
1✔
1921
                        // This will not be inline, so ok.
1✔
1922
                        time.Sleep(10 * time.Millisecond)
1✔
1923
                        mset, err = acc.lookupStream(streamName)
1✔
1924
                }
1✔
1925
                // Check again.
1926
                if err != nil {
1,475✔
1927
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
737✔
1928
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
737✔
1929
                        return
737✔
1930
                }
737✔
1931
        }
1932

1933
        if mset.offlineReason != _EMPTY_ {
16,712✔
1934
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
1935
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
1936
                return
1✔
1937
        }
1✔
1938

1939
        config := mset.config()
16,710✔
1940
        resp.StreamInfo = &StreamInfo{
16,710✔
1941
                Created:    mset.createdTime(),
16,710✔
1942
                State:      mset.stateWithDetail(details),
16,710✔
1943
                Config:     *setDynamicStreamMetadata(&config),
16,710✔
1944
                Domain:     s.getOpts().JetStreamDomain,
16,710✔
1945
                Cluster:    js.clusterInfo(mset.raftGroup()),
16,710✔
1946
                Mirror:     mset.mirrorInfo(),
16,710✔
1947
                Sources:    mset.sourcesInfo(),
16,710✔
1948
                Alternates: js.streamAlternates(ci, config.Name),
16,710✔
1949
                TimeStamp:  time.Now().UTC(),
16,710✔
1950
        }
16,710✔
1951
        if clusterWideConsCount > 0 {
17,203✔
1952
                resp.StreamInfo.State.Consumers = clusterWideConsCount
493✔
1953
        }
493✔
1954

1955
        // Check if they have asked for subject details.
1956
        if subjects != _EMPTY_ {
16,742✔
1957
                st := mset.store.SubjectsTotals(subjects)
32✔
1958
                if lst := len(st); lst > 0 {
60✔
1959
                        // Common for both cases.
28✔
1960
                        resp.Offset = offset
28✔
1961
                        resp.Limit = JSMaxSubjectDetails
28✔
1962
                        resp.Total = lst
28✔
1963

28✔
1964
                        if offset == 0 && lst <= JSMaxSubjectDetails {
56✔
1965
                                resp.StreamInfo.State.Subjects = st
28✔
1966
                        } else {
28✔
1967
                                // Here we have to filter list due to offset or maximum constraints.
×
1968
                                subjs := make([]string, 0, len(st))
×
1969
                                for subj := range st {
×
1970
                                        subjs = append(subjs, subj)
×
1971
                                }
×
1972
                                // Sort it
1973
                                slices.Sort(subjs)
×
1974

×
1975
                                if offset > len(subjs) {
×
1976
                                        offset = len(subjs)
×
1977
                                }
×
1978

1979
                                end := offset + JSMaxSubjectDetails
×
1980
                                if end > len(subjs) {
×
1981
                                        end = len(subjs)
×
1982
                                }
×
1983
                                actualSize := end - offset
×
1984
                                var sd map[string]uint64
×
1985

×
1986
                                if actualSize > 0 {
×
1987
                                        sd = make(map[string]uint64, actualSize)
×
1988
                                        for _, ss := range subjs[offset:end] {
×
1989
                                                sd[ss] = st[ss]
×
1990
                                        }
×
1991
                                }
1992
                                resp.StreamInfo.State.Subjects = sd
×
1993
                        }
1994
                }
1995
        }
1996
        // Check for out of band catchups.
1997
        if mset.hasCatchupPeers() {
16,710✔
1998
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
1999
        }
×
2000

2001
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16,710✔
2002
}
2003

2004
// Request to have a stream leader stepdown.
2005
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
240✔
2006
        if c == nil || !s.JetStreamEnabled() {
240✔
2007
                return
×
2008
        }
×
2009
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
240✔
2010
        if err != nil {
240✔
2011
                s.Warnf(badAPIRequestT, msg)
×
2012
                return
×
2013
        }
×
2014

2015
        // Have extra token for this one.
2016
        name := tokenAt(subject, 6)
240✔
2017

240✔
2018
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
240✔
2019
        if errorOnRequiredApiLevel(hdr) {
241✔
2020
                resp.Error = NewJSRequiredApiLevelError()
1✔
2021
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2022
                return
1✔
2023
        }
1✔
2024

2025
        // If we are not in clustered mode this is a failed request.
2026
        if !s.JetStreamIsClustered() {
239✔
2027
                resp.Error = NewJSClusterRequiredError()
×
2028
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2029
                return
×
2030
        }
×
2031

2032
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2033
        js, cc := s.getJetStreamCluster()
239✔
2034
        if js == nil || cc == nil {
239✔
2035
                return
×
2036
        }
×
2037
        if js.isLeaderless() {
239✔
2038
                resp.Error = NewJSClusterNotAvailError()
×
2039
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2040
                return
×
2041
        }
×
2042

2043
        js.mu.RLock()
239✔
2044
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
239✔
2045
        js.mu.RUnlock()
239✔
2046

239✔
2047
        if isLeader && sa == nil {
239✔
2048
                resp.Error = NewJSStreamNotFoundError()
×
2049
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2050
                return
×
2051
        } else if sa == nil {
240✔
2052
                return
1✔
2053
        }
1✔
2054

2055
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
238✔
2056
                if doErr {
×
2057
                        resp.Error = NewJSNotEnabledForAccountError()
×
2058
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2059
                }
×
2060
                return
×
2061
        }
2062

2063
        // Check to see if we are a member of the group and if the group has no leader.
2064
        if js.isGroupLeaderless(sa.Group) {
238✔
2065
                resp.Error = NewJSClusterNotAvailError()
×
2066
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2067
                return
×
2068
        }
×
2069

2070
        // We have the stream assigned and a leader, so only the stream leader should answer.
2071
        if !acc.JetStreamIsStreamLeader(name) {
425✔
2072
                return
187✔
2073
        }
187✔
2074

2075
        mset, err := acc.lookupStream(name)
51✔
2076
        if err != nil {
51✔
2077
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2078
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2079
                return
×
2080
        }
×
2081

2082
        if mset == nil {
51✔
2083
                resp.Success = true
×
2084
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2085
                return
×
2086
        }
×
2087

2088
        node := mset.raftNode()
51✔
2089
        if node == nil {
51✔
2090
                resp.Success = true
×
2091
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2092
                return
×
2093
        }
×
2094

2095
        var preferredLeader string
51✔
2096
        if isJSONObjectOrArray(msg) {
64✔
2097
                var req JSApiLeaderStepdownRequest
13✔
2098
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2099
                        resp.Error = NewJSInvalidJSONError(err)
×
2100
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2101
                        return
×
2102
                }
×
2103
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
18✔
2104
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2105
                        return
5✔
2106
                }
5✔
2107
        }
2108

2109
        // Call actual stepdown.
2110
        err = node.StepDown(preferredLeader)
46✔
2111
        if err != nil {
46✔
2112
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2113
        } else {
46✔
2114
                resp.Success = true
46✔
2115
        }
46✔
2116
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
46✔
2117
}
2118

2119
// Request to have a consumer leader stepdown.
2120
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
227✔
2121
        if c == nil || !s.JetStreamEnabled() {
227✔
2122
                return
×
2123
        }
×
2124
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
227✔
2125
        if err != nil {
227✔
2126
                s.Warnf(badAPIRequestT, msg)
×
2127
                return
×
2128
        }
×
2129

2130
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
227✔
2131
        if errorOnRequiredApiLevel(hdr) {
228✔
2132
                resp.Error = NewJSRequiredApiLevelError()
1✔
2133
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2134
                return
1✔
2135
        }
1✔
2136

2137
        // If we are not in clustered mode this is a failed request.
2138
        if !s.JetStreamIsClustered() {
226✔
2139
                resp.Error = NewJSClusterRequiredError()
×
2140
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2141
                return
×
2142
        }
×
2143

2144
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2145
        js, cc := s.getJetStreamCluster()
226✔
2146
        if js == nil || cc == nil {
226✔
2147
                return
×
2148
        }
×
2149
        if js.isLeaderless() {
226✔
2150
                resp.Error = NewJSClusterNotAvailError()
×
2151
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2152
                return
×
2153
        }
×
2154

2155
        // Have extra token for this one.
2156
        stream := tokenAt(subject, 6)
226✔
2157
        consumer := tokenAt(subject, 7)
226✔
2158

226✔
2159
        js.mu.RLock()
226✔
2160
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
226✔
2161
        js.mu.RUnlock()
226✔
2162

226✔
2163
        if isLeader && sa == nil {
226✔
2164
                resp.Error = NewJSStreamNotFoundError()
×
2165
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2166
                return
×
2167
        } else if sa == nil {
226✔
2168
                return
×
2169
        }
×
2170
        var ca *consumerAssignment
226✔
2171
        if sa.consumers != nil {
452✔
2172
                ca = sa.consumers[consumer]
226✔
2173
        }
226✔
2174
        if ca == nil {
226✔
2175
                resp.Error = NewJSConsumerNotFoundError()
×
2176
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2177
                return
×
2178
        }
×
2179
        // Check to see if we are a member of the group and if the group has no leader.
2180
        if js.isGroupLeaderless(ca.Group) {
226✔
2181
                resp.Error = NewJSClusterNotAvailError()
×
2182
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2183
                return
×
2184
        }
×
2185

2186
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
389✔
2187
                return
163✔
2188
        }
163✔
2189

2190
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
63✔
2191
                if doErr {
×
2192
                        resp.Error = NewJSNotEnabledForAccountError()
×
2193
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2194
                }
×
2195
                return
×
2196
        }
2197

2198
        mset, err := acc.lookupStream(stream)
63✔
2199
        if err != nil {
63✔
2200
                resp.Error = NewJSStreamNotFoundError()
×
2201
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2202
                return
×
2203
        }
×
2204
        o := mset.lookupConsumer(consumer)
63✔
2205
        if o == nil {
63✔
2206
                resp.Error = NewJSConsumerNotFoundError()
×
2207
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2208
                return
×
2209
        }
×
2210

2211
        n := o.raftNode()
63✔
2212
        if n == nil {
63✔
2213
                resp.Success = true
×
2214
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2215
                return
×
2216
        }
×
2217

2218
        var preferredLeader string
63✔
2219
        if isJSONObjectOrArray(msg) {
76✔
2220
                var req JSApiLeaderStepdownRequest
13✔
2221
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2222
                        resp.Error = NewJSInvalidJSONError(err)
×
2223
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2224
                        return
×
2225
                }
×
2226
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
18✔
2227
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2228
                        return
5✔
2229
                }
5✔
2230
        }
2231

2232
        // Call actual stepdown.
2233
        err = n.StepDown(preferredLeader)
58✔
2234
        if err != nil {
58✔
2235
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2236
        } else {
58✔
2237
                resp.Success = true
58✔
2238
        }
58✔
2239
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
58✔
2240
}
2241

2242
// Request to remove a peer from a clustered stream.
2243
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
70✔
2244
        if c == nil || !s.JetStreamEnabled() {
70✔
2245
                return
×
2246
        }
×
2247
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
70✔
2248
        if err != nil {
70✔
2249
                s.Warnf(badAPIRequestT, msg)
×
2250
                return
×
2251
        }
×
2252

2253
        // Have extra token for this one.
2254
        name := tokenAt(subject, 6)
70✔
2255

70✔
2256
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
70✔
2257
        if errorOnRequiredApiLevel(hdr) {
71✔
2258
                resp.Error = NewJSRequiredApiLevelError()
1✔
2259
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2260
                return
1✔
2261
        }
1✔
2262

2263
        // If we are not in clustered mode this is a failed request.
2264
        if !s.JetStreamIsClustered() {
69✔
2265
                resp.Error = NewJSClusterRequiredError()
×
2266
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2267
                return
×
2268
        }
×
2269

2270
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2271
        js, cc := s.getJetStreamCluster()
69✔
2272
        if js == nil || cc == nil {
69✔
2273
                return
×
2274
        }
×
2275
        if js.isLeaderless() {
69✔
2276
                resp.Error = NewJSClusterNotAvailError()
×
2277
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2278
                return
×
2279
        }
×
2280

2281
        js.mu.RLock()
69✔
2282
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
69✔
2283
        js.mu.RUnlock()
69✔
2284

69✔
2285
        // Make sure we are meta leader.
69✔
2286
        if !isLeader {
127✔
2287
                return
58✔
2288
        }
58✔
2289

2290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2291
                if doErr {
×
2292
                        resp.Error = NewJSNotEnabledForAccountError()
×
2293
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2294
                }
×
2295
                return
×
2296
        }
2297
        if isEmptyRequest(msg) {
11✔
2298
                resp.Error = NewJSBadRequestError()
×
2299
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2300
                return
×
2301
        }
×
2302

2303
        var req JSApiStreamRemovePeerRequest
11✔
2304
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2305
                resp.Error = NewJSInvalidJSONError(err)
×
2306
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2307
                return
×
2308
        }
×
2309
        if req.Peer == _EMPTY_ {
11✔
2310
                resp.Error = NewJSBadRequestError()
×
2311
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2312
                return
×
2313
        }
×
2314

2315
        if sa == nil {
11✔
2316
                // No stream present.
×
2317
                resp.Error = NewJSStreamNotFoundError()
×
2318
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2319
                return
×
2320
        }
×
2321

2322
        // Check to see if we are a member of the group and if the group has no leader.
2323
        // Peers here is a server name, convert to node name.
2324
        nodeName := getHash(req.Peer)
11✔
2325

11✔
2326
        js.mu.RLock()
11✔
2327
        rg := sa.Group
11✔
2328
        isMember := rg.isMember(nodeName)
11✔
2329
        js.mu.RUnlock()
11✔
2330

11✔
2331
        // Make sure we are a member.
11✔
2332
        if !isMember {
12✔
2333
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2334
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2335
                return
1✔
2336
        }
1✔
2337

2338
        // If we are here we have a valid peer member set for removal.
2339
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2340
                resp.Error = NewJSPeerRemapError()
2✔
2341
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2342
                return
2✔
2343
        }
2✔
2344

2345
        resp.Success = true
8✔
2346
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2347
}
2348

2349
// Request to have the metaleader remove a peer from the system.
2350
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
13✔
2351
        if c == nil || !s.JetStreamEnabled() {
13✔
2352
                return
×
2353
        }
×
2354

2355
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
13✔
2356
        if err != nil {
13✔
2357
                s.Warnf(badAPIRequestT, msg)
×
2358
                return
×
2359
        }
×
2360
        if acc != s.SystemAccount() {
13✔
2361
                return
×
2362
        }
×
2363

2364
        js, cc := s.getJetStreamCluster()
13✔
2365
        if js == nil || cc == nil {
13✔
2366
                return
×
2367
        }
×
2368

2369
        js.mu.RLock()
13✔
2370
        isLeader := cc.isLeader()
13✔
2371
        meta := cc.meta
13✔
2372
        js.mu.RUnlock()
13✔
2373

13✔
2374
        // Extra checks here but only leader is listening.
13✔
2375
        if !isLeader {
13✔
2376
                return
×
2377
        }
×
2378

2379
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
13✔
2380
        if errorOnRequiredApiLevel(hdr) {
13✔
2381
                resp.Error = NewJSRequiredApiLevelError()
×
2382
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2383
                return
×
2384
        }
×
2385

2386
        if isEmptyRequest(msg) {
13✔
2387
                resp.Error = NewJSBadRequestError()
×
2388
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2389
                return
×
2390
        }
×
2391

2392
        var req JSApiMetaServerRemoveRequest
13✔
2393
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2394
                resp.Error = NewJSInvalidJSONError(err)
×
2395
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2396
                return
×
2397
        }
×
2398

2399
        js.mu.Lock()
13✔
2400
        defer js.mu.Unlock()
13✔
2401

13✔
2402
        // Another peer-remove is already in progress, don't allow multiple concurrent changes.
13✔
2403
        if cc.peerRemoveReply != nil {
15✔
2404
                resp.Error = NewJSClusterServerMemberChangeInflightError()
2✔
2405
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2406
                return
2✔
2407
        }
2✔
2408

2409
        var found string
11✔
2410
        for _, p := range meta.Peers() {
51✔
2411
                // If Peer is specified, it takes precedence
40✔
2412
                if req.Peer != _EMPTY_ {
46✔
2413
                        if p.ID == req.Peer {
7✔
2414
                                found = req.Peer
1✔
2415
                                break
1✔
2416
                        }
2417
                        continue
5✔
2418
                }
2419
                si, ok := s.nodeToInfo.Load(p.ID)
34✔
2420
                if ok && si.(nodeInfo).name == req.Server {
40✔
2421
                        found = p.ID
6✔
2422
                        break
6✔
2423
                }
2424
        }
2425

2426
        if found == _EMPTY_ {
15✔
2427
                resp.Error = NewJSClusterServerNotMemberError()
4✔
2428
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2429
                return
4✔
2430
        }
4✔
2431

2432
        if err := meta.ProposeRemovePeer(found); err != nil {
7✔
2433
                if err == errMembershipChange {
×
2434
                        resp.Error = NewJSClusterServerMemberChangeInflightError()
×
2435
                } else {
×
2436
                        resp.Error = NewJSRaftGeneralError(err)
×
2437
                }
×
2438
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2439
                return
×
2440
        }
2441

2442
        if cc.peerRemoveReply == nil {
14✔
2443
                cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
7✔
2444
        }
7✔
2445
        // Only copy the request, the subject and reply are already copied.
2446
        cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
7✔
2447
}
2448

2449
func (s *Server) peerSetToNames(ps []string) []string {
170✔
2450
        names := make([]string, len(ps))
170✔
2451
        for i := 0; i < len(ps); i++ {
636✔
2452
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
466✔
2453
                        names[i] = ps[i]
×
2454
                } else {
466✔
2455
                        names[i] = si.(nodeInfo).name
466✔
2456
                }
466✔
2457
        }
2458
        return names
170✔
2459
}
2460

2461
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2462
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2463
        js.mu.RLock()
29✔
2464
        defer js.mu.RUnlock()
29✔
2465
        if cc := js.cluster; cc != nil {
58✔
2466
                for _, p := range cc.meta.Peers() {
171✔
2467
                        si, ok := s.nodeToInfo.Load(p.ID)
142✔
2468
                        if ok && si.(nodeInfo).name == serverName {
171✔
2469
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2470
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2471
                                                return p.ID
29✔
2472
                                        }
29✔
2473
                                }
2474
                        }
2475
                }
2476
        }
2477
        return _EMPTY_
×
2478
}
2479

2480
// Request to have the metaleader move a stream on a peer to another
2481
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2482
        if c == nil || !s.JetStreamEnabled() {
33✔
2483
                return
×
2484
        }
×
2485

2486
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2487
        if err != nil {
33✔
2488
                s.Warnf(badAPIRequestT, msg)
×
2489
                return
×
2490
        }
×
2491

2492
        js, cc := s.getJetStreamCluster()
33✔
2493
        if js == nil || cc == nil {
33✔
2494
                return
×
2495
        }
×
2496

2497
        // Extra checks here but only leader is listening.
2498
        js.mu.RLock()
33✔
2499
        isLeader := cc.isLeader()
33✔
2500
        js.mu.RUnlock()
33✔
2501

33✔
2502
        if !isLeader {
33✔
2503
                return
×
2504
        }
×
2505

2506
        accName := tokenAt(subject, 6)
33✔
2507
        streamName := tokenAt(subject, 7)
33✔
2508

33✔
2509
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2510
                return
×
2511
        }
×
2512

2513
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2514
        if errorOnRequiredApiLevel(hdr) {
33✔
2515
                resp.Error = NewJSRequiredApiLevelError()
×
2516
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2517
                return
×
2518
        }
×
2519

2520
        var req JSApiMetaServerStreamMoveRequest
33✔
2521
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2522
                resp.Error = NewJSInvalidJSONError(err)
×
2523
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2524
                return
×
2525
        }
×
2526

2527
        srcPeer := _EMPTY_
33✔
2528
        if req.Server != _EMPTY_ {
62✔
2529
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2530
        }
29✔
2531

2532
        targetAcc, ok := s.accounts.Load(accName)
33✔
2533
        if !ok {
33✔
2534
                resp.Error = NewJSNoAccountError()
×
2535
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2536
                return
×
2537
        }
×
2538

2539
        var streamFound bool
33✔
2540
        cfg := StreamConfig{}
33✔
2541
        currPeers := []string{}
33✔
2542
        currCluster := _EMPTY_
33✔
2543
        js.mu.Lock()
33✔
2544
        streams, ok := cc.streams[accName]
33✔
2545
        if ok {
66✔
2546
                sa, ok := streams[streamName]
33✔
2547
                if ok {
66✔
2548
                        cfg = *sa.Config.clone()
33✔
2549
                        streamFound = true
33✔
2550
                        currPeers = sa.Group.Peers
33✔
2551
                        currCluster = sa.Group.Cluster
33✔
2552
                }
33✔
2553
        }
2554
        js.mu.Unlock()
33✔
2555

33✔
2556
        if !streamFound {
33✔
2557
                resp.Error = NewJSStreamNotFoundError()
×
2558
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2559
                return
×
2560
        }
×
2561

2562
        // if server was picked, make sure src peer exists and move it to first position.
2563
        // removal will drop peers from the left
2564
        if req.Server != _EMPTY_ {
62✔
2565
                if srcPeer == _EMPTY_ {
29✔
2566
                        resp.Error = NewJSClusterServerNotMemberError()
×
2567
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2568
                        return
×
2569
                }
×
2570
                var peerFound bool
29✔
2571
                for i := 0; i < len(currPeers); i++ {
79✔
2572
                        if currPeers[i] == srcPeer {
79✔
2573
                                copy(currPeers[1:], currPeers[:i])
29✔
2574
                                currPeers[0] = srcPeer
29✔
2575
                                peerFound = true
29✔
2576
                                break
29✔
2577
                        }
2578
                }
2579
                if !peerFound {
29✔
2580
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2581
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2582
                        return
×
2583
                }
×
2584
        }
2585

2586
        // make sure client is scoped to requested account
2587
        ciNew := *(ci)
33✔
2588
        ciNew.Account = accName
33✔
2589

33✔
2590
        // backup placement such that peers can be looked up with modified tag list
33✔
2591
        var origPlacement *Placement
33✔
2592
        if cfg.Placement != nil {
33✔
2593
                tmp := *cfg.Placement
×
2594
                origPlacement = &tmp
×
2595
        }
×
2596

2597
        if len(req.Tags) > 0 {
60✔
2598
                if cfg.Placement == nil {
54✔
2599
                        cfg.Placement = &Placement{}
27✔
2600
                }
27✔
2601
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2602
        }
2603

2604
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2605
        if len(peers) <= cfg.Replicas {
35✔
2606
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2607
                peers = nil
2✔
2608

2✔
2609
                clusters := map[string]struct{}{}
2✔
2610
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2611
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2612
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2613
                        }
8✔
2614
                        return true
16✔
2615
                })
2616
                errs := &selectPeerError{}
2✔
2617
                errs.accumulate(e)
2✔
2618
                for cluster := range clusters {
4✔
2619
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2620
                        if len(newPeers) >= cfg.Replicas {
4✔
2621
                                peers = append([]string{}, currPeers...)
2✔
2622
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2623
                                break
2✔
2624
                        }
2625
                        errs.accumulate(e)
×
2626
                }
2627
                if peers == nil {
2✔
2628
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2629
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2630
                        return
×
2631
                }
×
2632
        }
2633

2634
        cfg.Placement = origPlacement
33✔
2635

33✔
2636
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2637
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2638

33✔
2639
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2640
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2641
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2642
}
2643

2644
// Request to have the metaleader move a stream on a peer to another
2645
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2646
        if c == nil || !s.JetStreamEnabled() {
4✔
2647
                return
×
2648
        }
×
2649

2650
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
4✔
2651
        if err != nil {
4✔
2652
                s.Warnf(badAPIRequestT, msg)
×
2653
                return
×
2654
        }
×
2655

2656
        js, cc := s.getJetStreamCluster()
4✔
2657
        if js == nil || cc == nil {
4✔
2658
                return
×
2659
        }
×
2660

2661
        // Extra checks here but only leader is listening.
2662
        js.mu.RLock()
4✔
2663
        isLeader := cc.isLeader()
4✔
2664
        js.mu.RUnlock()
4✔
2665

4✔
2666
        if !isLeader {
4✔
2667
                return
×
2668
        }
×
2669

2670
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2671
        if errorOnRequiredApiLevel(hdr) {
4✔
2672
                resp.Error = NewJSRequiredApiLevelError()
×
2673
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2674
                return
×
2675
        }
×
2676

2677
        accName := tokenAt(subject, 6)
4✔
2678
        streamName := tokenAt(subject, 7)
4✔
2679

4✔
2680
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2681
                return
×
2682
        }
×
2683

2684
        targetAcc, ok := s.accounts.Load(accName)
4✔
2685
        if !ok {
4✔
2686
                resp.Error = NewJSNoAccountError()
×
2687
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2688
                return
×
2689
        }
×
2690

2691
        streamFound := false
4✔
2692
        cfg := StreamConfig{}
4✔
2693
        currPeers := []string{}
4✔
2694
        js.mu.Lock()
4✔
2695
        streams, ok := cc.streams[accName]
4✔
2696
        if ok {
8✔
2697
                sa, ok := streams[streamName]
4✔
2698
                if ok {
8✔
2699
                        cfg = *sa.Config.clone()
4✔
2700
                        streamFound = true
4✔
2701
                        currPeers = sa.Group.Peers
4✔
2702
                }
4✔
2703
        }
2704
        js.mu.Unlock()
4✔
2705

4✔
2706
        if !streamFound {
4✔
2707
                resp.Error = NewJSStreamNotFoundError()
×
2708
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2709
                return
×
2710
        }
×
2711

2712
        if len(currPeers) <= cfg.Replicas {
4✔
2713
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2714
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2715
                return
×
2716
        }
×
2717

2718
        // make sure client is scoped to requested account
2719
        ciNew := *(ci)
4✔
2720
        ciNew.Account = accName
4✔
2721

4✔
2722
        peers := currPeers[:cfg.Replicas]
4✔
2723

4✔
2724
        // Remove placement in case tags don't match
4✔
2725
        // This can happen if the move was initiated by modifying the tags.
4✔
2726
        // This is an account operation.
4✔
2727
        // This can NOT happen when the move was initiated by the system account.
4✔
2728
        // There move honors the original tag list.
4✔
2729
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2730
        FOR_TAGCHECK:
1✔
2731
                for _, peer := range peers {
2✔
2732
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2733
                        if !ok {
1✔
2734
                                // can't verify tags, do the safe thing and error
×
2735
                                resp.Error = NewJSStreamGeneralError(
×
2736
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2737
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2738
                                return
×
2739
                        }
×
2740
                        nodeTags := si.(nodeInfo).tags
1✔
2741
                        for _, tag := range cfg.Placement.Tags {
2✔
2742
                                if !nodeTags.Contains(tag) {
2✔
2743
                                        // clear placement as tags don't match
1✔
2744
                                        cfg.Placement = nil
1✔
2745
                                        break FOR_TAGCHECK
1✔
2746
                                }
2747
                        }
2748

2749
                }
2750
        }
2751

2752
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2753
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2754

4✔
2755
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2756
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2757
}
2758

2759
// Request to have an account purged
2760
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
2761
        if c == nil || !s.JetStreamEnabled() {
6✔
2762
                return
×
2763
        }
×
2764

2765
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6✔
2766
        if err != nil {
6✔
2767
                s.Warnf(badAPIRequestT, msg)
×
2768
                return
×
2769
        }
×
2770
        if acc != s.SystemAccount() {
6✔
2771
                return
×
2772
        }
×
2773

2774
        js := s.getJetStream()
6✔
2775
        if js == nil {
6✔
2776
                return
×
2777
        }
×
2778

2779
        accName := tokenAt(subject, 5)
6✔
2780

6✔
2781
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
2782
        if errorOnRequiredApiLevel(hdr) {
6✔
2783
                resp.Error = NewJSRequiredApiLevelError()
×
2784
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2785
                return
×
2786
        }
×
2787

2788
        if !s.JetStreamIsClustered() {
8✔
2789
                var streams []*stream
2✔
2790
                var ac *Account
2✔
2791
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2792
                        streams = ac.streams()
1✔
2793
                }
1✔
2794

2795
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2796
                        accName, len(streams), ac != nil)
2✔
2797

2✔
2798
                for _, mset := range streams {
3✔
2799
                        err := mset.delete()
1✔
2800
                        if err != nil {
1✔
2801
                                resp.Error = NewJSStreamDeleteError(err)
×
2802
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2803
                                return
×
2804
                        }
×
2805
                }
2806
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2807
                        resp.Error = NewJSStreamGeneralError(err)
×
2808
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2809
                        return
×
2810
                }
×
2811
                resp.Initiated = true
2✔
2812
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2813
                return
2✔
2814
        }
2815

2816
        _, cc := s.getJetStreamCluster()
4✔
2817

4✔
2818
        js.mu.RLock()
4✔
2819
        isLeader := cc.isLeader()
4✔
2820
        meta := cc.meta
4✔
2821
        js.mu.RUnlock()
4✔
2822

4✔
2823
        if !isLeader {
4✔
2824
                return
×
2825
        }
×
2826

2827
        if js.isMetaRecovering() {
4✔
2828
                // While in recovery mode, the data structures are not fully initialized
×
2829
                resp.Error = NewJSClusterNotAvailError()
×
2830
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2831
                return
×
2832
        }
×
2833

2834
        js.mu.RLock()
4✔
2835
        ns, nc := 0, 0
4✔
2836
        streams, hasAccount := cc.streams[accName]
4✔
2837
        for _, osa := range streams {
12✔
2838
                for _, oca := range osa.consumers {
20✔
2839
                        oca.deleted = true
12✔
2840
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
12✔
2841
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2842
                        nc++
12✔
2843
                }
12✔
2844
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
8✔
2845
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2846
                ns++
8✔
2847
        }
2848
        js.mu.RUnlock()
4✔
2849

4✔
2850
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2851

4✔
2852
        resp.Initiated = true
4✔
2853
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2854
}
2855

2856
// Request to have the meta leader stepdown.
2857
// These will only be received by the meta leader, so less checking needed.
2858
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
24✔
2859
        if c == nil || !s.JetStreamEnabled() {
24✔
2860
                return
×
2861
        }
×
2862

2863
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
24✔
2864
        if err != nil {
24✔
2865
                s.Warnf(badAPIRequestT, msg)
×
2866
                return
×
2867
        }
×
2868

2869
        // This should only be coming from the System Account.
2870
        if acc != s.SystemAccount() {
25✔
2871
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2872
                return
1✔
2873
        }
1✔
2874

2875
        js, cc := s.getJetStreamCluster()
23✔
2876
        if js == nil || cc == nil {
23✔
2877
                return
×
2878
        }
×
2879

2880
        // Extra checks here but only leader is listening.
2881
        js.mu.RLock()
23✔
2882
        isLeader := cc.isLeader()
23✔
2883
        meta := cc.meta
23✔
2884
        js.mu.RUnlock()
23✔
2885

23✔
2886
        if !isLeader {
23✔
2887
                return
×
2888
        }
×
2889

2890
        var preferredLeader string
23✔
2891
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
23✔
2892
        if errorOnRequiredApiLevel(hdr) {
23✔
2893
                resp.Error = NewJSRequiredApiLevelError()
×
2894
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2895
                return
×
2896
        }
×
2897

2898
        if isJSONObjectOrArray(msg) {
38✔
2899
                var req JSApiLeaderStepdownRequest
15✔
2900
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
15✔
2901
                        resp.Error = NewJSInvalidJSONError(err)
×
2902
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2903
                        return
×
2904
                }
×
2905
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
21✔
2906
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2907
                        return
6✔
2908
                }
6✔
2909
        }
2910

2911
        // Call actual stepdown.
2912
        err = meta.StepDown(preferredLeader)
17✔
2913
        if err != nil {
17✔
2914
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2915
        } else {
17✔
2916
                resp.Success = true
17✔
2917
        }
17✔
2918
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
17✔
2919
}
2920

2921
// Check if given []bytes is a JSON Object or Array.
2922
// Technically, valid JSON can also be a plain string or number, but for our use case,
2923
// we care only for JSON objects or arrays which starts with `[` or `{`.
2924
// This function does not have to ensure valid JSON in its entirety. It is used merely
2925
// to hint the codepath if it should attempt to parse the request as JSON or not.
2926
func isJSONObjectOrArray(req []byte) bool {
18,337✔
2927
        // Skip leading JSON whitespace (space, tab, newline, carriage return)
18,337✔
2928
        i := 0
18,337✔
2929
        for i < len(req) && (req[i] == ' ' || req[i] == '\t' || req[i] == '\n' || req[i] == '\r') {
18,349✔
2930
                i++
12✔
2931
        }
12✔
2932
        // Check for empty input after trimming
2933
        if i >= len(req) {
36,093✔
2934
                return false
17,756✔
2935
        }
17,756✔
2936
        // Check if the first non-whitespace character is '{' or '['
2937
        return req[i] == '{' || req[i] == '['
581✔
2938
}
2939

2940
func isEmptyRequest(req []byte) bool {
47,377✔
2941
        if len(req) == 0 {
92,377✔
2942
                return true
45,000✔
2943
        }
45,000✔
2944
        if bytes.Equal(req, []byte("{}")) {
2,378✔
2945
                return true
1✔
2946
        }
1✔
2947
        // If we are here we didn't get our simple match, but still could be valid.
2948
        var v any
2,376✔
2949
        if err := json.Unmarshal(req, &v); err != nil {
2,376✔
2950
                return false
×
2951
        }
×
2952
        vm, ok := v.(map[string]any)
2,376✔
2953
        if !ok {
2,376✔
2954
                return false
×
2955
        }
×
2956
        return len(vm) == 0
2,376✔
2957
}
2958

2959
// getStepDownPreferredPlacement attempts to work out what the best placement is
2960
// for a stepdown request. The preferred server name always takes precedence, but
2961
// if not specified, the placement will be used to filter by cluster. The caller
2962
// should check for return API errors and return those to the requestor if needed.
2963
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
41✔
2964
        if placement == nil {
43✔
2965
                return _EMPTY_, nil
2✔
2966
        }
2✔
2967
        var preferredLeader string
39✔
2968
        if placement.Preferred != _EMPTY_ {
58✔
2969
                for _, p := range group.Peers() {
79✔
2970
                        si, ok := s.nodeToInfo.Load(p.ID)
60✔
2971
                        if !ok || si == nil {
60✔
2972
                                continue
×
2973
                        }
2974
                        if si.(nodeInfo).name == placement.Preferred {
75✔
2975
                                preferredLeader = p.ID
15✔
2976
                                break
15✔
2977
                        }
2978
                }
2979
                if preferredLeader == group.ID() {
23✔
2980
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
2981
                }
4✔
2982
                if preferredLeader == _EMPTY_ {
19✔
2983
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
2984
                }
4✔
2985
        } else {
20✔
2986
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
2987
                ourID := group.ID()
20✔
2988
                for _, p := range group.Peers() {
116✔
2989
                        if p == nil {
96✔
2990
                                continue // ... shouldn't happen.
×
2991
                        }
2992
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
2993
                        if !ok || si == nil {
96✔
2994
                                continue
×
2995
                        }
2996
                        ni := si.(nodeInfo)
96✔
2997
                        if ni.offline || p.ID == ourID {
116✔
2998
                                continue
20✔
2999
                        }
3000
                        possiblePeers[p] = ni
76✔
3001
                }
3002
                // If cluster is specified, filter out anything not matching the cluster name.
3003
                if placement.Cluster != _EMPTY_ {
31✔
3004
                        for p, si := range possiblePeers {
51✔
3005
                                if si.cluster != placement.Cluster {
66✔
3006
                                        delete(possiblePeers, p)
26✔
3007
                                }
26✔
3008
                        }
3009
                }
3010
                // If tags are specified, filter out anything not matching all supplied tags.
3011
                if len(placement.Tags) > 0 {
32✔
3012
                        for p, si := range possiblePeers {
55✔
3013
                                matchesAll := true
43✔
3014
                                for _, tag := range placement.Tags {
93✔
3015
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3016
                                                break
32✔
3017
                                        }
3018
                                }
3019
                                if !matchesAll {
75✔
3020
                                        delete(possiblePeers, p)
32✔
3021
                                }
32✔
3022
                        }
3023
                }
3024
                // If there are no possible peers, return an error.
3025
                if len(possiblePeers) == 0 {
28✔
3026
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3027
                }
8✔
3028
                // Take advantage of random map iteration order to select the preferred.
3029
                for p := range possiblePeers {
24✔
3030
                        preferredLeader = p.ID
12✔
3031
                        break
12✔
3032
                }
3033
        }
3034
        return preferredLeader, nil
23✔
3035
}
3036

3037
// Request to delete a stream.
3038
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
543✔
3039
        if c == nil || !s.JetStreamEnabled() {
543✔
3040
                return
×
3041
        }
×
3042
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
543✔
3043
        if err != nil {
543✔
3044
                s.Warnf(badAPIRequestT, msg)
×
3045
                return
×
3046
        }
×
3047

3048
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
543✔
3049
        if errorOnRequiredApiLevel(hdr) {
544✔
3050
                resp.Error = NewJSRequiredApiLevelError()
1✔
3051
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3052
                return
1✔
3053
        }
1✔
3054

3055
        // Determine if we should proceed here when we are in clustered mode.
3056
        if s.JetStreamIsClustered() {
1,040✔
3057
                js, cc := s.getJetStreamCluster()
498✔
3058
                if js == nil || cc == nil {
498✔
3059
                        return
×
3060
                }
×
3061
                if js.isLeaderless() {
499✔
3062
                        resp.Error = NewJSClusterNotAvailError()
1✔
3063
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3064
                        return
1✔
3065
                }
1✔
3066
                // Make sure we are meta leader.
3067
                if !s.JetStreamIsLeader() {
885✔
3068
                        return
388✔
3069
                }
388✔
3070
        }
3071

3072
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
153✔
3073
                if doErr {
×
3074
                        resp.Error = NewJSNotEnabledForAccountError()
×
3075
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3076
                }
×
3077
                return
×
3078
        }
3079

3080
        if !isEmptyRequest(msg) {
154✔
3081
                resp.Error = NewJSNotEmptyRequestError()
1✔
3082
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3083
                return
1✔
3084
        }
1✔
3085
        stream := streamNameFromSubject(subject)
152✔
3086

152✔
3087
        // Clustered.
152✔
3088
        if s.JetStreamIsClustered() {
261✔
3089
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
109✔
3090
                return
109✔
3091
        }
109✔
3092

3093
        mset, err := acc.lookupStream(stream)
43✔
3094
        if err != nil {
48✔
3095
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3096
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3097
                return
5✔
3098
        }
5✔
3099

3100
        if err := mset.delete(); err != nil {
38✔
3101
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3102
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3103
                return
×
3104
        }
×
3105
        resp.Success = true
38✔
3106
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
38✔
3107
}
3108

3109
// Request to delete a message.
3110
// This expects a stream sequence number as the msg body.
3111
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,141✔
3112
        if c == nil || !s.JetStreamEnabled() {
1,144✔
3113
                return
3✔
3114
        }
3✔
3115
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,138✔
3116
        if err != nil {
1,138✔
3117
                s.Warnf(badAPIRequestT, msg)
×
3118
                return
×
3119
        }
×
3120

3121
        stream := tokenAt(subject, 6)
1,138✔
3122

1,138✔
3123
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
1,138✔
3124
        if errorOnRequiredApiLevel(hdr) {
1,139✔
3125
                resp.Error = NewJSRequiredApiLevelError()
1✔
3126
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3127
                return
1✔
3128
        }
1✔
3129

3130
        // If we are in clustered mode we need to be the stream leader to proceed.
3131
        if s.JetStreamIsClustered() {
1,532✔
3132
                // Check to make sure the stream is assigned.
395✔
3133
                js, cc := s.getJetStreamCluster()
395✔
3134
                if js == nil || cc == nil {
397✔
3135
                        return
2✔
3136
                }
2✔
3137
                if js.isLeaderless() {
394✔
3138
                        resp.Error = NewJSClusterNotAvailError()
1✔
3139
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3140
                        return
1✔
3141
                }
1✔
3142

3143
                js.mu.RLock()
392✔
3144
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
392✔
3145
                js.mu.RUnlock()
392✔
3146

392✔
3147
                if isLeader && sa == nil {
392✔
3148
                        // We can't find the stream, so mimic what would be the errors below.
×
3149
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3150
                                if doErr {
×
3151
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3152
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3153
                                }
×
3154
                                return
×
3155
                        }
3156
                        // No stream present.
3157
                        resp.Error = NewJSStreamNotFoundError()
×
3158
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3159
                        return
×
3160
                } else if sa == nil {
392✔
3161
                        return
×
3162
                }
×
3163

3164
                // Check to see if we are a member of the group and if the group has no leader.
3165
                if js.isGroupLeaderless(sa.Group) {
392✔
3166
                        resp.Error = NewJSClusterNotAvailError()
×
3167
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3168
                        return
×
3169
                }
×
3170

3171
                // We have the stream assigned and a leader, so only the stream leader should answer.
3172
                if !acc.JetStreamIsStreamLeader(stream) {
651✔
3173
                        return
259✔
3174
                }
259✔
3175
        }
3176

3177
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,016✔
3178
                if doErr {
280✔
3179
                        resp.Error = NewJSNotEnabledForAccountError()
139✔
3180
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
139✔
3181
                }
139✔
3182
                return
141✔
3183
        }
3184
        if isEmptyRequest(msg) {
734✔
3185
                resp.Error = NewJSBadRequestError()
×
3186
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3187
                return
×
3188
        }
×
3189
        var req JSApiMsgDeleteRequest
734✔
3190
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
734✔
3191
                resp.Error = NewJSInvalidJSONError(err)
×
3192
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3193
                return
×
3194
        }
×
3195

3196
        mset, err := acc.lookupStream(stream)
734✔
3197
        if err != nil {
736✔
3198
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3199
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3200
                return
2✔
3201
        }
2✔
3202
        if mset.cfg.Sealed {
734✔
3203
                resp.Error = NewJSStreamSealedError()
2✔
3204
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3205
                return
2✔
3206
        }
2✔
3207
        if mset.cfg.DenyDelete {
731✔
3208
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3209
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3210
                return
1✔
3211
        }
1✔
3212

3213
        if s.JetStreamIsClustered() {
860✔
3214
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
131✔
3215
                return
131✔
3216
        }
131✔
3217

3218
        var removed bool
598✔
3219
        if req.NoErase {
1,194✔
3220
                removed, err = mset.removeMsg(req.Seq)
596✔
3221
        } else {
598✔
3222
                removed, err = mset.eraseMsg(req.Seq)
2✔
3223
        }
2✔
3224
        if err != nil {
598✔
3225
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3226
        } else if !removed {
598✔
3227
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3228
        } else {
598✔
3229
                resp.Success = true
598✔
3230
        }
598✔
3231
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
598✔
3232
}
3233

3234
// Request to get a raw stream message.
3235
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2,532✔
3236
        if c == nil || !s.JetStreamEnabled() {
2,532✔
3237
                return
×
3238
        }
×
3239
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2,532✔
3240
        if err != nil {
2,532✔
3241
                s.Warnf(badAPIRequestT, msg)
×
3242
                return
×
3243
        }
×
3244

3245
        stream := tokenAt(subject, 6)
2,532✔
3246

2,532✔
3247
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
2,532✔
3248
        if errorOnRequiredApiLevel(hdr) {
2,533✔
3249
                resp.Error = NewJSRequiredApiLevelError()
1✔
3250
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3251
                return
1✔
3252
        }
1✔
3253

3254
        // If we are in clustered mode we need to be the stream leader to proceed.
3255
        if s.JetStreamIsClustered() {
3,946✔
3256
                // Check to make sure the stream is assigned.
1,415✔
3257
                js, cc := s.getJetStreamCluster()
1,415✔
3258
                if js == nil || cc == nil {
1,415✔
3259
                        return
×
3260
                }
×
3261
                if js.isLeaderless() {
1,415✔
3262
                        resp.Error = NewJSClusterNotAvailError()
×
3263
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3264
                        return
×
3265
                }
×
3266

3267
                js.mu.RLock()
1,415✔
3268
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
1,415✔
3269
                js.mu.RUnlock()
1,415✔
3270

1,415✔
3271
                if isLeader && sa == nil {
1,415✔
3272
                        // We can't find the stream, so mimic what would be the errors below.
×
3273
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3274
                                if doErr {
×
3275
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3276
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3277
                                }
×
3278
                                return
×
3279
                        }
3280
                        // No stream present.
3281
                        resp.Error = NewJSStreamNotFoundError()
×
3282
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3283
                        return
×
3284
                } else if sa == nil {
1,415✔
3285
                        return
×
3286
                }
×
3287

3288
                // Check to see if we are a member of the group and if the group has no leader.
3289
                if js.isGroupLeaderless(sa.Group) {
1,415✔
3290
                        resp.Error = NewJSClusterNotAvailError()
×
3291
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3292
                        return
×
3293
                }
×
3294

3295
                // We have the stream assigned and a leader, so only the stream leader should answer.
3296
                if !acc.JetStreamIsStreamLeader(stream) {
2,366✔
3297
                        return
951✔
3298
                }
951✔
3299
        }
3300

3301
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,583✔
3302
                if doErr {
3✔
3303
                        resp.Error = NewJSNotEnabledForAccountError()
×
3304
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3305
                }
×
3306
                return
3✔
3307
        }
3308
        if isEmptyRequest(msg) {
1,577✔
3309
                resp.Error = NewJSBadRequestError()
×
3310
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3311
                return
×
3312
        }
×
3313
        var req JSApiMsgGetRequest
1,577✔
3314
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,577✔
3315
                resp.Error = NewJSInvalidJSONError(err)
×
3316
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3317
                return
×
3318
        }
×
3319

3320
        // This version does not support batch.
3321
        if req.Batch > 0 || req.MaxBytes > 0 {
1,578✔
3322
                resp.Error = NewJSBadRequestError()
1✔
3323
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3324
                return
1✔
3325
        }
1✔
3326

3327
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3328
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3329
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,576✔
3330
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,576✔
3331
                (req.Seq > 0 && req.StartTime != nil) ||
1,576✔
3332
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,576✔
3333
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,580✔
3334
                resp.Error = NewJSBadRequestError()
4✔
3335
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3336
                return
4✔
3337
        }
4✔
3338

3339
        mset, err := acc.lookupStream(stream)
1,572✔
3340
        if err != nil {
1,572✔
3341
                resp.Error = NewJSStreamNotFoundError()
×
3342
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3343
                return
×
3344
        }
×
3345
        if mset.offlineReason != _EMPTY_ {
1,572✔
3346
                // Just let the request time out.
×
3347
                return
×
3348
        }
×
3349

3350
        var svp StoreMsg
1,572✔
3351
        var sm *StoreMsg
1,572✔
3352

1,572✔
3353
        // Ensure this read request is isolated and doesn't interleave with writes.
1,572✔
3354
        mset.mu.RLock()
1,572✔
3355
        defer mset.mu.RUnlock()
1,572✔
3356

1,572✔
3357
        // If AsOfTime is set, perform this first to get the sequence.
1,572✔
3358
        var seq uint64
1,572✔
3359
        if req.StartTime != nil {
1,578✔
3360
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3361
        } else {
1,572✔
3362
                seq = req.Seq
1,566✔
3363
        }
1,566✔
3364

3365
        if seq > 0 && req.NextFor == _EMPTY_ {
2,020✔
3366
                sm, err = mset.store.LoadMsg(seq, &svp)
448✔
3367
        } else if req.NextFor != _EMPTY_ {
1,674✔
3368
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3369
        } else {
1,124✔
3370
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
1,022✔
3371
        }
1,022✔
3372
        if err != nil {
2,413✔
3373
                resp.Error = NewJSNoMessageFoundError()
841✔
3374
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
841✔
3375
                return
841✔
3376
        }
841✔
3377
        resp.Message = &StoredMsg{
731✔
3378
                Subject:  sm.subj,
731✔
3379
                Sequence: sm.seq,
731✔
3380
                Data:     sm.msg,
731✔
3381
                Time:     time.Unix(0, sm.ts).UTC(),
731✔
3382
        }
731✔
3383
        if !req.NoHeaders {
1,461✔
3384
                resp.Message.Header = sm.hdr
730✔
3385
        }
730✔
3386

3387
        // Don't send response through API layer for this call.
3388
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
731✔
3389
}
3390

3391
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3392
        if c == nil || !s.JetStreamEnabled() {
28✔
3393
                return
×
3394
        }
×
3395

3396
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3397
        if err != nil {
28✔
3398
                s.Warnf(badAPIRequestT, msg)
×
3399
                return
×
3400
        }
×
3401

3402
        stream := streamNameFromSubject(subject)
28✔
3403
        consumer := consumerNameFromSubject(subject)
28✔
3404

28✔
3405
        var req JSApiConsumerUnpinRequest
28✔
3406
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3407
        if errorOnRequiredApiLevel(hdr) {
29✔
3408
                resp.Error = NewJSRequiredApiLevelError()
1✔
3409
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3410
                return
1✔
3411
        }
1✔
3412

3413
        if err := json.Unmarshal(msg, &req); err != nil {
27✔
3414
                resp.Error = NewJSInvalidJSONError(err)
×
3415
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3416
                return
×
3417
        }
×
3418

3419
        if req.Group == _EMPTY_ {
31✔
3420
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
4✔
3421
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3422
                return
4✔
3423
        }
4✔
3424

3425
        if !validGroupName.MatchString(req.Group) {
27✔
3426
                resp.Error = NewJSConsumerInvalidGroupNameError()
4✔
3427
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3428
                return
4✔
3429
        }
4✔
3430
        if s.JetStreamIsClustered() {
31✔
3431
                // Check to make sure the stream is assigned.
12✔
3432
                js, cc := s.getJetStreamCluster()
12✔
3433
                if js == nil || cc == nil {
12✔
3434
                        return
×
3435
                }
×
3436

3437
                // First check if the stream and consumer is there.
3438
                js.mu.RLock()
12✔
3439
                sa := js.streamAssignment(acc.Name, stream)
12✔
3440
                if sa == nil {
15✔
3441
                        js.mu.RUnlock()
3✔
3442
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3443
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3444
                        return
3✔
3445
                }
3✔
3446
                if sa.unsupported != nil {
9✔
3447
                        js.mu.RUnlock()
×
3448
                        // Just let the request time out.
×
3449
                        return
×
3450
                }
×
3451

3452
                ca, ok := sa.consumers[consumer]
9✔
3453
                if !ok || ca == nil {
12✔
3454
                        js.mu.RUnlock()
3✔
3455
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3456
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3457
                        return
3✔
3458
                }
3✔
3459
                if ca.unsupported != nil {
6✔
3460
                        js.mu.RUnlock()
×
3461
                        // Just let the request time out.
×
3462
                        return
×
3463
                }
×
3464
                js.mu.RUnlock()
6✔
3465

6✔
3466
                // Then check if we are the leader.
6✔
3467
                mset, err := acc.lookupStream(stream)
6✔
3468
                if err != nil {
6✔
3469
                        return
×
3470
                }
×
3471

3472
                o := mset.lookupConsumer(consumer)
6✔
3473
                if o == nil {
6✔
3474
                        return
×
3475
                }
×
3476
                if !o.isLeader() {
10✔
3477
                        return
4✔
3478
                }
4✔
3479
        }
3480

3481
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3482
                if doErr {
×
3483
                        resp.Error = NewJSNotEnabledForAccountError()
×
3484
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3485
                }
×
3486
                return
×
3487
        }
3488

3489
        mset, err := acc.lookupStream(stream)
9✔
3490
        if err != nil {
10✔
3491
                resp.Error = NewJSStreamNotFoundError()
1✔
3492
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3493
                return
1✔
3494
        }
1✔
3495
        if mset.offlineReason != _EMPTY_ {
8✔
3496
                // Just let the request time out.
×
3497
                return
×
3498
        }
×
3499
        o := mset.lookupConsumer(consumer)
8✔
3500
        if o == nil {
9✔
3501
                resp.Error = NewJSConsumerNotFoundError()
1✔
3502
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3503
                return
1✔
3504
        }
1✔
3505
        if o.offlineReason != _EMPTY_ {
7✔
3506
                // Just let the request time out.
×
3507
                return
×
3508
        }
×
3509

3510
        var foundPriority bool
7✔
3511
        for _, group := range o.config().PriorityGroups {
14✔
3512
                if group == req.Group {
12✔
3513
                        foundPriority = true
5✔
3514
                        break
5✔
3515
                }
3516
        }
3517
        if !foundPriority {
9✔
3518
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3519
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3520
                return
2✔
3521
        }
2✔
3522

3523
        o.mu.Lock()
5✔
3524
        o.currentPinId = _EMPTY_
5✔
3525
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3526
        o.mu.Unlock()
5✔
3527
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3528
}
3529

3530
// Request to purge a stream.
3531
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
136✔
3532
        if c == nil || !s.JetStreamEnabled() {
136✔
3533
                return
×
3534
        }
×
3535
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
136✔
3536
        if err != nil {
136✔
3537
                s.Warnf(badAPIRequestT, msg)
×
3538
                return
×
3539
        }
×
3540

3541
        stream := streamNameFromSubject(subject)
136✔
3542

136✔
3543
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
136✔
3544
        if errorOnRequiredApiLevel(hdr) {
137✔
3545
                resp.Error = NewJSRequiredApiLevelError()
1✔
3546
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3547
                return
1✔
3548
        }
1✔
3549

3550
        // If we are in clustered mode we need to be the stream leader to proceed.
3551
        if s.JetStreamIsClustered() {
236✔
3552
                // Check to make sure the stream is assigned.
101✔
3553
                js, cc := s.getJetStreamCluster()
101✔
3554
                if js == nil || cc == nil {
101✔
3555
                        return
×
3556
                }
×
3557

3558
                js.mu.RLock()
101✔
3559
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
101✔
3560
                js.mu.RUnlock()
101✔
3561

101✔
3562
                if isLeader && sa == nil {
101✔
3563
                        // We can't find the stream, so mimic what would be the errors below.
×
3564
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3565
                                if doErr {
×
3566
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3567
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3568
                                }
×
3569
                                return
×
3570
                        }
3571
                        // No stream present.
3572
                        resp.Error = NewJSStreamNotFoundError()
×
3573
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3574
                        return
×
3575
                } else if sa == nil {
102✔
3576
                        if js.isLeaderless() {
1✔
3577
                                resp.Error = NewJSClusterNotAvailError()
×
3578
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3579
                        }
×
3580
                        return
1✔
3581
                }
3582

3583
                // Check to see if we are a member of the group and if the group has no leader.
3584
                if js.isGroupLeaderless(sa.Group) {
100✔
3585
                        resp.Error = NewJSClusterNotAvailError()
×
3586
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3587
                        return
×
3588
                }
×
3589

3590
                // We have the stream assigned and a leader, so only the stream leader should answer.
3591
                if !acc.JetStreamIsStreamLeader(stream) {
168✔
3592
                        if js.isLeaderless() {
69✔
3593
                                resp.Error = NewJSClusterNotAvailError()
1✔
3594
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3595
                        }
1✔
3596
                        return
68✔
3597
                }
3598
        }
3599

3600
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
66✔
3601
                if doErr {
×
3602
                        resp.Error = NewJSNotEnabledForAccountError()
×
3603
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3604
                }
×
3605
                return
×
3606
        }
3607

3608
        var purgeRequest *JSApiStreamPurgeRequest
66✔
3609
        if isJSONObjectOrArray(msg) {
100✔
3610
                var req JSApiStreamPurgeRequest
34✔
3611
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3612
                        resp.Error = NewJSInvalidJSONError(err)
×
3613
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3614
                        return
×
3615
                }
×
3616
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3617
                        resp.Error = NewJSBadRequestError()
×
3618
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3619
                        return
×
3620
                }
×
3621
                purgeRequest = &req
34✔
3622
        }
3623

3624
        mset, err := acc.lookupStream(stream)
66✔
3625
        if err != nil {
66✔
3626
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3627
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3628
                return
×
3629
        }
×
3630
        if mset.cfg.Sealed {
68✔
3631
                resp.Error = NewJSStreamSealedError()
2✔
3632
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3633
                return
2✔
3634
        }
2✔
3635
        if mset.cfg.DenyPurge {
65✔
3636
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3637
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3638
                return
1✔
3639
        }
1✔
3640

3641
        if s.JetStreamIsClustered() {
93✔
3642
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
30✔
3643
                return
30✔
3644
        }
30✔
3645

3646
        purged, err := mset.purge(purgeRequest)
33✔
3647
        if err != nil {
33✔
3648
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3649
        } else {
33✔
3650
                resp.Purged = purged
33✔
3651
                resp.Success = true
33✔
3652
        }
33✔
3653
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
33✔
3654
}
3655

3656
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,350✔
3657
        var replicas int
1,350✔
3658
        if cfg != nil {
2,700✔
3659
                replicas = cfg.Replicas
1,350✔
3660
        }
1,350✔
3661
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,350✔
3662
        if apiErr != nil {
1,350✔
3663
                return apiErr
×
3664
        }
×
3665
        jsa.js.mu.RLock()
1,350✔
3666
        defer jsa.js.mu.RUnlock()
1,350✔
3667
        jsa.mu.RLock()
1,350✔
3668
        defer jsa.mu.RUnlock()
1,350✔
3669
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,353✔
3670
                return NewJSMaximumStreamsLimitError()
3✔
3671
        }
3✔
3672
        reserved := jsa.tieredReservation(tier, cfg)
1,347✔
3673
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,348✔
3674
                return NewJSStreamLimitsError(err, Unless(err))
1✔
3675
        }
1✔
3676
        return nil
1,346✔
3677
}
3678

3679
// Request to restore a stream.
3680
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
50✔
3681
        if c == nil || !s.JetStreamIsLeader() {
74✔
3682
                return
24✔
3683
        }
24✔
3684
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
26✔
3685
        if err != nil {
26✔
3686
                s.Warnf(badAPIRequestT, msg)
×
3687
                return
×
3688
        }
×
3689

3690
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
26✔
3691
        if errorOnRequiredApiLevel(hdr) {
27✔
3692
                resp.Error = NewJSRequiredApiLevelError()
1✔
3693
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3694
                return
1✔
3695
        }
1✔
3696
        if !acc.JetStreamEnabled() {
25✔
3697
                resp.Error = NewJSNotEnabledForAccountError()
×
3698
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3699
                return
×
3700
        }
×
3701
        if isEmptyRequest(msg) {
26✔
3702
                resp.Error = NewJSBadRequestError()
1✔
3703
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3704
                return
1✔
3705
        }
1✔
3706

3707
        var req JSApiStreamRestoreRequest
24✔
3708
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3709
                resp.Error = NewJSInvalidJSONError(err)
×
3710
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3711
                return
×
3712
        }
×
3713

3714
        stream := streamNameFromSubject(subject)
24✔
3715

24✔
3716
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3717
                req.Config.Name = stream
×
3718
        }
×
3719

3720
        // check stream config at the start of the restore process, not at the end
3721
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3722
        if apiErr != nil {
26✔
3723
                resp.Error = apiErr
2✔
3724
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3725
                return
2✔
3726
        }
2✔
3727

3728
        if s.JetStreamIsClustered() {
33✔
3729
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
3730
                return
11✔
3731
        }
11✔
3732

3733
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
3734
                resp.Error = err
2✔
3735
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3736
                return
2✔
3737
        }
2✔
3738

3739
        if _, err := acc.lookupStream(stream); err == nil {
10✔
3740
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
3741
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3742
                return
1✔
3743
        }
1✔
3744

3745
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3746
                if doErr {
×
3747
                        resp.Error = NewJSNotEnabledForAccountError()
×
3748
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3749
                }
×
3750
                return
×
3751
        }
3752

3753
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
3754
}
3755

3756
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3757
        js := s.getJetStream()
16✔
3758

16✔
3759
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3760

16✔
3761
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
3762
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
28✔
3763
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
12✔
3764
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
3765
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3766
                        return nil
×
3767
                }
×
3768
        }
3769

3770
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
3771
        if err != nil {
16✔
3772
                resp.Error = NewJSTempStorageFailedError()
×
3773
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3774
                return nil
×
3775
        }
×
3776

3777
        streamName := cfg.Name
16✔
3778
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3779

16✔
3780
        start := time.Now().UTC()
16✔
3781
        domain := s.getOpts().JetStreamDomain
16✔
3782
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3783
                TypedEvent: TypedEvent{
16✔
3784
                        Type: JSRestoreCreateAdvisoryType,
16✔
3785
                        ID:   nuid.Next(),
16✔
3786
                        Time: start,
16✔
3787
                },
16✔
3788
                Stream: streamName,
16✔
3789
                Client: ci.forAdvisory(),
16✔
3790
                Domain: domain,
16✔
3791
        })
16✔
3792

16✔
3793
        // Create our internal subscription to accept the snapshot.
16✔
3794
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3795

16✔
3796
        type result struct {
16✔
3797
                err   error
16✔
3798
                reply string
16✔
3799
        }
16✔
3800

16✔
3801
        // For signaling to upper layers.
16✔
3802
        resultCh := make(chan result, 1)
16✔
3803
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
3804

16✔
3805
        var total int
16✔
3806

16✔
3807
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
3808
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
134✔
3809
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
118✔
3810
                if reply == _EMPTY_ {
119✔
3811
                        sub.client.processUnsub(sub.sid)
1✔
3812
                        resultCh <- result{
1✔
3813
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
3814
                                reply,
1✔
3815
                        }
1✔
3816
                        return
1✔
3817
                }
1✔
3818
                // Account client messages have \r\n on end. This is an error.
3819
                if len(msg) < LEN_CR_LF {
117✔
3820
                        sub.client.processUnsub(sub.sid)
×
3821
                        resultCh <- result{
×
3822
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
3823
                                reply,
×
3824
                        }
×
3825
                        return
×
3826
                }
×
3827
                // Adjust.
3828
                msg = msg[:len(msg)-LEN_CR_LF]
117✔
3829

117✔
3830
                // This means we are complete with our transfer from the client.
117✔
3831
                if len(msg) == 0 {
132✔
3832
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
3833
                        resultCh <- result{err, reply}
15✔
3834
                        return
15✔
3835
                }
15✔
3836

3837
                // We track total and check on server limits.
3838
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
3839
                total += len(msg)
102✔
3840
                if js.wouldExceedLimits(FileStorage, total) {
102✔
3841
                        s.resourcesExceededError(FileStorage)
×
3842
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
3843
                        return
×
3844
                }
×
3845

3846
                // Append chunk to temp file. Mark as issue if we encounter an error.
3847
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
102✔
3848
                        resultCh <- result{err, reply}
×
3849
                        if reply != _EMPTY_ {
×
3850
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
3851
                        }
×
3852
                        return
×
3853
                }
3854

3855
                activeQ.push(len(msg))
102✔
3856

102✔
3857
                s.sendInternalAccountMsg(acc, reply, nil)
102✔
3858
        }
3859

3860
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3861
        if err != nil {
16✔
3862
                tfile.Close()
×
3863
                os.Remove(tfile.Name())
×
3864
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3865
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3866
                return nil
×
3867
        }
×
3868

3869
        // Mark the subject so the end user knows where to send the snapshot chunks.
3870
        resp.DeliverSubject = restoreSubj
16✔
3871
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3872

16✔
3873
        doneCh := make(chan error, 1)
16✔
3874

16✔
3875
        // Monitor the progress from another Go routine.
16✔
3876
        s.startGoRoutine(func() {
32✔
3877
                defer s.grWG.Done()
16✔
3878
                defer func() {
32✔
3879
                        tfile.Close()
16✔
3880
                        os.Remove(tfile.Name())
16✔
3881
                        sub.client.processUnsub(sub.sid)
16✔
3882
                        activeQ.unregister()
16✔
3883
                }()
16✔
3884

3885
                const activityInterval = 5 * time.Second
16✔
3886
                notActive := time.NewTimer(activityInterval)
16✔
3887
                defer notActive.Stop()
16✔
3888

16✔
3889
                total := 0
16✔
3890
                for {
134✔
3891
                        select {
118✔
3892
                        case result := <-resultCh:
16✔
3893
                                err := result.err
16✔
3894
                                var mset *stream
16✔
3895

16✔
3896
                                // If we staged properly go ahead and do restore now.
16✔
3897
                                if err == nil {
31✔
3898
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
3899
                                        tfile.Seek(0, 0)
15✔
3900
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
3901
                                } else {
16✔
3902
                                        errStr := err.Error()
1✔
3903
                                        tmp := []rune(errStr)
1✔
3904
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
3905
                                        s.Warnf(errStr)
1✔
3906
                                }
1✔
3907

3908
                                end := time.Now().UTC()
16✔
3909

16✔
3910
                                // TODO(rip) - Should this have the error code in it??
16✔
3911
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
3912
                                        TypedEvent: TypedEvent{
16✔
3913
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
3914
                                                ID:   nuid.Next(),
16✔
3915
                                                Time: end,
16✔
3916
                                        },
16✔
3917
                                        Stream: streamName,
16✔
3918
                                        Start:  start,
16✔
3919
                                        End:    end,
16✔
3920
                                        Bytes:  int64(total),
16✔
3921
                                        Client: ci.forAdvisory(),
16✔
3922
                                        Domain: domain,
16✔
3923
                                })
16✔
3924

16✔
3925
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
3926

16✔
3927
                                if err != nil {
20✔
3928
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
3929
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
3930
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
3931
                                } else {
16✔
3932
                                        msetCfg := mset.config()
12✔
3933
                                        resp.StreamInfo = &StreamInfo{
12✔
3934
                                                Created:   mset.createdTime(),
12✔
3935
                                                State:     mset.state(),
12✔
3936
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
3937
                                                TimeStamp: time.Now().UTC(),
12✔
3938
                                        }
12✔
3939
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
3940
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
3941
                                }
12✔
3942

3943
                                // On the last EOF, send back the stream info or error status.
3944
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
3945
                                // Signal to the upper layers.
16✔
3946
                                doneCh <- err
16✔
3947
                                return
16✔
3948
                        case <-activeQ.ch:
102✔
3949
                                if n, ok := activeQ.popOne(); ok {
204✔
3950
                                        total += n
102✔
3951
                                        notActive.Reset(activityInterval)
102✔
3952
                                }
102✔
3953
                        case <-notActive.C:
×
3954
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
3955
                                doneCh <- err
×
3956
                                return
×
3957
                        }
3958
                }
3959
        })
3960

3961
        return doneCh
16✔
3962
}
3963

3964
// Process a snapshot request.
3965
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
3966
        if c == nil || !s.JetStreamEnabled() {
29✔
3967
                return
×
3968
        }
×
3969
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
3970
        if err != nil {
29✔
3971
                s.Warnf(badAPIRequestT, msg)
×
3972
                return
×
3973
        }
×
3974

3975
        smsg := string(msg)
29✔
3976
        stream := streamNameFromSubject(subject)
29✔
3977

29✔
3978
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
3979
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
3980
                return
15✔
3981
        }
15✔
3982

3983
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
3984
        if errorOnRequiredApiLevel(hdr) {
15✔
3985
                resp.Error = NewJSRequiredApiLevelError()
1✔
3986
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3987
                return
1✔
3988
        }
1✔
3989
        if !acc.JetStreamEnabled() {
13✔
3990
                resp.Error = NewJSNotEnabledForAccountError()
×
3991
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
3992
                return
×
3993
        }
×
3994
        if isEmptyRequest(msg) {
14✔
3995
                resp.Error = NewJSBadRequestError()
1✔
3996
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
3997
                return
1✔
3998
        }
1✔
3999

4000
        mset, err := acc.lookupStream(stream)
12✔
4001
        if err != nil {
13✔
4002
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4003
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4004
                return
1✔
4005
        }
1✔
4006

4007
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4008
                if doErr {
×
4009
                        resp.Error = NewJSNotEnabledForAccountError()
×
4010
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4011
                }
×
4012
                return
×
4013
        }
4014

4015
        var req JSApiStreamSnapshotRequest
11✔
4016
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4017
                resp.Error = NewJSInvalidJSONError(err)
×
4018
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4019
                return
×
4020
        }
×
4021
        if !IsValidSubject(req.DeliverSubject) {
12✔
4022
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4023
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4024
                return
1✔
4025
        }
1✔
4026

4027
        // We will do the snapshot in a go routine as well since check msgs may
4028
        // stall this go routine.
4029
        go func() {
20✔
4030
                if req.CheckMsgs {
12✔
4031
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4032
                } else {
10✔
4033
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4034
                }
8✔
4035

4036
                start := time.Now().UTC()
10✔
4037

10✔
4038
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4039
                if err != nil {
10✔
4040
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4041
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4042
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4043
                        return
×
4044
                }
×
4045

4046
                config := mset.config()
10✔
4047
                resp.State = &sr.State
10✔
4048
                resp.Config = &config
10✔
4049

10✔
4050
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4051

10✔
4052
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4053
                        TypedEvent: TypedEvent{
10✔
4054
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4055
                                ID:   nuid.Next(),
10✔
4056
                                Time: time.Now().UTC(),
10✔
4057
                        },
10✔
4058
                        Stream: mset.name(),
10✔
4059
                        State:  sr.State,
10✔
4060
                        Client: ci.forAdvisory(),
10✔
4061
                        Domain: s.getOpts().JetStreamDomain,
10✔
4062
                })
10✔
4063

10✔
4064
                // Now do the real streaming.
10✔
4065
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4066

10✔
4067
                end := time.Now().UTC()
10✔
4068

10✔
4069
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4070
                        TypedEvent: TypedEvent{
10✔
4071
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4072
                                ID:   nuid.Next(),
10✔
4073
                                Time: end,
10✔
4074
                        },
10✔
4075
                        Stream: mset.name(),
10✔
4076
                        Start:  start,
10✔
4077
                        End:    end,
10✔
4078
                        Client: ci.forAdvisory(),
10✔
4079
                        Domain: s.getOpts().JetStreamDomain,
10✔
4080
                })
10✔
4081

10✔
4082
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4083
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4084
                        mset.jsa.account.Name,
10✔
4085
                        mset.name(),
10✔
4086
                        end.Sub(start))
10✔
4087
        }()
4088
}
4089

4090
// Default chunk size for now.
4091
const defaultSnapshotChunkSize = 128 * 1024
4092
const defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MB
4093

4094
// streamSnapshot will stream out our snapshot to the reply subject.
4095
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4096
        chunkSize := req.ChunkSize
10✔
4097
        if chunkSize == 0 {
12✔
4098
                chunkSize = defaultSnapshotChunkSize
2✔
4099
        }
2✔
4100
        // Setup for the chunk stream.
4101
        reply := req.DeliverSubject
10✔
4102
        r := sr.Reader
10✔
4103
        defer r.Close()
10✔
4104

10✔
4105
        // Check interest for the snapshot deliver subject.
10✔
4106
        inch := make(chan bool, 1)
10✔
4107
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4108
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4109
        hasInterest := <-inch
10✔
4110
        if !hasInterest {
15✔
4111
                // Allow 2 seconds or so for interest to show up.
5✔
4112
                select {
5✔
4113
                case <-inch:
4✔
4114
                case <-time.After(2 * time.Second):
1✔
4115
                }
4116
        }
4117

4118
        // Create our ack flow handler.
4119
        // This is very simple for now.
4120
        ackSize := defaultSnapshotWindowSize / chunkSize
10✔
4121
        if ackSize < 8 {
10✔
4122
                ackSize = 8
×
4123
        } else if ackSize > 8*1024 {
16✔
4124
                ackSize = 8 * 1024
6✔
4125
        }
6✔
4126
        acks := make(chan struct{}, ackSize)
10✔
4127
        acks <- struct{}{}
10✔
4128

10✔
4129
        // Track bytes outstanding.
10✔
4130
        var out int32
10✔
4131

10✔
4132
        // We will place sequence number and size of chunk sent in the reply.
10✔
4133
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4134
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
20✔
4135
                cs, _ := strconv.Atoi(tokenAt(subject, 6))
10✔
4136
                // This is very crude and simple, but ok for now.
10✔
4137
                // This only matters when sending multiple chunks.
10✔
4138
                if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
20✔
4139
                        select {
10✔
4140
                        case acks <- struct{}{}:
10✔
4141
                        default:
×
4142
                        }
4143
                }
4144
        })
4145
        defer mset.unsubscribe(ackSub)
10✔
4146

10✔
4147
        // TODO(dlc) - Add in NATS-Chunked-Sequence header
10✔
4148
        var hdr []byte
10✔
4149
        for index := 1; ; index++ {
113✔
4150
                chunk := make([]byte, chunkSize)
103✔
4151
                n, err := r.Read(chunk)
103✔
4152
                chunk = chunk[:n]
103✔
4153
                if err != nil {
113✔
4154
                        if n > 0 {
10✔
4155
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
×
4156
                        }
×
4157
                        break
10✔
4158
                }
4159

4160
                // Wait on acks for flow control if past our window size.
4161
                // Wait up to 10ms for now if no acks received.
4162
                if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
93✔
4163
                        select {
×
4164
                        case <-acks:
×
4165
                                // ok to proceed.
4166
                        case <-inch:
×
4167
                                // Lost interest
×
4168
                                hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4169
                                goto done
×
4170
                        case <-time.After(2 * time.Second):
×
4171
                                hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4172
                                goto done
×
4173
                        }
4174
                }
4175
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
93✔
4176
                if hdr == nil {
103✔
4177
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
10✔
4178
                }
10✔
4179
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
93✔
4180
                atomic.AddInt32(&out, int32(len(chunk)))
93✔
4181
        }
4182

4183
        if err := <-sr.errCh; err != _EMPTY_ {
10✔
4184
                hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4185
        }
×
4186

4187
done:
4188
        // Send last EOF
4189
        // TODO(dlc) - place hash in header
4190
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4191
}
4192

4193
// For determining consumer request type.
4194
type ccReqType uint8
4195

4196
const (
4197
        ccNew = iota
4198
        ccLegacyEphemeral
4199
        ccLegacyDurable
4200
)
4201

4202
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4203
// filtered subjects can be at the tail end.
4204
// Assumes stream and consumer names are single tokens.
4205
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
10,956✔
4206
        if c == nil || !s.JetStreamEnabled() {
10,956✔
4207
                return
×
4208
        }
×
4209

4210
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
10,956✔
4211
        if err != nil {
10,957✔
4212
                s.Warnf(badAPIRequestT, msg)
1✔
4213
                return
1✔
4214
        }
1✔
4215

4216
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
10,955✔
4217
        if errorOnRequiredApiLevel(hdr) {
10,958✔
4218
                resp.Error = NewJSRequiredApiLevelError()
3✔
4219
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4220
                return
3✔
4221
        }
3✔
4222

4223
        var req CreateConsumerRequest
10,952✔
4224
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10,953✔
4225
                resp.Error = NewJSInvalidJSONError(err)
1✔
4226
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4227
                return
1✔
4228
        }
1✔
4229

4230
        var js *jetStream
10,951✔
4231
        isClustered := s.JetStreamIsClustered()
10,951✔
4232

10,951✔
4233
        // Determine if we should proceed here when we are in clustered mode.
10,951✔
4234
        if isClustered {
20,903✔
4235
                if req.Config.Direct {
10,379✔
4236
                        // Check to see if we have this stream and are the stream leader.
427✔
4237
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
753✔
4238
                                return
326✔
4239
                        }
326✔
4240
                } else {
9,525✔
4241
                        var cc *jetStreamCluster
9,525✔
4242
                        js, cc = s.getJetStreamCluster()
9,525✔
4243
                        if js == nil || cc == nil {
9,525✔
4244
                                return
×
4245
                        }
×
4246
                        if js.isLeaderless() {
9,525✔
4247
                                resp.Error = NewJSClusterNotAvailError()
×
4248
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4249
                                return
×
4250
                        }
×
4251
                        // Make sure we are meta leader.
4252
                        if !s.JetStreamIsLeader() {
15,962✔
4253
                                return
6,437✔
4254
                        }
6,437✔
4255
                }
4256
        }
4257

4258
        var streamName, consumerName, filteredSubject string
4,188✔
4259
        var rt ccReqType
4,188✔
4260

4,188✔
4261
        if n := numTokens(subject); n < 5 {
4,188✔
4262
                s.Warnf(badAPIRequestT, msg)
×
4263
                return
×
4264
        } else if n == 5 {
4,991✔
4265
                // Legacy ephemeral.
803✔
4266
                rt = ccLegacyEphemeral
803✔
4267
                streamName = streamNameFromSubject(subject)
803✔
4268
        } else {
4,188✔
4269
                // New style and durable legacy.
3,385✔
4270
                if tokenAt(subject, 4) == "DURABLE" {
3,647✔
4271
                        rt = ccLegacyDurable
262✔
4272
                        if n != 7 {
262✔
4273
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4274
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4275
                                return
×
4276
                        }
×
4277
                        streamName = tokenAt(subject, 6)
262✔
4278
                        consumerName = tokenAt(subject, 7)
262✔
4279
                } else {
3,123✔
4280
                        streamName = streamNameFromSubject(subject)
3,123✔
4281
                        consumerName = consumerNameFromSubject(subject)
3,123✔
4282
                        // New has optional filtered subject as part of main subject..
3,123✔
4283
                        if n > 6 {
5,771✔
4284
                                tokens := strings.Split(subject, tsep)
2,648✔
4285
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,648✔
4286
                        }
2,648✔
4287
                }
4288
        }
4289

4290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,190✔
4291
                if doErr {
2✔
4292
                        resp.Error = NewJSNotEnabledForAccountError()
×
4293
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4294
                }
×
4295
                return
2✔
4296
        }
4297

4298
        if streamName != req.Stream {
4,187✔
4299
                resp.Error = NewJSStreamMismatchError()
1✔
4300
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4301
                return
1✔
4302
        }
1✔
4303

4304
        if consumerName != _EMPTY_ {
7,569✔
4305
                // Check for path like separators in the name.
3,384✔
4306
                if strings.ContainsAny(consumerName, `\/`) {
3,388✔
4307
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4308
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4309
                        return
4✔
4310
                }
4✔
4311
        }
4312

4313
        // Should we expect a durable name
4314
        if rt == ccLegacyDurable {
4,442✔
4315
                if numTokens(subject) < 7 {
261✔
4316
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4317
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4318
                        return
×
4319
                }
×
4320
                // Now check on requirements for durable request.
4321
                if req.Config.Durable == _EMPTY_ {
262✔
4322
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4323
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4324
                        return
1✔
4325
                }
1✔
4326
                if consumerName != req.Config.Durable {
260✔
4327
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4328
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4329
                        return
×
4330
                }
×
4331
        }
4332
        // If new style and durable set make sure they match.
4333
        if rt == ccNew {
7,299✔
4334
                if req.Config.Durable != _EMPTY_ {
5,730✔
4335
                        if consumerName != req.Config.Durable {
2,611✔
4336
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4337
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4338
                                return
×
4339
                        }
×
4340
                }
4341
                // New style ephemeral so we need to honor the name.
4342
                req.Config.Name = consumerName
3,119✔
4343
        }
4344
        // Check for legacy ephemeral mis-configuration.
4345
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,183✔
4346
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4347
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4348
                return
3✔
4349
        }
3✔
4350

4351
        // in case of multiple filters provided, error if new API is used.
4352
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,178✔
4353
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4354
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4355
                return
1✔
4356
        }
1✔
4357

4358
        // Check for a filter subject.
4359
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,178✔
4360
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4361
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4362
                return
2✔
4363
        }
2✔
4364

4365
        if isClustered && !req.Config.Direct {
7,260✔
4366
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,086✔
4367
                return
3,086✔
4368
        }
3,086✔
4369

4370
        // If we are here we are single server mode.
4371
        if req.Config.Replicas > 1 {
1,088✔
4372
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4373
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4374
                return
×
4375
        }
×
4376

4377
        stream, err := acc.lookupStream(req.Stream)
1,088✔
4378
        if err != nil {
1,092✔
4379
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4380
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4381
                return
4✔
4382
        }
4✔
4383
        if stream.offlineReason != _EMPTY_ {
1,084✔
4384
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4385
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4386
                return
×
4387
        }
×
4388

4389
        if o := stream.lookupConsumer(consumerName); o != nil {
1,132✔
4390
                if o.offlineReason != _EMPTY_ {
48✔
4391
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4392
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4393
                        return
×
4394
                }
×
4395
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4396
                // it back to whatever the current configured value is.
4397
                o.mu.RLock()
48✔
4398
                req.Config.PauseUntil = o.cfg.PauseUntil
48✔
4399
                o.mu.RUnlock()
48✔
4400
        }
4401

4402
        // Initialize/update asset version metadata.
4403
        setStaticConsumerMetadata(&req.Config)
1,084✔
4404

1,084✔
4405
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,084✔
4406

1,084✔
4407
        if err != nil {
1,137✔
4408
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
53✔
4409
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4410
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4411
                        err = errConsumerStoreFailed
×
4412
                }
×
4413
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
53✔
4414
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
53✔
4415
                return
53✔
4416
        }
4417
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
1,031✔
4418
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,031✔
4419

1,031✔
4420
        o.mu.RLock()
1,031✔
4421
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
1,035✔
4422
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4423
        }
4✔
4424
        o.mu.RUnlock()
1,031✔
4425
}
4426

4427
// Request for the list of all consumer names.
4428
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
40✔
4429
        if c == nil || !s.JetStreamEnabled() {
40✔
4430
                return
×
4431
        }
×
4432
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
40✔
4433
        if err != nil {
40✔
4434
                s.Warnf(badAPIRequestT, msg)
×
4435
                return
×
4436
        }
×
4437

4438
        var resp = JSApiConsumerNamesResponse{
40✔
4439
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
40✔
4440
                Consumers:   []string{},
40✔
4441
        }
40✔
4442
        if errorOnRequiredApiLevel(hdr) {
41✔
4443
                resp.Error = NewJSRequiredApiLevelError()
1✔
4444
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4445
                return
1✔
4446
        }
1✔
4447

4448
        // Determine if we should proceed here when we are in clustered mode.
4449
        if s.JetStreamIsClustered() {
74✔
4450
                js, cc := s.getJetStreamCluster()
35✔
4451
                if js == nil || cc == nil {
35✔
4452
                        return
×
4453
                }
×
4454
                if js.isLeaderless() {
35✔
4455
                        resp.Error = NewJSClusterNotAvailError()
×
4456
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4457
                        return
×
4458
                }
×
4459
                // Make sure we are meta leader.
4460
                if !s.JetStreamIsLeader() {
59✔
4461
                        return
24✔
4462
                }
24✔
4463
        }
4464

4465
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
15✔
4466
                if doErr {
×
4467
                        resp.Error = NewJSNotEnabledForAccountError()
×
4468
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4469
                }
×
4470
                return
×
4471
        }
4472

4473
        var offset int
15✔
4474
        if isJSONObjectOrArray(msg) {
24✔
4475
                var req JSApiConsumersRequest
9✔
4476
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
9✔
4477
                        resp.Error = NewJSInvalidJSONError(err)
×
4478
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4479
                        return
×
4480
                }
×
4481
                offset = req.Offset
9✔
4482
        }
4483

4484
        streamName := streamNameFromSubject(subject)
15✔
4485
        var numConsumers int
15✔
4486

15✔
4487
        if s.JetStreamIsClustered() {
26✔
4488
                js, cc := s.getJetStreamCluster()
11✔
4489
                if js == nil || cc == nil {
11✔
4490
                        // TODO(dlc) - Debug or Warn?
×
4491
                        return
×
4492
                }
×
4493
                js.mu.RLock()
11✔
4494
                sas := cc.streams[acc.Name]
11✔
4495
                if sas == nil {
11✔
4496
                        js.mu.RUnlock()
×
4497
                        resp.Error = NewJSStreamNotFoundError()
×
4498
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4499
                        return
×
4500
                }
×
4501
                sa := sas[streamName]
11✔
4502
                if sa == nil || sa.err != nil {
11✔
4503
                        js.mu.RUnlock()
×
4504
                        resp.Error = NewJSStreamNotFoundError()
×
4505
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4506
                        return
×
4507
                }
×
4508
                for consumer := range sa.consumers {
26✔
4509
                        resp.Consumers = append(resp.Consumers, consumer)
15✔
4510
                }
15✔
4511
                if len(resp.Consumers) > 1 {
16✔
4512
                        slices.Sort(resp.Consumers)
5✔
4513
                }
5✔
4514
                numConsumers = len(resp.Consumers)
11✔
4515
                if offset > numConsumers {
11✔
4516
                        offset = numConsumers
×
4517
                }
×
4518
                resp.Consumers = resp.Consumers[offset:]
11✔
4519
                if len(resp.Consumers) > JSApiNamesLimit {
11✔
4520
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4521
                }
×
4522
                js.mu.RUnlock()
11✔
4523

4524
        } else {
4✔
4525
                mset, err := acc.lookupStream(streamName)
4✔
4526
                if err != nil {
4✔
4527
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4528
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4529
                        return
×
4530
                }
×
4531

4532
                obs := mset.getPublicConsumers()
4✔
4533
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
4✔
4534

4535
                numConsumers = len(obs)
4✔
4536
                if offset > numConsumers {
4✔
4537
                        offset = numConsumers
×
4538
                }
×
4539

4540
                for _, o := range obs[offset:] {
7✔
4541
                        resp.Consumers = append(resp.Consumers, o.String())
3✔
4542
                        if len(resp.Consumers) >= JSApiNamesLimit {
3✔
4543
                                break
×
4544
                        }
4545
                }
4546
        }
4547
        resp.Total = numConsumers
15✔
4548
        resp.Limit = JSApiNamesLimit
15✔
4549
        resp.Offset = offset
15✔
4550
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
15✔
4551
}
4552

4553
// Request for the list of all detailed consumer information.
4554
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
4555
        if c == nil || !s.JetStreamEnabled() {
67✔
4556
                return
×
4557
        }
×
4558

4559
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
4560
        if err != nil {
67✔
4561
                s.Warnf(badAPIRequestT, msg)
×
4562
                return
×
4563
        }
×
4564

4565
        var resp = JSApiConsumerListResponse{
67✔
4566
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
67✔
4567
                Consumers:   []*ConsumerInfo{},
67✔
4568
        }
67✔
4569
        if errorOnRequiredApiLevel(hdr) {
68✔
4570
                resp.Error = NewJSRequiredApiLevelError()
1✔
4571
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4572
                return
1✔
4573
        }
1✔
4574

4575
        // Determine if we should proceed here when we are in clustered mode.
4576
        if s.JetStreamIsClustered() {
124✔
4577
                js, cc := s.getJetStreamCluster()
58✔
4578
                if js == nil || cc == nil {
58✔
4579
                        return
×
4580
                }
×
4581
                if js.isLeaderless() {
59✔
4582
                        resp.Error = NewJSClusterNotAvailError()
1✔
4583
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4584
                        return
1✔
4585
                }
1✔
4586
                // Make sure we are meta leader.
4587
                if !s.JetStreamIsLeader() {
101✔
4588
                        return
44✔
4589
                }
44✔
4590
        }
4591

4592
        if errorOnRequiredApiLevel(hdr) {
21✔
4593
                resp.Error = NewJSClusterNotAvailError()
×
4594
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4595
                return
×
4596
        }
×
4597

4598
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
21✔
4599
                if doErr {
×
4600
                        resp.Error = NewJSNotEnabledForAccountError()
×
4601
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4602
                }
×
4603
                return
×
4604
        }
4605

4606
        var offset int
21✔
4607
        if isJSONObjectOrArray(msg) {
34✔
4608
                var req JSApiConsumersRequest
13✔
4609
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
4610
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4611
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4612
                        return
1✔
4613
                }
1✔
4614
                offset = req.Offset
12✔
4615
        }
4616

4617
        streamName := streamNameFromSubject(subject)
20✔
4618

20✔
4619
        // Clustered mode will invoke a scatter and gather.
20✔
4620
        if s.JetStreamIsClustered() {
33✔
4621
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
13✔
4622
                msg = copyBytes(msg)
13✔
4623
                s.startGoRoutine(func() {
26✔
4624
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
13✔
4625
                })
13✔
4626
                return
13✔
4627
        }
4628

4629
        mset, err := acc.lookupStream(streamName)
7✔
4630
        if err != nil {
7✔
4631
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4632
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4633
                return
×
4634
        }
×
4635

4636
        obs := mset.getPublicConsumers()
7✔
4637
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
7✔
4638

4639
        ocnt := len(obs)
7✔
4640
        if offset > ocnt {
7✔
4641
                offset = ocnt
×
4642
        }
×
4643

4644
        var missingNames []string
7✔
4645
        for _, o := range obs[offset:] {
13✔
4646
                if o.offlineReason != _EMPTY_ {
7✔
4647
                        if resp.Offline == nil {
2✔
4648
                                resp.Offline = make(map[string]string, 1)
1✔
4649
                        }
1✔
4650
                        resp.Offline[o.name] = o.offlineReason
1✔
4651
                        missingNames = append(missingNames, o.name)
1✔
4652
                        continue
1✔
4653
                }
4654
                if cinfo := o.info(); cinfo != nil {
10✔
4655
                        resp.Consumers = append(resp.Consumers, cinfo)
5✔
4656
                }
5✔
4657
                if len(resp.Consumers) >= JSApiListLimit {
5✔
4658
                        break
×
4659
                }
4660
        }
4661
        resp.Total = ocnt
7✔
4662
        resp.Limit = JSApiListLimit
7✔
4663
        resp.Offset = offset
7✔
4664
        resp.Missing = missingNames
7✔
4665
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
7✔
4666
}
4667

4668
// Request for information about an consumer.
4669
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,142✔
4670
        if c == nil || !s.JetStreamEnabled() {
42,142✔
4671
                return
×
4672
        }
×
4673
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
42,142✔
4674
        if err != nil {
42,142✔
4675
                s.Warnf(badAPIRequestT, msg)
×
4676
                return
×
4677
        }
×
4678

4679
        streamName := streamNameFromSubject(subject)
42,142✔
4680
        consumerName := consumerNameFromSubject(subject)
42,142✔
4681

42,142✔
4682
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,142✔
4683
        if errorOnRequiredApiLevel(hdr) {
42,143✔
4684
                resp.Error = NewJSRequiredApiLevelError()
1✔
4685
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4686
                return
1✔
4687
        }
1✔
4688

4689
        if !isEmptyRequest(msg) {
42,142✔
4690
                resp.Error = NewJSNotEmptyRequestError()
1✔
4691
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4692
                return
1✔
4693
        }
1✔
4694

4695
        // If we are in clustered mode we need to be the consumer leader to proceed.
4696
        if s.JetStreamIsClustered() {
83,791✔
4697
                // Check to make sure the consumer is assigned.
41,651✔
4698
                js, cc := s.getJetStreamCluster()
41,651✔
4699
                if js == nil || cc == nil {
41,651✔
4700
                        return
×
4701
                }
×
4702

4703
                js.mu.RLock()
41,651✔
4704
                meta := cc.meta
41,651✔
4705
                js.mu.RUnlock()
41,651✔
4706

41,651✔
4707
                if meta == nil {
41,651✔
4708
                        return
×
4709
                }
×
4710

4711
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4712
                ourID := meta.ID()
41,651✔
4713
                groupLeaderless := meta.Leaderless()
41,651✔
4714
                groupCreated := meta.Created()
41,651✔
4715

41,651✔
4716
                js.mu.RLock()
41,651✔
4717
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
41,651✔
4718
                var rg *raftGroup
41,651✔
4719
                var offline, isMember bool
41,651✔
4720
                if ca != nil {
45,797✔
4721
                        if rg = ca.Group; rg != nil {
8,292✔
4722
                                offline = s.allPeersOffline(rg)
4,146✔
4723
                                isMember = rg.isMember(ourID)
4,146✔
4724
                        }
4,146✔
4725
                        if ca.unsupported != nil && isMember {
4,164✔
4726
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
4727
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
4728
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
4729
                                js.mu.RUnlock()
18✔
4730
                                return
18✔
4731
                        }
18✔
4732
                }
4733
                // Capture consumer leader here.
4734
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,633✔
4735
                // Also capture if we think there is no meta leader.
41,633✔
4736
                var isLeaderLess bool
41,633✔
4737
                if !isLeader {
69,592✔
4738
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
27,959✔
4739
                }
27,959✔
4740
                js.mu.RUnlock()
41,633✔
4741

41,633✔
4742
                if isLeader && ca == nil {
54,049✔
4743
                        // We can't find the consumer, so mimic what would be the errors below.
12,416✔
4744
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,416✔
4745
                                if doErr {
×
4746
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4747
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4748
                                }
×
4749
                                return
×
4750
                        }
4751
                        if sa == nil {
22,419✔
4752
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4753
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4754
                                return
10,003✔
4755
                        }
10,003✔
4756
                        // If we are here the consumer is not present.
4757
                        resp.Error = NewJSConsumerNotFoundError()
2,413✔
4758
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,413✔
4759
                        return
2,413✔
4760
                } else if ca == nil {
54,306✔
4761
                        if isLeaderLess {
25,091✔
4762
                                resp.Error = NewJSClusterNotAvailError()
2✔
4763
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4764
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4765
                        }
2✔
4766
                        return
25,089✔
4767
                } else if isLeader && offline {
4,130✔
4768
                        resp.Error = NewJSConsumerOfflineError()
2✔
4769
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4770
                        return
2✔
4771
                }
2✔
4772

4773
                // Check to see if we are a member of the group and if the group has no leader.
4774
                if isMember && js.isGroupLeaderless(ca.Group) {
4,127✔
4775
                        resp.Error = NewJSClusterNotAvailError()
1✔
4776
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4777
                        return
1✔
4778
                }
1✔
4779

4780
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4781
                if !isConsumerLeader {
7,003✔
4782
                        if isLeaderLess {
2,878✔
4783
                                resp.Error = NewJSClusterNotAvailError()
×
4784
                                // Delaying an error response gives the leader a chance to respond before us
×
4785
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4786
                                return
×
4787
                        }
×
4788

4789
                        var node RaftNode
2,878✔
4790
                        var leaderNotPartOfGroup bool
2,878✔
4791

2,878✔
4792
                        // We have a consumer assignment.
2,878✔
4793
                        if isMember {
4,974✔
4794
                                js.mu.RLock()
2,096✔
4795
                                if rg != nil && rg.node != nil {
4,192✔
4796
                                        node = rg.node
2,096✔
4797
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,096✔
4798
                                                leaderNotPartOfGroup = true
×
4799
                                        }
×
4800
                                }
4801
                                js.mu.RUnlock()
2,096✔
4802
                        }
4803

4804
                        // Check if we should ignore all together.
4805
                        if node == nil {
3,660✔
4806
                                // We have been assigned but have not created a node yet. If we are a member return
782✔
4807
                                // our config and defaults for state and no cluster info.
782✔
4808
                                if isMember {
782✔
4809
                                        // Since we access consumerAssignment, need js lock.
×
4810
                                        js.mu.RLock()
×
4811
                                        resp.ConsumerInfo = &ConsumerInfo{
×
4812
                                                Stream:    ca.Stream,
×
4813
                                                Name:      ca.Name,
×
4814
                                                Created:   ca.Created,
×
4815
                                                Config:    setDynamicConsumerMetadata(ca.Config),
×
4816
                                                TimeStamp: time.Now().UTC(),
×
4817
                                        }
×
4818
                                        b := s.jsonResponse(resp)
×
4819
                                        js.mu.RUnlock()
×
4820
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
×
4821
                                }
×
4822
                                return
782✔
4823
                        }
4824
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
4825
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,183✔
4826
                                if leaderNotPartOfGroup {
2,087✔
4827
                                        resp.Error = NewJSConsumerOfflineError()
×
4828
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4829
                                }
×
4830
                                return
2,087✔
4831
                        }
4832
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
4833
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
4834
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
4835
                }
4836
        }
4837

4838
        if !acc.JetStreamEnabled() {
1,745✔
4839
                resp.Error = NewJSNotEnabledForAccountError()
×
4840
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4841
                return
×
4842
        }
×
4843

4844
        mset, err := acc.lookupStream(streamName)
1,745✔
4845
        if err != nil {
1,745✔
4846
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4847
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4848
                return
×
4849
        }
×
4850

4851
        obs := mset.lookupConsumer(consumerName)
1,745✔
4852
        if obs == nil {
1,926✔
4853
                resp.Error = NewJSConsumerNotFoundError()
181✔
4854
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
181✔
4855
                return
181✔
4856
        }
181✔
4857

4858
        if obs.offlineReason != _EMPTY_ {
1,565✔
4859
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
4860
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
4861
                return
1✔
4862
        }
1✔
4863

4864
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,563✔
4865
                // This consumer returned nil which means it's closed. Respond with not found.
×
4866
                resp.Error = NewJSConsumerNotFoundError()
×
4867
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4868
                return
×
4869
        }
×
4870
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,563✔
4871
}
4872

4873
// Request to delete an Consumer.
4874
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,385✔
4875
        if c == nil || !s.JetStreamEnabled() {
7,388✔
4876
                return
3✔
4877
        }
3✔
4878
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,382✔
4879
        if err != nil {
7,382✔
4880
                s.Warnf(badAPIRequestT, msg)
×
4881
                return
×
4882
        }
×
4883

4884
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,382✔
4885
        if errorOnRequiredApiLevel(hdr) {
7,383✔
4886
                resp.Error = NewJSRequiredApiLevelError()
1✔
4887
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4888
                return
1✔
4889
        }
1✔
4890

4891
        // Determine if we should proceed here when we are in clustered mode.
4892
        if s.JetStreamIsClustered() {
14,317✔
4893
                js, cc := s.getJetStreamCluster()
6,936✔
4894
                if js == nil || cc == nil {
6,938✔
4895
                        return
2✔
4896
                }
2✔
4897
                if js.isLeaderless() {
6,935✔
4898
                        resp.Error = NewJSClusterNotAvailError()
1✔
4899
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4900
                        return
1✔
4901
                }
1✔
4902
                // Make sure we are meta leader.
4903
                if !s.JetStreamIsLeader() {
11,534✔
4904
                        return
4,601✔
4905
                }
4,601✔
4906
        }
4907

4908
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,844✔
4909
                if doErr {
132✔
4910
                        resp.Error = NewJSNotEnabledForAccountError()
65✔
4911
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
65✔
4912
                }
65✔
4913
                return
67✔
4914
        }
4915
        if !isEmptyRequest(msg) {
2,711✔
4916
                resp.Error = NewJSNotEmptyRequestError()
1✔
4917
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4918
                return
1✔
4919
        }
1✔
4920
        stream := streamNameFromSubject(subject)
2,709✔
4921
        consumer := consumerNameFromSubject(subject)
2,709✔
4922

2,709✔
4923
        if s.JetStreamIsClustered() {
5,041✔
4924
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,332✔
4925
                return
2,332✔
4926
        }
2,332✔
4927

4928
        mset, err := acc.lookupStream(stream)
377✔
4929
        if err != nil {
377✔
4930
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4931
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4932
                return
×
4933
        }
×
4934

4935
        obs := mset.lookupConsumer(consumer)
377✔
4936
        if obs == nil {
537✔
4937
                resp.Error = NewJSConsumerNotFoundError()
160✔
4938
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
160✔
4939
                return
160✔
4940
        }
160✔
4941
        if err := obs.delete(); err != nil {
217✔
4942
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
4943
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4944
                return
×
4945
        }
×
4946
        resp.Success = true
217✔
4947
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
217✔
4948
}
4949

4950
// Request to pause or unpause a Consumer.
4951
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
64✔
4952
        if c == nil || !s.JetStreamEnabled() {
64✔
4953
                return
×
4954
        }
×
4955
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
64✔
4956
        if err != nil {
64✔
4957
                s.Warnf(badAPIRequestT, msg)
×
4958
                return
×
4959
        }
×
4960

4961
        var req JSApiConsumerPauseRequest
64✔
4962
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
64✔
4963
        if errorOnRequiredApiLevel(hdr) {
65✔
4964
                resp.Error = NewJSRequiredApiLevelError()
1✔
4965
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4966
                return
1✔
4967
        }
1✔
4968

4969
        if isJSONObjectOrArray(msg) {
118✔
4970
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
4971
                        resp.Error = NewJSInvalidJSONError(err)
×
4972
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4973
                        return
×
4974
                }
×
4975
        }
4976

4977
        // Determine if we should proceed here when we are in clustered mode.
4978
        isClustered := s.JetStreamIsClustered()
63✔
4979
        js, cc := s.getJetStreamCluster()
63✔
4980
        if isClustered {
117✔
4981
                if js == nil || cc == nil {
54✔
4982
                        return
×
4983
                }
×
4984
                if js.isLeaderless() {
54✔
4985
                        resp.Error = NewJSClusterNotAvailError()
×
4986
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4987
                        return
×
4988
                }
×
4989
                // Make sure we are meta leader.
4990
                if !s.JetStreamIsLeader() {
94✔
4991
                        return
40✔
4992
                }
40✔
4993
        }
4994

4995
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
4996
                if doErr {
×
4997
                        resp.Error = NewJSNotEnabledForAccountError()
×
4998
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4999
                }
×
5000
                return
×
5001
        }
5002

5003
        stream := streamNameFromSubject(subject)
23✔
5004
        consumer := consumerNameFromSubject(subject)
23✔
5005

23✔
5006
        if isClustered {
37✔
5007
                js.mu.RLock()
14✔
5008
                sa := js.streamAssignment(acc.Name, stream)
14✔
5009
                if sa == nil {
14✔
5010
                        js.mu.RUnlock()
×
5011
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5012
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5013
                        return
×
5014
                }
×
5015
                if sa.unsupported != nil {
14✔
5016
                        js.mu.RUnlock()
×
5017
                        // Just let the request time out.
×
5018
                        return
×
5019
                }
×
5020

5021
                ca, ok := sa.consumers[consumer]
14✔
5022
                if !ok || ca == nil {
14✔
5023
                        js.mu.RUnlock()
×
5024
                        resp.Error = NewJSConsumerNotFoundError()
×
5025
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5026
                        return
×
5027
                }
×
5028
                if ca.unsupported != nil {
14✔
5029
                        js.mu.RUnlock()
×
5030
                        // Just let the request time out.
×
5031
                        return
×
5032
                }
×
5033

5034
                nca := *ca
14✔
5035
                ncfg := *ca.Config
14✔
5036
                nca.Config = &ncfg
14✔
5037
                meta := cc.meta
14✔
5038
                js.mu.RUnlock()
14✔
5039
                pauseUTC := req.PauseUntil.UTC()
14✔
5040
                if !pauseUTC.IsZero() {
24✔
5041
                        nca.Config.PauseUntil = &pauseUTC
10✔
5042
                } else {
14✔
5043
                        nca.Config.PauseUntil = nil
4✔
5044
                }
4✔
5045

5046
                // Update asset version metadata due to updating pause/resume.
5047
                // Only PauseUntil is updated above, so reuse config for both.
5048
                setStaticConsumerMetadata(nca.Config)
14✔
5049

14✔
5050
                eca := encodeAddConsumerAssignment(&nca)
14✔
5051
                meta.Propose(eca)
14✔
5052

14✔
5053
                resp.PauseUntil = pauseUTC
14✔
5054
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5055
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5056
                }
10✔
5057
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5058
                return
14✔
5059
        }
5060

5061
        mset, err := acc.lookupStream(stream)
9✔
5062
        if err != nil {
9✔
5063
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5064
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5065
                return
×
5066
        }
×
5067
        if mset.offlineReason != _EMPTY_ {
9✔
5068
                // Just let the request time out.
×
5069
                return
×
5070
        }
×
5071

5072
        obs := mset.lookupConsumer(consumer)
9✔
5073
        if obs == nil {
9✔
5074
                resp.Error = NewJSConsumerNotFoundError()
×
5075
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5076
                return
×
5077
        }
×
5078
        if obs.offlineReason != _EMPTY_ {
9✔
5079
                // Just let the request time out.
×
5080
                return
×
5081
        }
×
5082

5083
        ncfg := obs.cfg
9✔
5084
        pauseUTC := req.PauseUntil.UTC()
9✔
5085
        if !pauseUTC.IsZero() {
14✔
5086
                ncfg.PauseUntil = &pauseUTC
5✔
5087
        } else {
9✔
5088
                ncfg.PauseUntil = nil
4✔
5089
        }
4✔
5090

5091
        // Update asset version metadata due to updating pause/resume.
5092
        setStaticConsumerMetadata(&ncfg)
9✔
5093

9✔
5094
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5095
                // The only type of error that should be returned here is from o.store,
×
5096
                // so use a store failed error type.
×
5097
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5098
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5099
                return
×
5100
        }
×
5101

5102
        resp.PauseUntil = pauseUTC
9✔
5103
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5104
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5105
        }
5✔
5106
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5107
}
5108

5109
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5110
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
67,518✔
5111
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
67,518✔
5112
                TypedEvent: TypedEvent{
67,518✔
5113
                        Type: JSAPIAuditType,
67,518✔
5114
                        ID:   nuid.Next(),
67,518✔
5115
                        Time: time.Now().UTC(),
67,518✔
5116
                },
67,518✔
5117
                Server:   s.Name(),
67,518✔
5118
                Client:   ci.forAdvisory(),
67,518✔
5119
                Subject:  subject,
67,518✔
5120
                Request:  request,
67,518✔
5121
                Response: response,
67,518✔
5122
                Domain:   s.getOpts().JetStreamDomain,
67,518✔
5123
        })
67,518✔
5124
}
67,518✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc