• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.22
/server/jetstream_api.go
1
// Copyright 2020-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "maps"
24
        "os"
25
        "path/filepath"
26
        "runtime"
27
        "slices"
28
        "strings"
29
        "sync"
30
        "sync/atomic"
31
        "time"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiStreamCreate is the endpoint to create new streams.
51
        // Will return JSON response.
52
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
53
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
54

55
        // JSApiStreamUpdate is the endpoint to update existing streams.
56
        // Will return JSON response.
57
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
58
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
59

60
        // JSApiStreams is the endpoint to list all stream names for this account.
61
        // Will return JSON response.
62
        JSApiStreams = "$JS.API.STREAM.NAMES"
63
        // JSApiStreamList is the endpoint that will return all detailed stream information
64
        JSApiStreamList = "$JS.API.STREAM.LIST"
65

66
        // JSApiStreamInfo is for obtaining general information about a named stream.
67
        // Will return JSON response.
68
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
69
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
70

71
        // JSApiStreamDelete is the endpoint to delete streams.
72
        // Will return JSON response.
73
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
74
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
75

76
        // JSApiStreamPurge is the endpoint to purge streams.
77
        // Will return JSON response.
78
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
79
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
80

81
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
82
        // Will return a stream of chunks with a nil chunk as EOF to
83
        // the deliver subject. Caller should respond to each chunk
84
        // with a nil body response for ack flow.
85
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
86
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
87

88
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
89
        // Caller should respond to each chunk with a nil body response.
90
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
91
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
92

93
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
94
        // Will return JSON response.
95
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
96
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
97

98
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
99
        // Will return JSON response.
100
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
101
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
102

103
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
104
        // Will return the message similar to how a consumer receives the message, no JSON processing.
105
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
106
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
107
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
108

109
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
110
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
111
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
112
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
113

114
        // jsDirectGetPre
115
        jsDirectGetPre = "$JS.API.DIRECT.GET"
116

117
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
118
        // This was also the legacy endpoint for ephemeral consumers.
119
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
120
        // Will return JSON response.
121
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
122
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
123
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
124
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
125

126
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
127
        // You need to include the stream and consumer name in the subject.
128
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
129
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
130

131
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
132
        // Will return JSON response.
133
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
134
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
135

136
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
137
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
138
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
139

140
        // JSApiConsumerInfo is for obtaining general information about a consumer.
141
        // Will return JSON response.
142
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
143
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
144

145
        // JSApiConsumerDelete is the endpoint to delete consumers.
146
        // Will return JSON response.
147
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
148
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
149

150
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
151
        // Will return JSON response.
152
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
153
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
154

155
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157

158
        // JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
159
        JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
160

161
        // JSApiConsumerUnpin is the endpoint for unpinning a subscription for a given consumer.
162
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
163
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
164

165
        // jsRequestNextPre
166
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
167

168
        // For snapshots and restores. The ack will have additional tokens.
169
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
170
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
171

172
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
173
        // Will return JSON response.
174
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
175
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
176

177
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
178
        // Will return JSON response.
179
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
180
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
181

182
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
183
        // Will return JSON response.
184
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
185
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
186

187
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
188
        // Only works from system account.
189
        // Will return JSON response.
190
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
191

192
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
193
        // Only works from system account.
194
        // Will return JSON response.
195
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
196

197
        // JSApiAccountPurge is the endpoint to purge the js content of an account
198
        // Only works from system account.
199
        // Will return JSON response.
200
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
201
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
202

203
        // JSApiServerStreamMove is the endpoint to move streams off a server
204
        // Only works from system account.
205
        // Will return JSON response.
206
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
207
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
208

209
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
210
        // Only works from system account.
211
        // Will return JSON response.
212
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
213
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
214

215
        // The prefix for system level account API.
216
        jsAPIAccountPre = "$JS.API.ACCOUNT."
217

218
        // jsAckT is the template for the ack message stream coming back from a consumer
219
        // when they ACK/NAK, etc a message.
220
        jsAckT      = "$JS.ACK.%s.%s"
221
        jsAckTv2    = "$JS.ACK.%s.%s.%s.%s"
222
        jsAckPre    = "$JS.ACK."
223
        jsAckPreLen = len(jsAckPre)
224

225
        // jsFlowControlPre is the prefix for flow control subjects.
226
        jsFlowControlPre = "$JS.FC."
227
        // jsFlowControl is for FC responses.
228
        jsFlowControl   = "$JS.FC.%s.%s.*"
229
        jsFlowControlV2 = "$JS.FC.%s.%s.%s.%s.*"
230

231
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
232
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
233

234
        // JSMetricPrefix is a prefix for all JetStream metrics.
235
        JSMetricPrefix = "$JS.EVENT.METRIC"
236

237
        // JSMetricConsumerAckPre is a metric containing ack latency.
238
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
239

240
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
241
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
242

243
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been NAK'd.
244
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
245

246
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
247
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
248

249
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
250
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
251

252
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
253
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
254

255
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
256
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
257

258
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
259
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
260

261
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
262
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
263

264
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
265
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
266

267
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
268
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
269

270
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
271
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
272

273
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
274
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
275

276
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
277
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
278

279
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
280
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
281

282
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
283
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
284

285
        // JSAdvisoryDomainLeaderElected notification that a jetstream domain has elected a leader.
286
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
287

288
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
289
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
290

291
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
292
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
293

294
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
295
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
296

297
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
298
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
299

300
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
301
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
302

303
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
304
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
305

306
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
307
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
308

309
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
310
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
311

312
        // JSAuditAdvisory is a notification about JetStream API access.
313
        // FIXME - Add in details about who..
314
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
315
)
316

317
// Headers used in $JS.API.> requests.
318
const (
319
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
320
        JSRequiredApiLevel = "Nats-Required-Api-Level"
321
)
322

323
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
324
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
325

326
// generateJSMappingTable returns the subject mappings for the given JetStream domain.
// Each key has the form "$JS.<domain>.API.<suffix>" and maps to the un-prefixed
// destination subject listed in the table below (for example "$JS.<domain>.API.INFO"
// maps to JSApiAccountInfo, and "$JS.<domain>.API.$KV.>" maps to "$KV.>").
func generateJSMappingTable(domain string) map[string]string {
773✔
327
        mappings := map[string]string{}
773✔
328
        // This set of mappings is very very very ugly.
773✔
329
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and its mapping to "$JS.API"
773✔
330
        // For optics $KV and $OBJ were made to be independent subject spaces.
773✔
331
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
773✔
332
        // This is very unfortunate!!!
773✔
333
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
773✔
334
        // Especially since the actual API for say KV, does use stream create from JS.
773✔
335
        // To avoid overlaps KV and OBJ views append the prefix to their API.
773✔
336
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
773✔
337
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
773✔
338
        for srcMappingSuffix, to := range map[string]string{
773✔
339
                "INFO":       JSApiAccountInfo,
773✔
340
                "STREAM.>":   "$JS.API.STREAM.>",
773✔
341
                "CONSUMER.>": "$JS.API.CONSUMER.>",
773✔
342
                "DIRECT.>":   "$JS.API.DIRECT.>",
773✔
343
                "META.>":     "$JS.API.META.>",
773✔
344
                "SERVER.>":   "$JS.API.SERVER.>",
773✔
345
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
773✔
346
                "$KV.>":      "$KV.>",
773✔
347
                "$OBJ.>":     "$OBJ.>",
773✔
348
        } {
7,730✔
349
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
6,957✔
350
        }
6,957✔
351
        return mappings
773✔
352
}
353

354
// JSMaxDescriptionLen is the maximum description length for streams and consumers.
355
const JSMaxDescriptionLen = 4 * 1024
356

357
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
358
// It's calculated by summing length of all keys and values.
359
const JSMaxMetadataLen = 128 * 1024
360

361
// JSMaxNameLen is the maximum name length for streams, consumers and templates.
362
// Picked 255 as it seems to be a widely used file name limit
363
const JSMaxNameLen = 255
364

365
// JSDefaultRequestQueueLimit is the default number of entries that we will
366
// put on the global request queue before we react.
367
const JSDefaultRequestQueueLimit = 10_000
368

369
// Responses for API calls.
370

371
// ApiResponse is a standard response from the JetStream JSON API
372
type ApiResponse struct {
373
        Type  string    `json:"type"`
374
        Error *ApiError `json:"error,omitempty"`
375
}
376

377
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
378

379
// When passing back to the clients generalize store failures.
380
var (
381
        errStreamStoreFailed   = errors.New("error creating store for stream")
382
        errConsumerStoreFailed = errors.New("error creating store for consumer")
383
)
384

385
// ToError returns the response's Error as an error value. When no error is set it returns a
386
// literal nil, avoiding the pitfall described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
387
func (r *ApiResponse) ToError() error {
1,901✔
388
        if r.Error == nil {
2,928✔
389
                return nil
1,027✔
390
        }
1,027✔
391

392
        return r.Error
874✔
393
}
394

395
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
396

397
// ApiPaged includes variables used to create paged responses from the JSON API
398
type ApiPaged struct {
399
        Total  int `json:"total"`
400
        Offset int `json:"offset"`
401
        Limit  int `json:"limit"`
402
}
403

404
// ApiPagedRequest includes parameters allowing specific pages to be requested from APIs responding with ApiPaged
405
type ApiPagedRequest struct {
406
        Offset int `json:"offset"`
407
}
408

409
// JSApiAccountInfoResponse reports back information on jetstream for this account.
410
type JSApiAccountInfoResponse struct {
411
        ApiResponse
412
        *JetStreamAccountStats
413
}
414

415
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
416

417
// JSApiStreamCreateResponse stream creation.
418
type JSApiStreamCreateResponse struct {
419
        ApiResponse
420
        *StreamInfo
421
        DidCreate bool `json:"did_create,omitempty"`
422
}
423

424
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
425

426
// JSApiStreamDeleteResponse stream removal.
427
type JSApiStreamDeleteResponse struct {
428
        ApiResponse
429
        Success bool `json:"success,omitempty"`
430
}
431

432
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
433

434
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
435
const JSMaxSubjectDetails = 100_000
436

437
type JSApiStreamInfoRequest struct {
438
        ApiPagedRequest
439
        DeletedDetails bool   `json:"deleted_details,omitempty"`
440
        SubjectsFilter string `json:"subjects_filter,omitempty"`
441
}
442

443
type JSApiStreamInfoResponse struct {
444
        ApiResponse
445
        ApiPaged
446
        *StreamInfo
447
}
448

449
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
450

451
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
452
// TODO(dlc) - with header or request support could request chunked response.
453
const JSApiNamesLimit = 1024
454
const JSApiListLimit = 256
455

456
type JSApiStreamNamesRequest struct {
457
        ApiPagedRequest
458
        // These are filters that can be applied to the list.
459
        Subject string `json:"subject,omitempty"`
460
}
461

462
// JSApiStreamNamesResponse list of streams.
463
// A nil request is valid and means all streams.
464
type JSApiStreamNamesResponse struct {
465
        ApiResponse
466
        ApiPaged
467
        Streams []string `json:"streams"`
468
}
469

470
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
471

472
type JSApiStreamListRequest struct {
473
        ApiPagedRequest
474
        // These are filters that can be applied to the list.
475
        Subject string `json:"subject,omitempty"`
476
}
477

478
// JSApiStreamListResponse list of detailed stream information.
479
// A nil request is valid and means all streams.
480
type JSApiStreamListResponse struct {
481
        ApiResponse
482
        ApiPaged
483
        Streams []*StreamInfo     `json:"streams"`
484
        Missing []string          `json:"missing,omitempty"`
485
        Offline map[string]string `json:"offline,omitempty"`
486
}
487

488
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
489

490
// JSApiStreamPurgeRequest is optional request information to the purge API.
491
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
492
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
493
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
494
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
495
type JSApiStreamPurgeRequest struct {
496
        // Purge up to but not including sequence.
497
        Sequence uint64 `json:"seq,omitempty"`
498
        // Subject to match against messages for the purge command.
499
        Subject string `json:"filter,omitempty"`
500
        // Number of messages to keep.
501
        Keep uint64 `json:"keep,omitempty"`
502
}
503

504
type JSApiStreamPurgeResponse struct {
505
        ApiResponse
506
        Success bool   `json:"success,omitempty"`
507
        Purged  uint64 `json:"purged"`
508
}
509

510
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
511

512
type JSApiConsumerUnpinRequest struct {
513
        Group string `json:"group"`
514
}
515

516
type JSApiConsumerUnpinResponse struct {
517
        ApiResponse
518
}
519

520
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
521

522
// JSApiStreamUpdateResponse for updating a stream.
523
type JSApiStreamUpdateResponse struct {
524
        ApiResponse
525
        *StreamInfo
526
}
527

528
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
529

530
// JSApiMsgDeleteRequest delete message request.
531
type JSApiMsgDeleteRequest struct {
532
        Seq     uint64 `json:"seq"`
533
        NoErase bool   `json:"no_erase,omitempty"`
534
}
535

536
type JSApiMsgDeleteResponse struct {
537
        ApiResponse
538
        Success bool `json:"success,omitempty"`
539
}
540

541
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
542

543
type JSApiStreamSnapshotRequest struct {
544
        // Subject to deliver the chunks to for the snapshot.
545
        DeliverSubject string `json:"deliver_subject"`
546
        // Do not include consumers in the snapshot.
547
        NoConsumers bool `json:"no_consumers,omitempty"`
548
        // Optional chunk size preference. Defaults to 128KB,
549
        // automatically clamped to within the range 1KB to 1MB.
550
        // A smaller chunk size means more in-flight messages
551
        // and more acks needed. Links with good throughput
552
        // but high latency may need to increase this.
553
        ChunkSize int `json:"chunk_size,omitempty"`
554
        // Optional window size preference. Defaults to 8MB,
555
        // automatically clamped to within the range 1KB to 32MB.
556
        // Very slow connections may need to reduce this to
557
        // avoid slow consumer issues.
558
        WindowSize int `json:"window_size,omitempty"`
559
        // Check all message's checksums prior to snapshot.
560
        CheckMsgs bool `json:"jsck,omitempty"`
561
}
562

563
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
564
type JSApiStreamSnapshotResponse struct {
565
        ApiResponse
566
        // Configuration of the given stream.
567
        Config *StreamConfig `json:"config,omitempty"`
568
        // Current State for the given stream.
569
        State *StreamState `json:"state,omitempty"`
570
}
571

572
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
573

574
// JSApiStreamRestoreRequest is the required restore request.
575
type JSApiStreamRestoreRequest struct {
576
        // Configuration of the given stream.
577
        Config StreamConfig `json:"config"`
578
        // Current State for the given stream.
579
        State StreamState `json:"state"`
580
}
581

582
// JSApiStreamRestoreResponse is the direct response to the restore request.
583
type JSApiStreamRestoreResponse struct {
584
        ApiResponse
585
        // Subject to deliver the chunks to for the snapshot restore.
586
        DeliverSubject string `json:"deliver_subject"`
587
}
588

589
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
590

591
// JSApiStreamRemovePeerRequest is the required remove peer request.
592
type JSApiStreamRemovePeerRequest struct {
593
        // Server name or peer ID of the peer to be removed.
594
        Peer string `json:"peer"`
595
}
596

597
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
598
type JSApiStreamRemovePeerResponse struct {
599
        ApiResponse
600
        Success bool `json:"success,omitempty"`
601
}
602

603
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
604

605
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
606
type JSApiStreamLeaderStepDownResponse struct {
607
        ApiResponse
608
        Success bool `json:"success,omitempty"`
609
}
610

611
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
612

613
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
614
type JSApiConsumerLeaderStepDownResponse struct {
615
        ApiResponse
616
        Success bool `json:"success,omitempty"`
617
}
618

619
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
620

621
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
622
type JSApiLeaderStepdownRequest struct {
623
        Placement *Placement `json:"placement,omitempty"`
624
}
625

626
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
627
type JSApiLeaderStepDownResponse struct {
628
        ApiResponse
629
        Success bool `json:"success,omitempty"`
630
}
631

632
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
633

634
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
635
type JSApiMetaServerRemoveRequest struct {
636
        // Server name of the peer to be removed.
637
        Server string `json:"peer"`
638
        // Peer ID of the peer to be removed. If specified this is used
639
        // instead of the server name.
640
        Peer string `json:"peer_id,omitempty"`
641
}
642

643
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
644
type JSApiMetaServerRemoveResponse struct {
645
        ApiResponse
646
        Success bool `json:"success,omitempty"`
647
}
648

649
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
650

651
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another.
652
// The response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType.
653
type JSApiMetaServerStreamMoveRequest struct {
654
        // Server name of the peer to be evacuated.
655
        Server string `json:"server,omitempty"`
656
        // Cluster the server is in
657
        Cluster string `json:"cluster,omitempty"`
658
        // Domain the server is in
659
        Domain string `json:"domain,omitempty"`
660
        // Ephemeral placement tags for the move
661
        Tags []string `json:"tags,omitempty"`
662
}
663

664
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
665

666
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
667
type JSApiAccountPurgeResponse struct {
668
        ApiResponse
669
        Initiated bool `json:"initiated,omitempty"`
670
}
671

672
// JSApiMsgGetRequest is the request payload for getting a message.
// All fields are optional and omitted from the JSON encoding when unset.
type JSApiMsgGetRequest struct {
	Seq     uint64 `json:"seq,omitempty"`
	LastFor string `json:"last_by_subj,omitempty"`
	NextFor string `json:"next_by_subj,omitempty"`

	// Batch support. Used to request more than one msg at a time.
	// Can be used with simple starting seq, but also NextFor with wildcards.
	Batch int `json:"batch,omitempty"`
	// This will make sure we limit how much data we blast out. If not set we will
	// inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
	MaxBytes int `json:"max_bytes,omitempty"`
	// Return messages as of this start time.
	StartTime *time.Time `json:"start_time,omitempty"`

	// Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
	MultiLastFor []string `json:"multi_last,omitempty"`
	// Only return messages up to this sequence. If not set, will be last sequence for the stream.
	UpToSeq uint64 `json:"up_to_seq,omitempty"`
	// Only return messages up to this time.
	UpToTime *time.Time `json:"up_to_time,omitempty"`
	// Only return the message payload, excluding headers if present.
	NoHeaders bool `json:"no_hdr,omitempty"`
}

type JSApiMsgGetResponse struct {
698
        ApiResponse
699
        Message *StoredMsg `json:"message,omitempty"`
700
}
701

702
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
703

704
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
const JSWaitQueueDefaultMax = 512

type JSApiConsumerCreateResponse struct {
708
        ApiResponse
709
        *ConsumerInfo
710
}
711

712
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
713

714
type JSApiConsumerDeleteResponse struct {
715
        ApiResponse
716
        Success bool `json:"success,omitempty"`
717
}
718

719
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
720

721
// JSApiConsumerPauseRequest is the request payload for pausing a consumer.
type JSApiConsumerPauseRequest struct {
	// PauseUntil is the time until which the consumer should be paused.
	// NOTE: encoding/json's omitempty never omits a struct value such as
	// time.Time, so a zero PauseUntil is still encoded.
	PauseUntil time.Time `json:"pause_until,omitempty"`
}

const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
726

727
type JSApiConsumerPauseResponse struct {
728
        ApiResponse
729
        Paused         bool          `json:"paused"`
730
        PauseUntil     time.Time     `json:"pause_until"`
731
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
732
}
733

734
type JSApiConsumerInfoResponse struct {
735
        ApiResponse
736
        *ConsumerInfo
737
}
738

739
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
740

741
type JSApiConsumersRequest struct {
742
        ApiPagedRequest
743
}
744

745
type JSApiConsumerNamesResponse struct {
746
        ApiResponse
747
        ApiPaged
748
        Consumers []string `json:"consumers"`
749
}
750

751
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
752

753
type JSApiConsumerListResponse struct {
754
        ApiResponse
755
        ApiPaged
756
        Consumers []*ConsumerInfo   `json:"consumers"`
757
        Missing   []string          `json:"missing,omitempty"`
758
        Offline   map[string]string `json:"offline,omitempty"`
759
}
760

761
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
762

763
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
764
type JSApiConsumerGetNextRequest struct {
765
        Expires   time.Duration `json:"expires,omitempty"`
766
        Batch     int           `json:"batch,omitempty"`
767
        MaxBytes  int           `json:"max_bytes,omitempty"`
768
        NoWait    bool          `json:"no_wait,omitempty"`
769
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
770
        PriorityGroup
771
}
772

773
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
type JSApiConsumerResetRequest struct {
	// Seq is omitted from the JSON encoding when zero.
	Seq uint64 `json:"seq,omitempty"`
}

// JSApiConsumerResetResponse is a superset of JSApiConsumerCreateResponse, but including an explicit ResetSeq.
779
type JSApiConsumerResetResponse struct {
780
        ApiResponse
781
        *ConsumerInfo
782
        ResetSeq uint64 `json:"reset_seq"`
783
}
784

785
const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"
786

787
// Structure that holds state for a JetStream API request that is processed
788
// in a separate long-lived go routine. This is to avoid blocking connections.
789
type jsAPIRoutedReq struct {
790
        jsub    *subscription
791
        sub     *subscription
792
        acc     *Account
793
        subject string
794
        reply   string
795
        msg     []byte
796
        pa      pubArg
797
}
798

799
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
143,150✔
800
        // Ignore system level directives meta stepdown and peer remove requests here.
143,150✔
801
        if subject == JSApiLeaderStepDown ||
143,150✔
802
                subject == JSApiRemoveServer ||
143,150✔
803
                strings.HasPrefix(subject, jsAPIAccountPre) {
143,624✔
804
                return
474✔
805
        }
474✔
806
        // No lock needed, those are immutable.
807
        s, rr := js.srv, js.apiSubs.Match(subject)
142,676✔
808

142,676✔
809
        hdr, msg := c.msgParts(rmsg)
142,676✔
810
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
142,683✔
811
                // Check if this is the system account. We will let these through for the account info only.
7✔
812
                sacc := s.SystemAccount()
7✔
813
                if sacc != acc {
7✔
814
                        return
×
815
                }
×
816
                if subject != JSApiAccountInfo {
11✔
817
                        // Only respond from the initial server entry to the NATS system.
4✔
818
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
819
                                var resp = ApiResponse{
2✔
820
                                        Type:  JSApiSystemResponseType,
2✔
821
                                        Error: NewJSNotEnabledForAccountError(),
2✔
822
                                }
2✔
823
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
824
                        }
2✔
825
                        return
4✔
826
                }
827
        }
828

829
        // Short circuit for no interest.
830
        if len(rr.psubs)+len(rr.qsubs) == 0 {
162,707✔
831
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
20,035✔
832
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
833
                        var resp = ApiResponse{
×
834
                                Type:  JSApiSystemResponseType,
×
835
                                Error: NewJSBadRequestError(),
×
836
                        }
×
837
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
838
                }
×
839
                return
20,035✔
840
        }
841

842
        // We should only have psubs and only 1 per result.
843
        if len(rr.psubs) != 1 {
122,637✔
844
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
845
                if c.kind == CLIENT || c.kind == LEAF {
×
846
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
847
                        var resp = ApiResponse{
×
848
                                Type:  JSApiSystemResponseType,
×
849
                                Error: NewJSBadRequestError(),
×
850
                        }
×
851
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
852
                }
×
853
                return
×
854
        }
855
        jsub := rr.psubs[0]
122,637✔
856

122,637✔
857
        // We need to make sure not to block. We will send the request to a long-lived
122,637✔
858
        // pool of go routines.
122,637✔
859

122,637✔
860
        // Increment inflight. Do this before queueing.
122,637✔
861
        atomic.AddInt64(&js.apiInflight, 1)
122,637✔
862

122,637✔
863
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
122,637✔
864
        // header from the msg body. No other references are needed.
122,637✔
865
        // Check pending and warn if getting backed up.
122,637✔
866
        var queue *ipQueue[*jsAPIRoutedReq]
122,637✔
867
        var limit int64
122,637✔
868
        if js.infoSubs.HasInterest(subject) {
193,095✔
869
                queue = s.jsAPIRoutedInfoReqs
70,458✔
870
                limit = atomic.LoadInt64(&js.infoQueueLimit)
70,458✔
871
        } else {
122,637✔
872
                queue = s.jsAPIRoutedReqs
52,179✔
873
                limit = atomic.LoadInt64(&js.queueLimit)
52,179✔
874
        }
52,179✔
875
        pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
122,637✔
876
        if pending >= int(limit) {
122,731✔
877
                s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending)
94✔
878
                drained := int64(queue.drain())
94✔
879
                atomic.AddInt64(&js.apiInflight, -drained)
94✔
880

94✔
881
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
94✔
882
                        TypedEvent: TypedEvent{
94✔
883
                                Type: JSAPILimitReachedAdvisoryType,
94✔
884
                                ID:   nuid.Next(),
94✔
885
                                Time: time.Now().UTC(),
94✔
886
                        },
94✔
887
                        Server:  s.Name(),
94✔
888
                        Domain:  js.config.Domain,
94✔
889
                        Dropped: drained,
94✔
890
                })
94✔
891
        }
94✔
892
}
893

894
func (s *Server) processJSAPIRoutedRequests() {
17,112✔
895
        defer s.grWG.Done()
17,112✔
896

17,112✔
897
        s.mu.RLock()
17,112✔
898
        queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs
17,112✔
899
        client := &client{srv: s, kind: JETSTREAM}
17,112✔
900
        s.mu.RUnlock()
17,112✔
901

17,112✔
902
        js := s.getJetStream()
17,112✔
903

17,112✔
904
        processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) {
129,723✔
905
                // Only pop one item at a time here, otherwise if the system is recovering
112,611✔
906
                // from queue buildup, then one worker will pull off all the tasks and the
112,611✔
907
                // others will be starved of work.
112,611✔
908
                if r, ok := ipq.popOne(); ok && r != nil {
225,175✔
909
                        client.pa = r.pa
112,564✔
910
                        start := time.Now()
112,564✔
911
                        r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
112,564✔
912
                        if dur := time.Since(start); dur >= readLoopReportThreshold {
112,567✔
913
                                s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
3✔
914
                        }
3✔
915
                        atomic.AddInt64(&js.apiInflight, -1)
112,564✔
916
                }
917
        }
918

919
        for {
146,835✔
920
                // First select case is prioritizing queue, we will only fall through
129,723✔
921
                // to the second select case that considers infoqueue if queue is empty.
129,723✔
922
                // This effectively means infos are deprioritized.
129,723✔
923
                select {
129,723✔
924
                case <-queue.ch:
3,290✔
925
                        processFromQueue(queue)
3,290✔
926
                case <-s.quitCh:
9✔
927
                        return
9✔
928
                default:
126,424✔
929
                        select {
126,424✔
930
                        case <-infoqueue.ch:
70,412✔
931
                                processFromQueue(infoqueue)
70,412✔
932
                        case <-queue.ch:
38,909✔
933
                                processFromQueue(queue)
38,909✔
934
                        case <-s.quitCh:
17,055✔
935
                                return
17,055✔
936
                        }
937
                }
938
        }
939
}
940

941
func (s *Server) setJetStreamExportSubs() error {
4,278✔
942
        js := s.getJetStream()
4,278✔
943
        if js == nil {
4,278✔
944
                return NewJSNotEnabledError()
×
945
        }
×
946

947
        // Start the go routine that will process API requests received by the
948
        // subscription below when they are coming from routes, etc..
949
        const maxProcs = 16
4,278✔
950
        mp := runtime.GOMAXPROCS(0)
4,278✔
951
        // Cap at 16 max for now on larger core setups.
4,278✔
952
        if mp > maxProcs {
4,278✔
953
                mp = maxProcs
×
954
        }
×
955
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue")
4,278✔
956
        s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue")
4,278✔
957
        for i := 0; i < mp; i++ {
21,390✔
958
                s.startGoRoutine(s.processJSAPIRoutedRequests)
17,112✔
959
        }
17,112✔
960

961
        // This is the catch all now for all JetStream API calls.
962
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,278✔
963
                return err
×
964
        }
×
965

966
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,278✔
967
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
968
                return err
×
969
        }
×
970

971
        // API handles themselves.
972
        // infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests.
973
        pairs := []struct {
4,278✔
974
                subject string
4,278✔
975
                handler msgHandler
4,278✔
976
        }{
4,278✔
977
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,278✔
978
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,278✔
979
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,278✔
980
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,278✔
981
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,278✔
982
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,278✔
983
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,278✔
984
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,278✔
985
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,278✔
986
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,278✔
987
                {JSApiMsgGet, s.jsMsgGetRequest},
4,278✔
988
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,278✔
989
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,278✔
990
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,278✔
991
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,278✔
992
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,278✔
993
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,278✔
994
        }
4,278✔
995
        infopairs := []struct {
4,278✔
996
                subject string
4,278✔
997
                handler msgHandler
4,278✔
998
        }{
4,278✔
999
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,278✔
1000
                {JSApiStreams, s.jsStreamNamesRequest},
4,278✔
1001
                {JSApiStreamList, s.jsStreamListRequest},
4,278✔
1002
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,278✔
1003
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,278✔
1004
                {JSApiConsumerList, s.jsConsumerListRequest},
4,278✔
1005
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,278✔
1006
        }
4,278✔
1007

4,278✔
1008
        js.mu.Lock()
4,278✔
1009
        defer js.mu.Unlock()
4,278✔
1010

4,278✔
1011
        // As well as populating js.apiSubs for the dispatch function to use, we
4,278✔
1012
        // will also populate js.infoSubs, so that the dispatch function can
4,278✔
1013
        // decide quickly whether or not the request is an info request or not.
4,278✔
1014
        for _, p := range append(infopairs, pairs...) {
106,950✔
1015
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
102,672✔
1016
                if err := js.apiSubs.Insert(sub); err != nil {
102,672✔
1017
                        return err
×
1018
                }
×
1019
        }
1020
        for _, p := range infopairs {
34,224✔
1021
                if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil {
29,946✔
1022
                        return err
×
1023
                }
×
1024
        }
1025

1026
        return nil
4,278✔
1027
}
1028

1029
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
56,439✔
1030
        acc.trackAPI()
56,439✔
1031
        if reply != _EMPTY_ {
90,831✔
1032
                s.sendInternalAccountMsg(nil, reply, response)
34,392✔
1033
        }
34,392✔
1034
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
56,439✔
1035
}
1036

1037
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
18,433✔
1038
        acc.trackAPIErr()
18,433✔
1039
        if reply != _EMPTY_ {
32,986✔
1040
                s.sendInternalAccountMsg(nil, reply, response)
14,553✔
1041
        }
14,553✔
1042
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
18,433✔
1043
}
1044

1045
// errRespDelay is a delay of half a second for error responses.
const errRespDelay = 500 * time.Millisecond

type delayedAPIResponse struct {
1048
        ci       *ClientInfo
1049
        acc      *Account
1050
        subject  string
1051
        reply    string
1052
        request  string
1053
        hdr      []byte
1054
        response string
1055
        rg       *raftGroup
1056
        deadline time.Time
1057
        noJs     bool
1058
        next     *delayedAPIResponse
1059
}
1060

1061
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1062
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
114✔
1063
        // Check if list empty.
114✔
1064
        if *head == nil {
214✔
1065
                *head, *tail = r, r
100✔
1066
                return
100✔
1067
        }
100✔
1068
        // Check if it should be added at the end, which is if after or equal to the tail.
1069
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1070
                (*tail).next, *tail = r, r
12✔
1071
                return
12✔
1072
        }
12✔
1073
        // Find its spot in the list.
1074
        var prev *delayedAPIResponse
2✔
1075
        for c := *head; c != nil; c = c.next {
6✔
1076
                // We insert only if we are stricly before the current `c`.
4✔
1077
                if r.deadline.Before(c.deadline) {
6✔
1078
                        r.next = c
2✔
1079
                        if prev != nil {
3✔
1080
                                prev.next = r
1✔
1081
                        } else {
2✔
1082
                                *head = r
1✔
1083
                        }
1✔
1084
                        return
2✔
1085
                }
1086
                prev = c
2✔
1087
        }
1088
}
1089

1090
func (s *Server) delayedAPIResponder() {
6,572✔
1091
        defer s.grWG.Done()
6,572✔
1092
        var (
6,572✔
1093
                head, tail *delayedAPIResponse // Linked list.
6,572✔
1094
                r          *delayedAPIResponse // Updated by calling next().
6,572✔
1095
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
6,572✔
1096
                tm         = time.NewTimer(time.Hour)
6,572✔
1097
        )
6,572✔
1098
        next := func() {
6,777✔
1099
                r, rqch = nil, nil
205✔
1100
                // Check that JetStream is still on. Do not exit the go routine
205✔
1101
                // since JS can be enabled/disabled. The go routine will exit
205✔
1102
                // only if server is shutdown.
205✔
1103
                js := s.getJetStream()
205✔
1104
                if js == nil {
206✔
1105
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1106
                        head, tail = nil, nil
1✔
1107
                        s.delayedAPIResponses.drain()
1✔
1108
                        // Fall back into next "if" that resets timer.
1✔
1109
                }
1✔
1110
                // If there are no delayed messages then delay the timer for
1111
                // a while.
1112
                if head == nil {
305✔
1113
                        tm.Reset(time.Hour)
100✔
1114
                        return
100✔
1115
                }
100✔
1116
                // Get the first expected message and then reset the timer.
1117
                r = head
105✔
1118
                js.mu.RLock()
105✔
1119
                if r.rg != nil && r.rg.node != nil {
106✔
1120
                        // If there's an attached Raft group to the delayed response
1✔
1121
                        // then pull out the quit channel, so that we don't bother
1✔
1122
                        // sending responses for entities which are now no longer
1✔
1123
                        // running.
1✔
1124
                        rqch = r.rg.node.QuitC()
1✔
1125
                }
1✔
1126
                js.mu.RUnlock()
105✔
1127
                tm.Reset(time.Until(r.deadline))
105✔
1128
        }
1129
        pop := func() {
6,676✔
1130
                if head == nil {
104✔
1131
                        return
×
1132
                }
×
1133
                head = head.next
104✔
1134
                if head == nil {
203✔
1135
                        tail = nil
99✔
1136
                }
99✔
1137
        }
1138
        for {
13,362✔
1139
                select {
6,790✔
1140
                case <-s.delayedAPIResponses.ch:
114✔
1141
                        v, ok := s.delayedAPIResponses.popOne()
114✔
1142
                        if !ok {
114✔
1143
                                continue
×
1144
                        }
1145
                        // Add it to the list, and if ends up being the head, set things up.
1146
                        addDelayedResponse(&head, &tail, v)
114✔
1147
                        if v == head {
215✔
1148
                                next()
101✔
1149
                        }
101✔
1150
                case <-s.quitCh:
6,562✔
1151
                        return
6,562✔
1152
                case <-rqch:
1✔
1153
                        // If we were the head, drop and setup things for next.
1✔
1154
                        if r != nil && r == head {
2✔
1155
                                pop()
1✔
1156
                        }
1✔
1157
                        next()
1✔
1158
                case <-tm.C:
103✔
1159
                        if r != nil {
206✔
1160
                                // If it's not a JS API error, send it as a raw response without additional API/audit tracking.
103✔
1161
                                if r.noJs {
139✔
1162
                                        s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
36✔
1163
                                } else {
103✔
1164
                                        s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
67✔
1165
                                }
67✔
1166
                                pop()
103✔
1167
                        }
1168
                        next()
103✔
1169
                }
1170
        }
1171
}
1172

1173
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
78✔
1174
        s.delayedAPIResponses.push(&delayedAPIResponse{
78✔
1175
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
78✔
1176
        })
78✔
1177
}
78✔
1178

1179
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1180
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1181
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1182
        })
36✔
1183
}
36✔
1184

1185
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
113,338✔
1186
        hdr, msg = c.msgParts(raw)
113,338✔
1187
        var ci ClientInfo
113,338✔
1188

113,338✔
1189
        if len(hdr) > 0 {
226,594✔
1190
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
113,256✔
1191
                        return nil, nil, nil, nil, err
×
1192
                }
×
1193
        }
1194

1195
        if ci.Service != _EMPTY_ {
113,400✔
1196
                acc, _ = s.LookupAccount(ci.Service)
62✔
1197
        } else if ci.Account != _EMPTY_ {
226,532✔
1198
                acc, _ = s.LookupAccount(ci.Account)
113,194✔
1199
        } else {
113,276✔
1200
                // Direct $SYS access.
82✔
1201
                acc = c.acc
82✔
1202
                if acc == nil {
91✔
1203
                        acc = s.SystemAccount()
9✔
1204
                }
9✔
1205
        }
1206
        if acc == nil {
113,348✔
1207
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1208
        }
10✔
1209
        return &ci, acc, hdr, msg, nil
113,328✔
1210
}
1211

1212
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
24,809✔
1213
        decoder := json.NewDecoder(bytes.NewReader(msg))
24,809✔
1214
        decoder.DisallowUnknownFields()
24,809✔
1215

24,809✔
1216
        for {
74,418✔
1217
                if err := decoder.Decode(v); err != nil {
74,418✔
1218
                        if err == io.EOF {
49,612✔
1219
                                return nil
24,803✔
1220
                        }
24,803✔
1221

1222
                        var syntaxErr *json.SyntaxError
6✔
1223
                        if errors.As(err, &syntaxErr) {
6✔
1224
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1225
                        }
×
1226

1227
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1228

6✔
1229
                        if js := s.getJetStream(); js != nil && js.config.Strict {
12✔
1230
                                return err
6✔
1231
                        }
6✔
1232

1233
                        return json.Unmarshal(msg, v)
×
1234
                }
1235
        }
1236
}
1237

1238
func (a *Account) trackAPI() {
56,439✔
1239
        a.mu.RLock()
56,439✔
1240
        jsa := a.js
56,439✔
1241
        a.mu.RUnlock()
56,439✔
1242
        if jsa != nil {
112,838✔
1243
                jsa.usageMu.Lock()
56,399✔
1244
                jsa.usageApi++
56,399✔
1245
                jsa.apiTotal++
56,399✔
1246
                jsa.sendClusterUsageUpdate()
56,399✔
1247
                atomic.AddInt64(&jsa.js.apiTotal, 1)
56,399✔
1248
                jsa.usageMu.Unlock()
56,399✔
1249
        }
56,399✔
1250
}
1251

1252
func (a *Account) trackAPIErr() {
18,433✔
1253
        a.mu.RLock()
18,433✔
1254
        jsa := a.js
18,433✔
1255
        a.mu.RUnlock()
18,433✔
1256
        if jsa != nil {
36,740✔
1257
                jsa.usageMu.Lock()
18,307✔
1258
                jsa.usageApi++
18,307✔
1259
                jsa.apiTotal++
18,307✔
1260
                jsa.usageErr++
18,307✔
1261
                jsa.apiErrors++
18,307✔
1262
                jsa.sendClusterUsageUpdate()
18,307✔
1263
                atomic.AddInt64(&jsa.js.apiTotal, 1)
18,307✔
1264
                atomic.AddInt64(&jsa.js.apiErrors, 1)
18,307✔
1265
                jsa.usageMu.Unlock()
18,307✔
1266
        }
18,307✔
1267
}
1268

1269
// badAPIRequestT is the log format used when an API request is malformed.
// It takes the offending request as its single argument.
const badAPIRequestT = "Malformed JetStream API Request: %q"

// Helper function to check on JetStream being enabled but also on status of leafnodes
1272
// If the local account is not enabled but does have leafnode connectivity we will not
1273
// want to error immediately and let the other side decide.
1274
func (a *Account) checkJetStream() (enabled, shouldError bool) {
46,321✔
1275
        a.mu.RLock()
46,321✔
1276
        defer a.mu.RUnlock()
46,321✔
1277
        return a.js != nil, a.nleafs+a.nrleafs == 0
46,321✔
1278
}
46,321✔
1279

1280
// Request for current usage and limits for this account.
1281
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
456✔
1282
        if c == nil || !s.JetStreamEnabled() {
456✔
1283
                return
×
1284
        }
×
1285

1286
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
456✔
1287
        if err != nil {
457✔
1288
                s.Warnf(badAPIRequestT, msg)
1✔
1289
                return
1✔
1290
        }
1✔
1291

1292
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
455✔
1293

455✔
1294
        // Determine if we should proceed here when we are in clustered mode.
455✔
1295
        if s.JetStreamIsClustered() {
833✔
1296
                js, cc := s.getJetStreamCluster()
378✔
1297
                if js == nil || cc == nil {
378✔
1298
                        return
×
1299
                }
×
1300
                if js.isLeaderless() {
379✔
1301
                        resp.Error = NewJSClusterNotAvailError()
1✔
1302
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1303
                        return
1✔
1304
                }
1✔
1305
                // Make sure we are meta leader.
1306
                if !s.JetStreamIsLeader() {
638✔
1307
                        return
261✔
1308
                }
261✔
1309
        }
1310

1311
        if errorOnRequiredApiLevel(hdr) {
194✔
1312
                resp.Error = NewJSRequiredApiLevelError()
1✔
1313
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1314
                return
1✔
1315
        }
1✔
1316

1317
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
198✔
1318
                if !doErr {
7✔
1319
                        return
1✔
1320
                }
1✔
1321
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1322
        } else {
186✔
1323
                stats := acc.JetStreamUsage()
186✔
1324
                resp.JetStreamAccountStats = &stats
186✔
1325
        }
186✔
1326
        b, err := json.Marshal(resp)
191✔
1327
        if err != nil {
191✔
1328
                return
×
1329
        }
×
1330

1331
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
191✔
1332
}
1333

1334
// Helpers for token extraction.
1335
func streamNameFromSubject(subject string) string {
84,154✔
1336
        return tokenAt(subject, 5)
84,154✔
1337
}
84,154✔
1338

1339
func consumerNameFromSubject(subject string) string {
51,586✔
1340
        return tokenAt(subject, 6)
51,586✔
1341
}
51,586✔
1342

1343
func (s *Server) jsonResponse(v any) string {
75,528✔
1344
        b, err := json.Marshal(v)
75,528✔
1345
        if err != nil {
75,528✔
1346
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1347
                return ""
×
1348
        }
×
1349
        return string(b)
75,528✔
1350
}
1351

1352
// Read lock must be held
1353
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
2,386✔
1354
        var reservation int64
2,386✔
1355
        for _, sa := range jsa.streams {
4,463✔
1356
                // Don't count the stream toward the limit if it already exists.
2,077✔
1357
                if sa.cfg.Name == cfg.Name {
2,196✔
1358
                        continue
119✔
1359
                }
1360
                if (tier == _EMPTY_ || isSameTier(&sa.cfg, cfg)) && sa.cfg.MaxBytes > 0 && sa.cfg.Storage == cfg.Storage {
1,973✔
1361
                        // If tier is empty, all storage is flat and we should adjust for replicas.
15✔
1362
                        // Otherwise if tiered, storage replication already taken into consideration.
15✔
1363
                        if tier == _EMPTY_ && sa.cfg.Replicas > 1 {
16✔
1364
                                reservation = addSaturate(reservation, mulSaturate(int64(sa.cfg.Replicas), sa.cfg.MaxBytes))
1✔
1365
                        } else {
15✔
1366
                                reservation = addSaturate(reservation, sa.cfg.MaxBytes)
14✔
1367
                        }
14✔
1368
                }
1369
        }
1370
        return reservation
2,386✔
1371
}
1372

1373
// Request to create a stream.
1374
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
11,303✔
1375
        if c == nil || !s.JetStreamEnabled() {
11,571✔
1376
                return
268✔
1377
        }
268✔
1378
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,035✔
1379
        if err != nil {
11,038✔
1380
                s.Warnf(badAPIRequestT, msg)
3✔
1381
                return
3✔
1382
        }
3✔
1383

1384
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
11,032✔
1385

11,032✔
1386
        // Determine if we should proceed here when we are in clustered mode.
11,032✔
1387
        if s.JetStreamIsClustered() {
21,004✔
1388
                js, cc := s.getJetStreamCluster()
9,972✔
1389
                if js == nil || cc == nil {
9,972✔
1390
                        return
×
1391
                }
×
1392
                if js.isLeaderless() {
9,973✔
1393
                        resp.Error = NewJSClusterNotAvailError()
1✔
1394
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1395
                        return
1✔
1396
                }
1✔
1397
                // Make sure we are meta leader.
1398
                if !s.JetStreamIsLeader() {
16,803✔
1399
                        return
6,832✔
1400
                }
6,832✔
1401
        }
1402

1403
        if errorOnRequiredApiLevel(hdr) {
4,200✔
1404
                resp.Error = NewJSRequiredApiLevelError()
1✔
1405
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1406
                return
1✔
1407
        }
1✔
1408

1409
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,206✔
1410
                if doErr {
8✔
1411
                        resp.Error = NewJSNotEnabledForAccountError()
×
1412
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1413
                }
×
1414
                return
8✔
1415
        }
1416

1417
        var cfg StreamConfigRequest
4,190✔
1418
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
4,191✔
1419
                resp.Error = NewJSInvalidJSONError(err)
1✔
1420
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1421
                return
1✔
1422
        }
1✔
1423

1424
        // Initialize asset version metadata.
1425
        setStaticStreamMetadata(&cfg.StreamConfig)
4,189✔
1426

4,189✔
1427
        streamName := streamNameFromSubject(subject)
4,189✔
1428
        if streamName != cfg.Name {
4,190✔
1429
                resp.Error = NewJSStreamMismatchError()
1✔
1430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1431
                return
1✔
1432
        }
1✔
1433

1434
        // Check for path like separators in the name.
1435
        if strings.ContainsAny(streamName, `\/`) {
4,190✔
1436
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1437
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1438
                return
2✔
1439
        }
2✔
1440

1441
        // Can't create a stream with a sealed state.
1442
        if cfg.Sealed {
4,188✔
1443
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1444
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1445
                return
2✔
1446
        }
2✔
1447

1448
        // If we are told to do mirror direct but are not mirroring, error.
1449
        if cfg.MirrorDirect && cfg.Mirror == nil {
4,184✔
1450
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1451
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1452
                return
×
1453
        }
×
1454

1455
        // Hand off to cluster for processing.
1456
        if s.JetStreamIsClustered() {
7,321✔
1457
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
3,137✔
1458
                return
3,137✔
1459
        }
3,137✔
1460

1461
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,055✔
1462
                resp.Error = err
8✔
1463
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
8✔
1464
                return
8✔
1465
        }
8✔
1466

1467
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,039✔
1468
        if err != nil {
1,115✔
1469
                if IsNatsErr(err, JSStreamStoreFailedF) {
76✔
1470
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1471
                        err = errStreamStoreFailed
×
1472
                }
×
1473
                resp.Error = NewJSStreamCreateError(err, Unless(err))
76✔
1474
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
76✔
1475
                return
76✔
1476
        }
1477
        msetCfg := mset.config()
963✔
1478
        resp.StreamInfo = &StreamInfo{
963✔
1479
                Created:   mset.createdTime(),
963✔
1480
                State:     mset.state(),
963✔
1481
                Config:    *setDynamicStreamMetadata(&msetCfg),
963✔
1482
                TimeStamp: time.Now().UTC(),
963✔
1483
                Mirror:    mset.mirrorInfo(),
963✔
1484
                Sources:   mset.sourcesInfo(),
963✔
1485
        }
963✔
1486
        resp.DidCreate = true
963✔
1487
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
963✔
1488
}
1489

1490
// Request to update a stream.
1491
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
940✔
1492
        if c == nil || !s.JetStreamEnabled() {
940✔
1493
                return
×
1494
        }
×
1495

1496
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
940✔
1497
        if err != nil {
940✔
1498
                s.Warnf(badAPIRequestT, msg)
×
1499
                return
×
1500
        }
×
1501

1502
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
940✔
1503

940✔
1504
        // Determine if we should proceed here when we are in clustered mode.
940✔
1505
        if s.JetStreamIsClustered() {
1,773✔
1506
                js, cc := s.getJetStreamCluster()
833✔
1507
                if js == nil || cc == nil {
833✔
1508
                        return
×
1509
                }
×
1510
                if js.isLeaderless() {
835✔
1511
                        resp.Error = NewJSClusterNotAvailError()
2✔
1512
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1513
                        return
2✔
1514
                }
2✔
1515
                // Make sure we are meta leader.
1516
                if !s.JetStreamIsLeader() {
1,449✔
1517
                        return
618✔
1518
                }
618✔
1519
        }
1520

1521
        if errorOnRequiredApiLevel(hdr) {
321✔
1522
                resp.Error = NewJSRequiredApiLevelError()
1✔
1523
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1524
                return
1✔
1525
        }
1✔
1526

1527
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
319✔
1528
                if doErr {
×
1529
                        resp.Error = NewJSNotEnabledForAccountError()
×
1530
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1531
                }
×
1532
                return
×
1533
        }
1534
        var ncfg StreamConfigRequest
319✔
1535
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
320✔
1536
                resp.Error = NewJSInvalidJSONError(err)
1✔
1537
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1538
                return
1✔
1539
        }
1✔
1540

1541
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
318✔
1542
        if apiErr != nil {
337✔
1543
                resp.Error = apiErr
19✔
1544
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
19✔
1545
                return
19✔
1546
        }
19✔
1547

1548
        streamName := streamNameFromSubject(subject)
299✔
1549
        if streamName != cfg.Name {
300✔
1550
                resp.Error = NewJSStreamMismatchError()
1✔
1551
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1552
                return
1✔
1553
        }
1✔
1554

1555
        // Handle clustered version here.
1556
        if s.JetStreamIsClustered() {
506✔
1557
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
208✔
1558
                return
208✔
1559
        }
208✔
1560

1561
        mset, err := acc.lookupStream(streamName)
90✔
1562
        if err != nil {
94✔
1563
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1564
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1565
                return
4✔
1566
        }
4✔
1567
        if mset.offlineReason != _EMPTY_ {
86✔
1568
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1569
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1570
                return
×
1571
        }
×
1572

1573
        // Update asset version metadata.
1574
        setStaticStreamMetadata(&cfg)
86✔
1575

86✔
1576
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
99✔
1577
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
13✔
1578
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
13✔
1579
                return
13✔
1580
        }
13✔
1581

1582
        msetCfg := mset.config()
73✔
1583
        resp.StreamInfo = &StreamInfo{
73✔
1584
                Created:   mset.createdTime(),
73✔
1585
                State:     mset.state(),
73✔
1586
                Config:    *setDynamicStreamMetadata(&msetCfg),
73✔
1587
                Domain:    s.getOpts().JetStreamDomain,
73✔
1588
                Mirror:    mset.mirrorInfo(),
73✔
1589
                Sources:   mset.sourcesInfo(),
73✔
1590
                TimeStamp: time.Now().UTC(),
73✔
1591
        }
73✔
1592
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
73✔
1593
}
1594

1595
// Request for the list of all stream names.
1596
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,234✔
1597
        if c == nil || !s.JetStreamEnabled() {
1,234✔
1598
                return
×
1599
        }
×
1600
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,234✔
1601
        if err != nil {
1,234✔
1602
                s.Warnf(badAPIRequestT, msg)
×
1603
                return
×
1604
        }
×
1605

1606
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,234✔
1607

1,234✔
1608
        // Determine if we should proceed here when we are in clustered mode.
1,234✔
1609
        if s.JetStreamIsClustered() {
2,276✔
1610
                js, cc := s.getJetStreamCluster()
1,042✔
1611
                if js == nil || cc == nil {
1,042✔
1612
                        return
×
1613
                }
×
1614
                if js.isLeaderless() {
1,042✔
1615
                        resp.Error = NewJSClusterNotAvailError()
×
1616
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1617
                        return
×
1618
                }
×
1619
                // Make sure we are meta leader.
1620
                if !s.JetStreamIsLeader() {
1,802✔
1621
                        return
760✔
1622
                }
760✔
1623
        }
1624

1625
        if errorOnRequiredApiLevel(hdr) {
475✔
1626
                resp.Error = NewJSRequiredApiLevelError()
1✔
1627
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1628
                return
1✔
1629
        }
1✔
1630

1631
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
474✔
1632
                if doErr {
1✔
1633
                        resp.Error = NewJSNotEnabledForAccountError()
×
1634
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1635
                }
×
1636
                return
1✔
1637
        }
1638

1639
        var offset int
472✔
1640
        var filter string
472✔
1641

472✔
1642
        if isJSONObjectOrArray(msg) {
854✔
1643
                var req JSApiStreamNamesRequest
382✔
1644
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
382✔
1645
                        resp.Error = NewJSInvalidJSONError(err)
×
1646
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1647
                        return
×
1648
                }
×
1649
                offset = max(req.Offset, 0)
382✔
1650
                if req.Subject != _EMPTY_ {
738✔
1651
                        filter = req.Subject
356✔
1652
                }
356✔
1653
        }
1654

1655
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1656
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1657
        var numStreams int
472✔
1658
        if s.JetStreamIsClustered() {
754✔
1659
                js, cc := s.getJetStreamCluster()
282✔
1660
                if js == nil || cc == nil {
282✔
1661
                        // TODO(dlc) - Debug or Warn?
×
1662
                        return
×
1663
                }
×
1664
                js.mu.RLock()
282✔
1665
                for stream, sa := range cc.streams[acc.Name] {
608✔
1666
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
326✔
1667
                                continue
×
1668
                        }
1669
                        if filter != _EMPTY_ {
614✔
1670
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
288✔
1671
                                if len(sa.Config.Subjects) == 0 {
290✔
1672
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1673
                                                resp.Streams = append(resp.Streams, stream)
×
1674
                                        }
×
1675
                                } else {
286✔
1676
                                        for _, subj := range sa.Config.Subjects {
572✔
1677
                                                if SubjectsCollide(filter, subj) {
519✔
1678
                                                        resp.Streams = append(resp.Streams, stream)
233✔
1679
                                                        break
233✔
1680
                                                }
1681
                                        }
1682
                                }
1683
                        } else {
38✔
1684
                                resp.Streams = append(resp.Streams, stream)
38✔
1685
                        }
38✔
1686
                }
1687
                js.mu.RUnlock()
282✔
1688
                if len(resp.Streams) > 1 {
283✔
1689
                        slices.Sort(resp.Streams)
1✔
1690
                }
1✔
1691
                numStreams = len(resp.Streams)
282✔
1692
                if offset > numStreams {
282✔
1693
                        offset = numStreams
×
1694
                }
×
1695
                if offset > 0 {
282✔
1696
                        resp.Streams = resp.Streams[offset:]
×
1697
                }
×
1698
                if len(resp.Streams) > JSApiNamesLimit {
282✔
1699
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1700
                }
×
1701
        } else {
190✔
1702
                msets := acc.filteredStreams(filter)
190✔
1703
                // Since we page results order matters.
190✔
1704
                if len(msets) > 1 {
195✔
1705
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
14✔
1706
                }
1707

1708
                numStreams = len(msets)
190✔
1709
                if offset > numStreams {
190✔
1710
                        offset = numStreams
×
1711
                }
×
1712

1713
                for _, mset := range msets[offset:] {
389✔
1714
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
199✔
1715
                        if len(resp.Streams) >= JSApiNamesLimit {
199✔
1716
                                break
×
1717
                        }
1718
                }
1719
        }
1720
        resp.Total = numStreams
472✔
1721
        resp.Limit = JSApiNamesLimit
472✔
1722
        resp.Offset = offset
472✔
1723

472✔
1724
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
472✔
1725
}
1726

1727
// Request for the list of all detailed stream info.
1728
// TODO(dlc) - combine with above long term
1729
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
68✔
1730
        if c == nil || !s.JetStreamEnabled() {
68✔
1731
                return
×
1732
        }
×
1733
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
68✔
1734
        if err != nil {
68✔
1735
                s.Warnf(badAPIRequestT, msg)
×
1736
                return
×
1737
        }
×
1738

1739
        var resp = JSApiStreamListResponse{
68✔
1740
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
68✔
1741
                Streams:     []*StreamInfo{},
68✔
1742
        }
68✔
1743

68✔
1744
        // Determine if we should proceed here when we are in clustered mode.
68✔
1745
        if s.JetStreamIsClustered() {
129✔
1746
                js, cc := s.getJetStreamCluster()
61✔
1747
                if js == nil || cc == nil {
61✔
1748
                        return
×
1749
                }
×
1750
                if js.isLeaderless() {
62✔
1751
                        resp.Error = NewJSClusterNotAvailError()
1✔
1752
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1753
                        return
1✔
1754
                }
1✔
1755
                // Make sure we are meta leader.
1756
                if !s.JetStreamIsLeader() {
103✔
1757
                        return
43✔
1758
                }
43✔
1759
        }
1760

1761
        if errorOnRequiredApiLevel(hdr) {
25✔
1762
                resp.Error = NewJSRequiredApiLevelError()
1✔
1763
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1764
                return
1✔
1765
        }
1✔
1766

1767
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
1768
                if doErr {
×
1769
                        resp.Error = NewJSNotEnabledForAccountError()
×
1770
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1771
                }
×
1772
                return
×
1773
        }
1774

1775
        var offset int
23✔
1776
        var filter string
23✔
1777

23✔
1778
        if isJSONObjectOrArray(msg) {
35✔
1779
                var req JSApiStreamListRequest
12✔
1780
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
1781
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1782
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1783
                        return
1✔
1784
                }
1✔
1785
                offset = max(req.Offset, 0)
11✔
1786
                if req.Subject != _EMPTY_ {
13✔
1787
                        filter = req.Subject
2✔
1788
                }
2✔
1789
        }
1790

1791
        // Clustered mode will invoke a scatter and gather.
1792
        if s.JetStreamIsClustered() {
39✔
1793
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
17✔
1794
                msg = copyBytes(msg)
17✔
1795
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
34✔
1796
                return
17✔
1797
        }
1798

1799
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1800
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1801
        var msets []*stream
5✔
1802
        if filter == _EMPTY_ {
9✔
1803
                msets = acc.streams()
4✔
1804
        } else {
5✔
1805
                msets = acc.filteredStreams(filter)
1✔
1806
        }
1✔
1807

1808
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
6✔
1809

1810
        scnt := len(msets)
5✔
1811
        if offset > scnt {
5✔
1812
                offset = scnt
×
1813
        }
×
1814

1815
        var missingNames []string
5✔
1816
        for _, mset := range msets[offset:] {
11✔
1817
                if mset.offlineReason != _EMPTY_ {
7✔
1818
                        if resp.Offline == nil {
2✔
1819
                                resp.Offline = make(map[string]string, 1)
1✔
1820
                        }
1✔
1821
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
1822
                        missingNames = append(missingNames, mset.getCfgName())
1✔
1823
                        continue
1✔
1824
                }
1825

1826
                config := mset.config()
5✔
1827
                resp.Streams = append(resp.Streams, &StreamInfo{
5✔
1828
                        Created:   mset.createdTime(),
5✔
1829
                        State:     mset.state(),
5✔
1830
                        Config:    config,
5✔
1831
                        Domain:    s.getOpts().JetStreamDomain,
5✔
1832
                        Mirror:    mset.mirrorInfo(),
5✔
1833
                        Sources:   mset.sourcesInfo(),
5✔
1834
                        TimeStamp: time.Now().UTC(),
5✔
1835
                })
5✔
1836
                if len(resp.Streams) >= JSApiListLimit {
5✔
1837
                        break
×
1838
                }
1839
        }
1840
        resp.Total = scnt
5✔
1841
        resp.Limit = JSApiListLimit
5✔
1842
        resp.Offset = offset
5✔
1843
        resp.Missing = missingNames
5✔
1844
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
1845
}
1846

1847
// Request for information about a stream.
1848
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
26,021✔
1849
        if c == nil || !s.JetStreamEnabled() {
26,027✔
1850
                return
6✔
1851
        }
6✔
1852
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
26,015✔
1853
        if err != nil {
26,020✔
1854
                s.Warnf(badAPIRequestT, msg)
5✔
1855
                return
5✔
1856
        }
5✔
1857

1858
        streamName := streamNameFromSubject(subject)
26,010✔
1859

26,010✔
1860
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
26,010✔
1861

26,010✔
1862
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
26,010✔
1863
        // Make sure the response type is for a create call.
26,010✔
1864
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
26,010✔
1865
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
1866
        }
×
1867

1868
        var clusterWideConsCount int
26,010✔
1869

26,010✔
1870
        js, cc := s.getJetStreamCluster()
26,010✔
1871
        if js == nil {
26,010✔
1872
                return
×
1873
        }
×
1874
        // If we are in clustered mode we need to be the stream leader to proceed.
1875
        if cc != nil {
38,636✔
1876
                // Check to make sure the stream is assigned.
12,626✔
1877
                js.mu.RLock()
12,626✔
1878
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, streamName)
12,626✔
1879
                var offline bool
12,626✔
1880
                if sa != nil {
24,105✔
1881
                        clusterWideConsCount = len(sa.consumers)
11,479✔
1882
                        offline = s.allPeersOffline(sa.Group)
11,479✔
1883
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
11,509✔
1884
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
1885
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
1886
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
1887
                                js.mu.RUnlock()
30✔
1888
                                return
30✔
1889
                        }
30✔
1890
                }
1891
                js.mu.RUnlock()
12,596✔
1892

12,596✔
1893
                if isLeader && sa == nil {
12,834✔
1894
                        // We can't find the stream, so mimic what would be the errors below.
238✔
1895
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
238✔
1896
                                if doErr {
×
1897
                                        resp.Error = NewJSNotEnabledForAccountError()
×
1898
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1899
                                }
×
1900
                                return
×
1901
                        }
1902
                        // No stream present.
1903
                        resp.Error = NewJSStreamNotFoundError()
238✔
1904
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
238✔
1905
                        return
238✔
1906
                } else if sa == nil {
13,267✔
1907
                        if js.isLeaderless() {
909✔
1908
                                resp.Error = NewJSClusterNotAvailError()
×
1909
                                // Delaying an error response gives the leader a chance to respond before us
×
1910
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1911
                        }
×
1912
                        return
909✔
1913
                } else if isLeader && offline {
11,452✔
1914
                        resp.Error = NewJSStreamOfflineError()
3✔
1915
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
3✔
1916
                        return
3✔
1917
                }
3✔
1918

1919
                // Check to see if we are a member of the group and if the group has no leader.
1920
                isLeaderless := js.isGroupLeaderless(sa.Group)
11,446✔
1921

11,446✔
1922
                // We have the stream assigned and a leader, so only the stream leader should answer.
11,446✔
1923
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
20,622✔
1924
                        if js.isLeaderless() {
9,176✔
1925
                                resp.Error = NewJSClusterNotAvailError()
×
1926
                                // Delaying an error response gives the leader a chance to respond before us
×
1927
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
1928
                                return
×
1929
                        }
×
1930

1931
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
1932
                        // while the new members work through the election and catchup process.
1933
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
1934
                        js.mu.RLock()
9,176✔
1935
                        rg := sa.Group
9,176✔
1936
                        var ourID string
9,176✔
1937
                        if cc.meta != nil {
18,352✔
1938
                                ourID = cc.meta.ID()
9,176✔
1939
                        }
9,176✔
1940
                        // We have seen cases where rg is nil at this point,
1941
                        // so check explicitly and bail if that is the case.
1942
                        bail := rg == nil || !rg.isMember(ourID)
9,176✔
1943
                        if !bail {
12,542✔
1944
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
3,366✔
1945
                                // Also, we have seen cases where rg.node is nil at this point,
3,366✔
1946
                                // so check explicitly and bail if that is the case.
3,366✔
1947
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
3,366✔
1948
                        }
3,366✔
1949
                        js.mu.RUnlock()
9,176✔
1950
                        if bail {
18,339✔
1951
                                return
9,163✔
1952
                        }
9,163✔
1953
                }
1954
        }
1955

1956
        if errorOnRequiredApiLevel(hdr) {
15,668✔
1957
                resp.Error = NewJSRequiredApiLevelError()
1✔
1958
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1959
                return
1✔
1960
        }
1✔
1961

1962
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
15,672✔
1963
                if doErr {
7✔
1964
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
1965
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1966
                }
1✔
1967
                return
6✔
1968
        }
1969

1970
        var details bool
15,660✔
1971
        var subjects string
15,660✔
1972
        var offset int
15,660✔
1973
        if isJSONObjectOrArray(msg) {
15,768✔
1974
                var req JSApiStreamInfoRequest
108✔
1975
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
109✔
1976
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1977
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1978
                        return
1✔
1979
                }
1✔
1980
                details, subjects = req.DeletedDetails, req.SubjectsFilter
107✔
1981
                offset = max(req.Offset, 0)
107✔
1982
        }
1983

1984
        mset, err := acc.lookupStream(streamName)
15,659✔
1985
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
15,659✔
1986
        if err != nil {
15,970✔
1987
                if cc != nil {
311✔
1988
                        // This could be inflight, pause for a short bit and try again.
×
1989
                        // This will not be inline, so ok.
×
1990
                        time.Sleep(10 * time.Millisecond)
×
1991
                        mset, err = acc.lookupStream(streamName)
×
1992
                }
×
1993
                // Check again.
1994
                if err != nil {
622✔
1995
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
311✔
1996
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
311✔
1997
                        return
311✔
1998
                }
311✔
1999
        }
2000

2001
        if mset.offlineReason != _EMPTY_ {
15,349✔
2002
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
2003
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2004
                return
1✔
2005
        }
1✔
2006

2007
        config := mset.config()
15,347✔
2008
        resp.StreamInfo = &StreamInfo{
15,347✔
2009
                Created:    mset.createdTime(),
15,347✔
2010
                State:      mset.stateWithDetail(details),
15,347✔
2011
                Config:     *setDynamicStreamMetadata(&config),
15,347✔
2012
                Domain:     s.getOpts().JetStreamDomain,
15,347✔
2013
                Cluster:    js.clusterInfo(mset.raftGroup()),
15,347✔
2014
                Mirror:     mset.mirrorInfo(),
15,347✔
2015
                Sources:    mset.sourcesInfo(),
15,347✔
2016
                Alternates: js.streamAlternates(ci, config.Name),
15,347✔
2017
                TimeStamp:  time.Now().UTC(),
15,347✔
2018
        }
15,347✔
2019
        if clusterWideConsCount > 0 {
15,843✔
2020
                resp.StreamInfo.State.Consumers = clusterWideConsCount
496✔
2021
        }
496✔
2022

2023
        // Check if they have asked for subject details.
2024
        if subjects != _EMPTY_ {
15,452✔
2025
                st := mset.store.SubjectsTotals(subjects)
105✔
2026
                if lst := len(st); lst > 0 {
177✔
2027
                        // Common for both cases.
72✔
2028
                        resp.Offset = offset
72✔
2029
                        resp.Limit = JSMaxSubjectDetails
72✔
2030
                        resp.Total = lst
72✔
2031

72✔
2032
                        if offset == 0 && lst <= JSMaxSubjectDetails {
144✔
2033
                                resp.StreamInfo.State.Subjects = st
72✔
2034
                        } else {
72✔
2035
                                // Here we have to filter list due to offset or maximum constraints.
×
2036
                                subjs := make([]string, 0, len(st))
×
2037
                                for subj := range st {
×
2038
                                        subjs = append(subjs, subj)
×
2039
                                }
×
2040
                                // Sort it
2041
                                slices.Sort(subjs)
×
2042

×
2043
                                if offset > len(subjs) {
×
2044
                                        offset = len(subjs)
×
2045
                                }
×
2046

2047
                                end := offset + JSMaxSubjectDetails
×
2048
                                if end > len(subjs) {
×
2049
                                        end = len(subjs)
×
2050
                                }
×
2051
                                actualSize := end - offset
×
2052
                                var sd map[string]uint64
×
2053

×
2054
                                if actualSize > 0 {
×
2055
                                        sd = make(map[string]uint64, actualSize)
×
2056
                                        for _, ss := range subjs[offset:end] {
×
2057
                                                sd[ss] = st[ss]
×
2058
                                        }
×
2059
                                }
2060
                                resp.StreamInfo.State.Subjects = sd
×
2061
                        }
2062
                }
2063
        }
2064
        // Check for out of band catchups.
2065
        if mset.hasCatchupPeers() {
15,347✔
2066
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2067
        }
×
2068

2069
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
15,347✔
2070
}
2071

2072
// Request to have a stream leader stepdown.
2073
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
268✔
2074
        if c == nil || !s.JetStreamEnabled() {
268✔
2075
                return
×
2076
        }
×
2077
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
268✔
2078
        if err != nil {
268✔
2079
                s.Warnf(badAPIRequestT, msg)
×
2080
                return
×
2081
        }
×
2082

2083
        // Have extra token for this one.
2084
        name := tokenAt(subject, 6)
268✔
2085

268✔
2086
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
268✔
2087

268✔
2088
        // If we are not in clustered mode this is a failed request.
268✔
2089
        if !s.JetStreamIsClustered() {
269✔
2090
                resp.Error = NewJSClusterRequiredError()
1✔
2091
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2092
                return
1✔
2093
        }
1✔
2094

2095
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2096
        js, cc := s.getJetStreamCluster()
267✔
2097
        if js == nil || cc == nil {
267✔
2098
                return
×
2099
        }
×
2100
        if js.isLeaderless() {
267✔
2101
                resp.Error = NewJSClusterNotAvailError()
×
2102
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2103
                return
×
2104
        }
×
2105

2106
        js.mu.RLock()
267✔
2107
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, name)
267✔
2108
        js.mu.RUnlock()
267✔
2109

267✔
2110
        if isLeader && sa == nil {
267✔
2111
                resp.Error = NewJSStreamNotFoundError()
×
2112
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2113
                return
×
2114
        } else if sa == nil {
268✔
2115
                return
1✔
2116
        }
1✔
2117

2118
        if errorOnRequiredApiLevel(hdr) {
266✔
2119
                resp.Error = NewJSRequiredApiLevelError()
×
2120
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2121
                return
×
2122
        }
×
2123

2124
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
266✔
2125
                if doErr {
×
2126
                        resp.Error = NewJSNotEnabledForAccountError()
×
2127
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2128
                }
×
2129
                return
×
2130
        }
2131

2132
        // Check to see if we are a member of the group and if the group has no leader.
2133
        if js.isGroupLeaderless(sa.Group) {
266✔
2134
                resp.Error = NewJSClusterNotAvailError()
×
2135
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2136
                return
×
2137
        }
×
2138

2139
        // We have the stream assigned and a leader, so only the stream leader should answer.
2140
        if !acc.JetStreamIsStreamLeader(name) {
469✔
2141
                return
203✔
2142
        }
203✔
2143

2144
        mset, err := acc.lookupStream(name)
63✔
2145
        if err != nil {
63✔
2146
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2147
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2148
                return
×
2149
        }
×
2150

2151
        if mset == nil {
63✔
2152
                resp.Success = true
×
2153
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2154
                return
×
2155
        }
×
2156

2157
        node := mset.raftNode()
63✔
2158
        if node == nil {
63✔
2159
                resp.Success = true
×
2160
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2161
                return
×
2162
        }
×
2163

2164
        var preferredLeader string
63✔
2165
        if isJSONObjectOrArray(msg) {
75✔
2166
                var req JSApiLeaderStepdownRequest
12✔
2167
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2168
                        resp.Error = NewJSInvalidJSONError(err)
×
2169
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2170
                        return
×
2171
                }
×
2172
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
17✔
2173
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2174
                        return
5✔
2175
                }
5✔
2176
        }
2177

2178
        // Call actual stepdown.
2179
        err = node.StepDown(preferredLeader)
58✔
2180
        if err != nil {
58✔
2181
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2182
        } else {
58✔
2183
                resp.Success = true
58✔
2184
        }
58✔
2185
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
58✔
2186
}
2187

2188
// Request to have a consumer leader stepdown.
2189
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
222✔
2190
        if c == nil || !s.JetStreamEnabled() {
222✔
2191
                return
×
2192
        }
×
2193
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
222✔
2194
        if err != nil {
222✔
2195
                s.Warnf(badAPIRequestT, msg)
×
2196
                return
×
2197
        }
×
2198

2199
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
222✔
2200

222✔
2201
        // If we are not in clustered mode this is a failed request.
222✔
2202
        if !s.JetStreamIsClustered() {
223✔
2203
                resp.Error = NewJSClusterRequiredError()
1✔
2204
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2205
                return
1✔
2206
        }
1✔
2207

2208
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2209
        js, cc := s.getJetStreamCluster()
221✔
2210
        if js == nil || cc == nil {
221✔
2211
                return
×
2212
        }
×
2213
        if js.isLeaderless() {
221✔
2214
                resp.Error = NewJSClusterNotAvailError()
×
2215
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2216
                return
×
2217
        }
×
2218

2219
        // Have extra token for this one.
2220
        stream := tokenAt(subject, 6)
221✔
2221
        consumer := tokenAt(subject, 7)
221✔
2222

221✔
2223
        js.mu.RLock()
221✔
2224
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
221✔
2225
        js.mu.RUnlock()
221✔
2226

221✔
2227
        if isLeader && sa == nil {
221✔
2228
                resp.Error = NewJSStreamNotFoundError()
×
2229
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2230
                return
×
2231
        } else if sa == nil {
221✔
2232
                return
×
2233
        }
×
2234

2235
        if errorOnRequiredApiLevel(hdr) {
221✔
2236
                resp.Error = NewJSRequiredApiLevelError()
×
2237
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2238
                return
×
2239
        }
×
2240

2241
        var ca *consumerAssignment
221✔
2242
        if sa.consumers != nil {
442✔
2243
                ca = sa.consumers[consumer]
221✔
2244
        }
221✔
2245
        if ca == nil {
221✔
2246
                resp.Error = NewJSConsumerNotFoundError()
×
2247
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2248
                return
×
2249
        }
×
2250
        // Check to see if we are a member of the group and if the group has no leader.
2251
        if js.isGroupLeaderless(ca.Group) {
221✔
2252
                resp.Error = NewJSClusterNotAvailError()
×
2253
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2254
                return
×
2255
        }
×
2256

2257
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
381✔
2258
                return
160✔
2259
        }
160✔
2260

2261
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
61✔
2262
                if doErr {
×
2263
                        resp.Error = NewJSNotEnabledForAccountError()
×
2264
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2265
                }
×
2266
                return
×
2267
        }
2268

2269
        mset, err := acc.lookupStream(stream)
61✔
2270
        if err != nil {
61✔
2271
                resp.Error = NewJSStreamNotFoundError()
×
2272
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2273
                return
×
2274
        }
×
2275
        o := mset.lookupConsumer(consumer)
61✔
2276
        if o == nil {
61✔
2277
                resp.Error = NewJSConsumerNotFoundError()
×
2278
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2279
                return
×
2280
        }
×
2281

2282
        n := o.raftNode()
61✔
2283
        if n == nil {
61✔
2284
                resp.Success = true
×
2285
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2286
                return
×
2287
        }
×
2288

2289
        var preferredLeader string
61✔
2290
        if isJSONObjectOrArray(msg) {
73✔
2291
                var req JSApiLeaderStepdownRequest
12✔
2292
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2293
                        resp.Error = NewJSInvalidJSONError(err)
×
2294
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2295
                        return
×
2296
                }
×
2297
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
17✔
2298
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2299
                        return
5✔
2300
                }
5✔
2301
        }
2302

2303
        // Call actual stepdown.
2304
        err = n.StepDown(preferredLeader)
56✔
2305
        if err != nil {
56✔
2306
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2307
        } else {
56✔
2308
                resp.Success = true
56✔
2309
        }
56✔
2310
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
56✔
2311
}
2312

2313
// Request to remove a peer from a clustered stream.
2314
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
75✔
2315
        if c == nil || !s.JetStreamEnabled() {
75✔
2316
                return
×
2317
        }
×
2318
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
75✔
2319
        if err != nil {
75✔
2320
                s.Warnf(badAPIRequestT, msg)
×
2321
                return
×
2322
        }
×
2323

2324
        // Have extra token for this one.
2325
        name := tokenAt(subject, 6)
75✔
2326

75✔
2327
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
75✔
2328

75✔
2329
        // If we are not in clustered mode this is a failed request.
75✔
2330
        if !s.JetStreamIsClustered() {
76✔
2331
                resp.Error = NewJSClusterRequiredError()
1✔
2332
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2333
                return
1✔
2334
        }
1✔
2335

2336
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2337
        js, cc := s.getJetStreamCluster()
74✔
2338
        if js == nil || cc == nil {
74✔
2339
                return
×
2340
        }
×
2341
        if js.isLeaderless() {
74✔
2342
                resp.Error = NewJSClusterNotAvailError()
×
2343
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2344
                return
×
2345
        }
×
2346

2347
        js.mu.RLock()
74✔
2348
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, name)
74✔
2349
        js.mu.RUnlock()
74✔
2350

74✔
2351
        // Make sure we are meta leader.
74✔
2352
        if !isLeader {
136✔
2353
                return
62✔
2354
        }
62✔
2355

2356
        if errorOnRequiredApiLevel(hdr) {
12✔
2357
                resp.Error = NewJSRequiredApiLevelError()
×
2358
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2359
                return
×
2360
        }
×
2361

2362
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12✔
2363
                if doErr {
×
2364
                        resp.Error = NewJSNotEnabledForAccountError()
×
2365
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2366
                }
×
2367
                return
×
2368
        }
2369
        if isEmptyRequest(msg) {
12✔
2370
                resp.Error = NewJSBadRequestError()
×
2371
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2372
                return
×
2373
        }
×
2374

2375
        var req JSApiStreamRemovePeerRequest
12✔
2376
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2377
                resp.Error = NewJSInvalidJSONError(err)
×
2378
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2379
                return
×
2380
        }
×
2381
        if req.Peer == _EMPTY_ {
12✔
2382
                resp.Error = NewJSBadRequestError()
×
2383
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2384
                return
×
2385
        }
×
2386

2387
        if sa == nil {
12✔
2388
                // No stream present.
×
2389
                resp.Error = NewJSStreamNotFoundError()
×
2390
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2391
                return
×
2392
        }
×
2393

2394
        js.mu.RLock()
12✔
2395
        rg := sa.Group
12✔
2396

12✔
2397
        // Check to see if we are a member of the group.
12✔
2398
        // Peer here is either a peer ID or a server name, convert to node name.
12✔
2399
        nodeName := getHash(req.Peer)
12✔
2400
        isMember := rg.isMember(nodeName)
12✔
2401
        if !isMember {
14✔
2402
                nodeName = req.Peer
2✔
2403
                isMember = rg.isMember(nodeName)
2✔
2404
        }
2✔
2405
        js.mu.RUnlock()
12✔
2406

12✔
2407
        // Make sure we are a member.
12✔
2408
        if !isMember {
13✔
2409
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2410
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2411
                return
1✔
2412
        }
1✔
2413

2414
        // If we are here we have a valid peer member set for removal.
2415
        if !js.removePeerFromStream(sa, nodeName) {
13✔
2416
                resp.Error = NewJSPeerRemapError()
2✔
2417
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2418
                return
2✔
2419
        }
2✔
2420

2421
        resp.Success = true
9✔
2422
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
2423
}
2424

2425
// Request to have the metaleader remove a peer from the system.
2426
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
13✔
2427
        if c == nil || !s.JetStreamEnabled() {
13✔
2428
                return
×
2429
        }
×
2430

2431
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
13✔
2432
        if err != nil {
13✔
2433
                s.Warnf(badAPIRequestT, msg)
×
2434
                return
×
2435
        }
×
2436
        if acc != s.SystemAccount() {
13✔
2437
                return
×
2438
        }
×
2439

2440
        js, cc := s.getJetStreamCluster()
13✔
2441
        if js == nil || cc == nil {
13✔
2442
                return
×
2443
        }
×
2444

2445
        js.mu.RLock()
13✔
2446
        isLeader := cc.isLeader()
13✔
2447
        meta := cc.meta
13✔
2448
        js.mu.RUnlock()
13✔
2449

13✔
2450
        // Extra checks here but only leader is listening.
13✔
2451
        if !isLeader {
13✔
2452
                return
×
2453
        }
×
2454

2455
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
13✔
2456
        if errorOnRequiredApiLevel(hdr) {
13✔
2457
                resp.Error = NewJSRequiredApiLevelError()
×
2458
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2459
                return
×
2460
        }
×
2461

2462
        if isEmptyRequest(msg) {
13✔
2463
                resp.Error = NewJSBadRequestError()
×
2464
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2465
                return
×
2466
        }
×
2467

2468
        var req JSApiMetaServerRemoveRequest
13✔
2469
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2470
                resp.Error = NewJSInvalidJSONError(err)
×
2471
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2472
                return
×
2473
        }
×
2474

2475
        js.mu.Lock()
13✔
2476
        defer js.mu.Unlock()
13✔
2477

13✔
2478
        // Another peer-remove is already in progress, don't allow multiple concurrent changes.
13✔
2479
        if cc.peerRemoveReply != nil {
15✔
2480
                resp.Error = NewJSClusterServerMemberChangeInflightError()
2✔
2481
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2482
                return
2✔
2483
        }
2✔
2484

2485
        var found string
11✔
2486
        for _, p := range meta.Peers() {
49✔
2487
                // If Peer is specified, it takes precedence
38✔
2488
                if req.Peer != _EMPTY_ {
42✔
2489
                        if p.ID == req.Peer {
5✔
2490
                                found = req.Peer
1✔
2491
                                break
1✔
2492
                        }
2493
                        continue
3✔
2494
                }
2495
                si, ok := s.nodeToInfo.Load(p.ID)
34✔
2496
                if ok && si.(nodeInfo).name == req.Server {
40✔
2497
                        found = p.ID
6✔
2498
                        break
6✔
2499
                }
2500
        }
2501

2502
        if found == _EMPTY_ {
15✔
2503
                resp.Error = NewJSClusterServerNotMemberError()
4✔
2504
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2505
                return
4✔
2506
        }
4✔
2507

2508
        if err := meta.ProposeRemovePeer(found); err != nil {
7✔
2509
                if err == errMembershipChange {
×
2510
                        resp.Error = NewJSClusterServerMemberChangeInflightError()
×
2511
                } else {
×
2512
                        resp.Error = NewJSRaftGeneralError(err)
×
2513
                }
×
2514
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2515
                return
×
2516
        }
2517

2518
        if cc.peerRemoveReply == nil {
14✔
2519
                cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
7✔
2520
        }
7✔
2521
        // Only copy the request, the subject and reply are already copied.
2522
        cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
7✔
2523
}
2524

2525
func (s *Server) peerSetToNames(ps []string) []string {
172✔
2526
        names := make([]string, len(ps))
172✔
2527
        for i := 0; i < len(ps); i++ {
640✔
2528
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
468✔
2529
                        names[i] = ps[i]
×
2530
                } else {
468✔
2531
                        names[i] = si.(nodeInfo).name
468✔
2532
                }
468✔
2533
        }
2534
        return names
172✔
2535
}
2536

2537
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2538
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2539
        js.mu.RLock()
29✔
2540
        defer js.mu.RUnlock()
29✔
2541
        if cc := js.cluster; cc != nil {
58✔
2542
                for _, p := range cc.meta.Peers() {
185✔
2543
                        si, ok := s.nodeToInfo.Load(p.ID)
156✔
2544
                        if ok && si.(nodeInfo).name == serverName {
185✔
2545
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2546
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2547
                                                return p.ID
29✔
2548
                                        }
29✔
2549
                                }
2550
                        }
2551
                }
2552
        }
2553
        return _EMPTY_
×
2554
}
2555

2556
// Request to have the metaleader move a stream on a peer to another
2557
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2558
        if c == nil || !s.JetStreamEnabled() {
33✔
2559
                return
×
2560
        }
×
2561

2562
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2563
        if err != nil {
33✔
2564
                s.Warnf(badAPIRequestT, msg)
×
2565
                return
×
2566
        }
×
2567

2568
        js, cc := s.getJetStreamCluster()
33✔
2569
        if js == nil || cc == nil {
33✔
2570
                return
×
2571
        }
×
2572

2573
        // Extra checks here but only leader is listening.
2574
        js.mu.RLock()
33✔
2575
        isLeader := cc.isLeader()
33✔
2576
        js.mu.RUnlock()
33✔
2577

33✔
2578
        if !isLeader {
33✔
2579
                return
×
2580
        }
×
2581

2582
        accName := tokenAt(subject, 6)
33✔
2583
        streamName := tokenAt(subject, 7)
33✔
2584

33✔
2585
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2586
                return
×
2587
        }
×
2588

2589
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2590
        if errorOnRequiredApiLevel(hdr) {
33✔
2591
                resp.Error = NewJSRequiredApiLevelError()
×
2592
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2593
                return
×
2594
        }
×
2595

2596
        var req JSApiMetaServerStreamMoveRequest
33✔
2597
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2598
                resp.Error = NewJSInvalidJSONError(err)
×
2599
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2600
                return
×
2601
        }
×
2602

2603
        srcPeer := _EMPTY_
33✔
2604
        if req.Server != _EMPTY_ {
62✔
2605
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2606
        }
29✔
2607

2608
        targetAcc, ok := s.accounts.Load(accName)
33✔
2609
        if !ok {
33✔
2610
                resp.Error = NewJSNoAccountError()
×
2611
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2612
                return
×
2613
        }
×
2614

2615
        var streamFound bool
33✔
2616
        cfg := StreamConfig{}
33✔
2617
        currPeers := []string{}
33✔
2618
        currCluster := _EMPTY_
33✔
2619
        js.mu.Lock()
33✔
2620
        streams, ok := cc.streams[accName]
33✔
2621
        if ok {
66✔
2622
                sa, ok := streams[streamName]
33✔
2623
                if ok {
66✔
2624
                        cfg = *sa.Config.clone()
33✔
2625
                        streamFound = true
33✔
2626
                        currPeers = sa.Group.Peers
33✔
2627
                        currCluster = sa.Group.Cluster
33✔
2628
                }
33✔
2629
        }
2630
        js.mu.Unlock()
33✔
2631

33✔
2632
        if !streamFound {
33✔
2633
                resp.Error = NewJSStreamNotFoundError()
×
2634
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2635
                return
×
2636
        }
×
2637

2638
        // if server was picked, make sure src peer exists and move it to first position.
2639
        // removal will drop peers from the left
2640
        if req.Server != _EMPTY_ {
62✔
2641
                if srcPeer == _EMPTY_ {
29✔
2642
                        resp.Error = NewJSClusterServerNotMemberError()
×
2643
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2644
                        return
×
2645
                }
×
2646
                var peerFound bool
29✔
2647
                for i := 0; i < len(currPeers); i++ {
83✔
2648
                        if currPeers[i] == srcPeer {
83✔
2649
                                copy(currPeers[1:], currPeers[:i])
29✔
2650
                                currPeers[0] = srcPeer
29✔
2651
                                peerFound = true
29✔
2652
                                break
29✔
2653
                        }
2654
                }
2655
                if !peerFound {
29✔
2656
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2657
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2658
                        return
×
2659
                }
×
2660
        }
2661

2662
        // make sure client is scoped to requested account
2663
        ciNew := *(ci)
33✔
2664
        ciNew.Account = accName
33✔
2665

33✔
2666
        // backup placement such that peers can be looked up with modified tag list
33✔
2667
        var origPlacement *Placement
33✔
2668
        if cfg.Placement != nil {
33✔
2669
                tmp := *cfg.Placement
×
2670
                origPlacement = &tmp
×
2671
        }
×
2672

2673
        if len(req.Tags) > 0 {
60✔
2674
                if cfg.Placement == nil {
54✔
2675
                        cfg.Placement = &Placement{}
27✔
2676
                }
27✔
2677
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2678
        }
2679

2680
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2681
        if len(peers) <= cfg.Replicas {
35✔
2682
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2683
                peers = nil
2✔
2684

2✔
2685
                clusters := map[string]struct{}{}
2✔
2686
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2687
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2688
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2689
                        }
8✔
2690
                        return true
16✔
2691
                })
2692
                errs := &selectPeerError{}
2✔
2693
                errs.accumulate(e)
2✔
2694
                for cluster := range clusters {
4✔
2695
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2696
                        if len(newPeers) >= cfg.Replicas {
4✔
2697
                                peers = append([]string{}, currPeers...)
2✔
2698
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2699
                                break
2✔
2700
                        }
2701
                        errs.accumulate(e)
×
2702
                }
2703
                if peers == nil {
2✔
2704
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2705
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2706
                        return
×
2707
                }
×
2708
        }
2709

2710
        cfg.Placement = origPlacement
33✔
2711

33✔
2712
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2713
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2714

33✔
2715
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2716
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2717
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2718
}
2719

2720
// Request to have the metaleader move a stream on a peer to another
2721
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2722
        if c == nil || !s.JetStreamEnabled() {
4✔
2723
                return
×
2724
        }
×
2725

2726
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
4✔
2727
        if err != nil {
4✔
2728
                s.Warnf(badAPIRequestT, msg)
×
2729
                return
×
2730
        }
×
2731

2732
        js, cc := s.getJetStreamCluster()
4✔
2733
        if js == nil || cc == nil {
4✔
2734
                return
×
2735
        }
×
2736

2737
        // Extra checks here but only leader is listening.
2738
        js.mu.RLock()
4✔
2739
        isLeader := cc.isLeader()
4✔
2740
        js.mu.RUnlock()
4✔
2741

4✔
2742
        if !isLeader {
4✔
2743
                return
×
2744
        }
×
2745

2746
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2747
        if errorOnRequiredApiLevel(hdr) {
4✔
2748
                resp.Error = NewJSRequiredApiLevelError()
×
2749
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2750
                return
×
2751
        }
×
2752

2753
        accName := tokenAt(subject, 6)
4✔
2754
        streamName := tokenAt(subject, 7)
4✔
2755

4✔
2756
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2757
                return
×
2758
        }
×
2759

2760
        targetAcc, ok := s.accounts.Load(accName)
4✔
2761
        if !ok {
4✔
2762
                resp.Error = NewJSNoAccountError()
×
2763
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2764
                return
×
2765
        }
×
2766

2767
        streamFound := false
4✔
2768
        cfg := StreamConfig{}
4✔
2769
        currPeers := []string{}
4✔
2770
        js.mu.Lock()
4✔
2771
        streams, ok := cc.streams[accName]
4✔
2772
        if ok {
8✔
2773
                sa, ok := streams[streamName]
4✔
2774
                if ok {
8✔
2775
                        cfg = *sa.Config.clone()
4✔
2776
                        streamFound = true
4✔
2777
                        currPeers = sa.Group.Peers
4✔
2778
                }
4✔
2779
        }
2780
        js.mu.Unlock()
4✔
2781

4✔
2782
        if !streamFound {
4✔
2783
                resp.Error = NewJSStreamNotFoundError()
×
2784
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2785
                return
×
2786
        }
×
2787

2788
        if len(currPeers) <= cfg.Replicas {
4✔
2789
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2790
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2791
                return
×
2792
        }
×
2793

2794
        // make sure client is scoped to requested account
2795
        ciNew := *(ci)
4✔
2796
        ciNew.Account = accName
4✔
2797

4✔
2798
        peers := currPeers[:cfg.Replicas]
4✔
2799

4✔
2800
        // Remove placement in case tags don't match
4✔
2801
        // This can happen if the move was initiated by modifying the tags.
4✔
2802
        // This is an account operation.
4✔
2803
        // This can NOT happen when the move was initiated by the system account.
4✔
2804
        // There move honors the original tag list.
4✔
2805
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2806
        FOR_TAGCHECK:
1✔
2807
                for _, peer := range peers {
2✔
2808
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2809
                        if !ok {
1✔
2810
                                // can't verify tags, do the safe thing and error
×
2811
                                resp.Error = NewJSStreamGeneralError(
×
2812
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2813
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2814
                                return
×
2815
                        }
×
2816
                        nodeTags := si.(nodeInfo).tags
1✔
2817
                        for _, tag := range cfg.Placement.Tags {
2✔
2818
                                if !nodeTags.Contains(tag) {
2✔
2819
                                        // clear placement as tags don't match
1✔
2820
                                        cfg.Placement = nil
1✔
2821
                                        break FOR_TAGCHECK
1✔
2822
                                }
2823
                        }
2824

2825
                }
2826
        }
2827

2828
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2829
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2830

4✔
2831
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2832
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2833
}
2834

2835
// Request to have an account purged
2836
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7✔
2837
        if c == nil || !s.JetStreamEnabled() {
7✔
2838
                return
×
2839
        }
×
2840

2841
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7✔
2842
        if err != nil {
7✔
2843
                s.Warnf(badAPIRequestT, msg)
×
2844
                return
×
2845
        }
×
2846
        if acc != s.SystemAccount() {
7✔
2847
                return
×
2848
        }
×
2849

2850
        js := s.getJetStream()
7✔
2851
        if js == nil {
7✔
2852
                return
×
2853
        }
×
2854

2855
        accName := tokenAt(subject, 5)
7✔
2856

7✔
2857
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
7✔
2858

7✔
2859
        // Check for path like separators in the name.
7✔
2860
        if strings.ContainsAny(accName, `\/`) {
8✔
2861
                resp.Error = NewJSStreamGeneralError(errors.New("account name can not contain path separators"))
1✔
2862
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2863
                return
1✔
2864
        }
1✔
2865

2866
        if !s.JetStreamIsClustered() {
8✔
2867
                var streams []*stream
2✔
2868
                var ac *Account
2✔
2869
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2870
                        streams = ac.streams()
1✔
2871
                }
1✔
2872

2873
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2874
                        accName, len(streams), ac != nil)
2✔
2875

2✔
2876
                for _, mset := range streams {
3✔
2877
                        err := mset.delete()
1✔
2878
                        if err != nil {
1✔
2879
                                resp.Error = NewJSStreamDeleteError(err)
×
2880
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2881
                                return
×
2882
                        }
×
2883
                }
2884
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2885
                        resp.Error = NewJSStreamGeneralError(err)
×
2886
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2887
                        return
×
2888
                }
×
2889
                resp.Initiated = true
2✔
2890
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2891
                return
2✔
2892
        }
2893

2894
        _, cc := s.getJetStreamCluster()
4✔
2895

4✔
2896
        js.mu.RLock()
4✔
2897
        isLeader := cc.isLeader()
4✔
2898
        meta := cc.meta
4✔
2899
        js.mu.RUnlock()
4✔
2900

4✔
2901
        if !isLeader {
4✔
2902
                return
×
2903
        }
×
2904

2905
        if errorOnRequiredApiLevel(hdr) {
4✔
2906
                resp.Error = NewJSRequiredApiLevelError()
×
2907
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2908
                return
×
2909
        }
×
2910

2911
        if js.isMetaRecovering() {
4✔
2912
                // While in recovery mode, the data structures are not fully initialized
×
2913
                resp.Error = NewJSClusterNotAvailError()
×
2914
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2915
                return
×
2916
        }
×
2917

2918
        js.mu.Lock()
4✔
2919
        ns, nc := 0, 0
4✔
2920
        for osa := range js.streamAssignmentsOrInflightSeq(accName) {
12✔
2921
                for oca := range js.consumerAssignmentsOrInflightSeq(accName, osa.Config.Name) {
20✔
2922
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
12✔
2923
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2924
                        cc.trackInflightConsumerProposal(accName, osa.Config.Name, ca, true)
12✔
2925
                        nc++
12✔
2926
                }
12✔
2927
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
8✔
2928
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2929
                cc.trackInflightStreamProposal(accName, sa, true)
8✔
2930
                ns++
8✔
2931
        }
2932
        js.mu.Unlock()
4✔
2933

4✔
2934
        hasAccount := ns > 0
4✔
2935
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2936

4✔
2937
        resp.Initiated = true
4✔
2938
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2939
}
2940

2941
// Request to have the meta leader stepdown.
2942
// These will only be received by the meta leader, so less checking needed.
2943
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
23✔
2944
        if c == nil || !s.JetStreamEnabled() {
23✔
2945
                return
×
2946
        }
×
2947

2948
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
23✔
2949
        if err != nil {
23✔
2950
                s.Warnf(badAPIRequestT, msg)
×
2951
                return
×
2952
        }
×
2953

2954
        // This should only be coming from the System Account.
2955
        if acc != s.SystemAccount() {
24✔
2956
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2957
                return
1✔
2958
        }
1✔
2959

2960
        js, cc := s.getJetStreamCluster()
22✔
2961
        if js == nil || cc == nil {
22✔
2962
                return
×
2963
        }
×
2964

2965
        // Extra checks here but only leader is listening.
2966
        js.mu.RLock()
22✔
2967
        isLeader := cc.isLeader()
22✔
2968
        meta := cc.meta
22✔
2969
        js.mu.RUnlock()
22✔
2970

22✔
2971
        if !isLeader {
22✔
2972
                return
×
2973
        }
×
2974

2975
        var preferredLeader string
22✔
2976
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
22✔
2977
        if errorOnRequiredApiLevel(hdr) {
22✔
2978
                resp.Error = NewJSRequiredApiLevelError()
×
2979
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2980
                return
×
2981
        }
×
2982

2983
        if isJSONObjectOrArray(msg) {
37✔
2984
                var req JSApiLeaderStepdownRequest
15✔
2985
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
15✔
2986
                        resp.Error = NewJSInvalidJSONError(err)
×
2987
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2988
                        return
×
2989
                }
×
2990
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
21✔
2991
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2992
                        return
6✔
2993
                }
6✔
2994
        }
2995

2996
        // Call actual stepdown.
2997
        err = meta.StepDown(preferredLeader)
16✔
2998
        if err != nil {
16✔
2999
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
3000
        } else {
16✔
3001
                resp.Success = true
16✔
3002
        }
16✔
3003
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16✔
3004
}
3005

3006
// isJSONObjectOrArray reports whether req looks like a JSON object or array,
// i.e. whether its first byte after any leading JSON whitespace (space, tab,
// newline, carriage return) is '{' or '['.
// Valid JSON can also be a bare string or number, but callers only care about
// objects and arrays. No further validation is done: this is merely a hint for
// whether the request should be parsed as JSON at all.
func isJSONObjectOrArray(req []byte) bool {
	for _, b := range req {
		switch b {
		case ' ', '\t', '\n', '\r':
			// Leading whitespace, keep scanning.
			continue
		case '{', '[':
			return true
		default:
			return false
		}
	}
	// Empty, or nothing but whitespace.
	return false
}
3024

3025
// isEmptyRequest reports whether req carries no request payload: it is either
// zero-length or a JSON object without any members (for example `{}` or
// `{ }`). Any other content, including invalid JSON, `null`, arrays and
// scalars, is considered non-empty.
func isEmptyRequest(req []byte) bool {
	// Fast paths for the two common spellings. Comparing through string(req)
	// does not allocate.
	if len(req) == 0 || string(req) == "{}" {
		return true
	}
	// If we are here we didn't get our simple match, but still could be valid.
	var v any
	if err := json.Unmarshal(req, &v); err != nil {
		return false
	}
	vm, ok := v.(map[string]any)
	return ok && len(vm) == 0
}
3043

3044
// getStepDownPreferredPlacement attempts to work out what the best placement is
3045
// for a stepdown request. The preferred server name always takes precedence, but
3046
// if not specified, the placement will be used to filter by cluster. The caller
3047
// should check for return API errors and return those to the requestor if needed.
3048
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
39✔
3049
        if placement == nil {
41✔
3050
                return _EMPTY_, nil
2✔
3051
        }
2✔
3052
        var preferredLeader string
37✔
3053
        if placement.Preferred != _EMPTY_ {
54✔
3054
                for _, p := range group.Peers() {
69✔
3055
                        si, ok := s.nodeToInfo.Load(p.ID)
52✔
3056
                        if !ok || si == nil {
52✔
3057
                                continue
×
3058
                        }
3059
                        if si.(nodeInfo).name == placement.Preferred {
65✔
3060
                                preferredLeader = p.ID
13✔
3061
                                break
13✔
3062
                        }
3063
                }
3064
                if preferredLeader == group.ID() {
21✔
3065
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3066
                }
4✔
3067
                if preferredLeader == _EMPTY_ {
17✔
3068
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3069
                }
4✔
3070
        } else {
20✔
3071
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3072
                ourID := group.ID()
20✔
3073
                for _, p := range group.Peers() {
116✔
3074
                        if p == nil {
96✔
3075
                                continue // ... shouldn't happen.
×
3076
                        }
3077
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3078
                        if !ok || si == nil {
96✔
3079
                                continue
×
3080
                        }
3081
                        ni := si.(nodeInfo)
96✔
3082
                        if ni.offline || p.ID == ourID {
116✔
3083
                                continue
20✔
3084
                        }
3085
                        possiblePeers[p] = ni
76✔
3086
                }
3087
                // If cluster is specified, filter out anything not matching the cluster name.
3088
                if placement.Cluster != _EMPTY_ {
31✔
3089
                        for p, si := range possiblePeers {
51✔
3090
                                if si.cluster != placement.Cluster {
66✔
3091
                                        delete(possiblePeers, p)
26✔
3092
                                }
26✔
3093
                        }
3094
                }
3095
                // If tags are specified, filter out anything not matching all supplied tags.
3096
                if len(placement.Tags) > 0 {
32✔
3097
                        for p, si := range possiblePeers {
55✔
3098
                                matchesAll := true
43✔
3099
                                for _, tag := range placement.Tags {
93✔
3100
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3101
                                                break
32✔
3102
                                        }
3103
                                }
3104
                                if !matchesAll {
75✔
3105
                                        delete(possiblePeers, p)
32✔
3106
                                }
32✔
3107
                        }
3108
                }
3109
                // If there are no possible peers, return an error.
3110
                if len(possiblePeers) == 0 {
28✔
3111
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3112
                }
8✔
3113
                // Take advantage of random map iteration order to select the preferred.
3114
                for p := range possiblePeers {
24✔
3115
                        preferredLeader = p.ID
12✔
3116
                        break
12✔
3117
                }
3118
        }
3119
        return preferredLeader, nil
21✔
3120
}
3121

3122
// Request to delete a stream.
3123
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
574✔
3124
        if c == nil || !s.JetStreamEnabled() {
574✔
3125
                return
×
3126
        }
×
3127
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
574✔
3128
        if err != nil {
574✔
3129
                s.Warnf(badAPIRequestT, msg)
×
3130
                return
×
3131
        }
×
3132

3133
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
574✔
3134

574✔
3135
        // Determine if we should proceed here when we are in clustered mode.
574✔
3136
        if s.JetStreamIsClustered() {
1,099✔
3137
                js, cc := s.getJetStreamCluster()
525✔
3138
                if js == nil || cc == nil {
525✔
3139
                        return
×
3140
                }
×
3141
                if js.isLeaderless() {
526✔
3142
                        resp.Error = NewJSClusterNotAvailError()
1✔
3143
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3144
                        return
1✔
3145
                }
1✔
3146
                // Make sure we are meta leader.
3147
                if !s.JetStreamIsLeader() {
930✔
3148
                        return
406✔
3149
                }
406✔
3150
        }
3151

3152
        if errorOnRequiredApiLevel(hdr) {
168✔
3153
                resp.Error = NewJSRequiredApiLevelError()
1✔
3154
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3155
                return
1✔
3156
        }
1✔
3157

3158
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
166✔
3159
                if doErr {
×
3160
                        resp.Error = NewJSNotEnabledForAccountError()
×
3161
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3162
                }
×
3163
                return
×
3164
        }
3165

3166
        if !isEmptyRequest(msg) {
167✔
3167
                resp.Error = NewJSNotEmptyRequestError()
1✔
3168
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3169
                return
1✔
3170
        }
1✔
3171
        stream := streamNameFromSubject(subject)
165✔
3172

165✔
3173
        // Clustered.
165✔
3174
        if s.JetStreamIsClustered() {
283✔
3175
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
118✔
3176
                return
118✔
3177
        }
118✔
3178

3179
        mset, err := acc.lookupStream(stream)
47✔
3180
        if err != nil {
52✔
3181
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3182
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3183
                return
5✔
3184
        }
5✔
3185

3186
        if err := mset.delete(); err != nil {
42✔
3187
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3188
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3189
                return
×
3190
        }
×
3191
        resp.Success = true
42✔
3192
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
42✔
3193
}
3194

3195
// Request to delete a message.
3196
// This expects a stream sequence number as the msg body.
3197
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
935✔
3198
        if c == nil || !s.JetStreamEnabled() {
946✔
3199
                return
11✔
3200
        }
11✔
3201
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
924✔
3202
        if err != nil {
924✔
3203
                s.Warnf(badAPIRequestT, msg)
×
3204
                return
×
3205
        }
×
3206

3207
        stream := tokenAt(subject, 6)
924✔
3208

924✔
3209
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
924✔
3210

924✔
3211
        // If we are in clustered mode we need to be the stream leader to proceed.
924✔
3212
        if s.JetStreamIsClustered() {
1,335✔
3213
                // Check to make sure the stream is assigned.
411✔
3214
                js, cc := s.getJetStreamCluster()
411✔
3215
                if js == nil || cc == nil {
419✔
3216
                        return
8✔
3217
                }
8✔
3218
                if js.isLeaderless() {
404✔
3219
                        resp.Error = NewJSClusterNotAvailError()
1✔
3220
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3221
                        return
1✔
3222
                }
1✔
3223

3224
                js.mu.RLock()
402✔
3225
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
402✔
3226
                js.mu.RUnlock()
402✔
3227

402✔
3228
                if isLeader && sa == nil {
402✔
3229
                        // We can't find the stream, so mimic what would be the errors below.
×
3230
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3231
                                if doErr {
×
3232
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3233
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3234
                                }
×
3235
                                return
×
3236
                        }
3237
                        // No stream present.
3238
                        resp.Error = NewJSStreamNotFoundError()
×
3239
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3240
                        return
×
3241
                } else if sa == nil {
402✔
3242
                        return
×
3243
                }
×
3244

3245
                // Check to see if we are a member of the group and if the group has no leader.
3246
                if js.isGroupLeaderless(sa.Group) {
402✔
3247
                        resp.Error = NewJSClusterNotAvailError()
×
3248
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3249
                        return
×
3250
                }
×
3251

3252
                // We have the stream assigned and a leader, so only the stream leader should answer.
3253
                if !acc.JetStreamIsStreamLeader(stream) {
673✔
3254
                        return
271✔
3255
                }
271✔
3256
        }
3257

3258
        if errorOnRequiredApiLevel(hdr) {
645✔
3259
                resp.Error = NewJSRequiredApiLevelError()
1✔
3260
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3261
                return
1✔
3262
        }
1✔
3263

3264
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
729✔
3265
                if doErr {
170✔
3266
                        resp.Error = NewJSNotEnabledForAccountError()
84✔
3267
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
84✔
3268
                }
84✔
3269
                return
86✔
3270
        }
3271
        if isEmptyRequest(msg) {
557✔
3272
                resp.Error = NewJSBadRequestError()
×
3273
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3274
                return
×
3275
        }
×
3276
        var req JSApiMsgDeleteRequest
557✔
3277
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
557✔
3278
                resp.Error = NewJSInvalidJSONError(err)
×
3279
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3280
                return
×
3281
        }
×
3282

3283
        mset, err := acc.lookupStream(stream)
557✔
3284
        if err != nil {
559✔
3285
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3286
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3287
                return
2✔
3288
        }
2✔
3289
        if mset.cfg.Sealed {
557✔
3290
                resp.Error = NewJSStreamSealedError()
2✔
3291
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3292
                return
2✔
3293
        }
2✔
3294
        if mset.cfg.DenyDelete {
554✔
3295
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3296
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3297
                return
1✔
3298
        }
1✔
3299

3300
        if s.JetStreamIsClustered() {
681✔
3301
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
129✔
3302
                return
129✔
3303
        }
129✔
3304

3305
        var removed bool
423✔
3306
        if req.NoErase {
844✔
3307
                removed, err = mset.removeMsg(req.Seq)
421✔
3308
        } else {
423✔
3309
                removed, err = mset.eraseMsg(req.Seq)
2✔
3310
        }
2✔
3311
        if err != nil {
423✔
3312
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3313
        } else if !removed {
423✔
3314
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3315
        } else {
423✔
3316
                resp.Success = true
423✔
3317
        }
423✔
3318
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
423✔
3319
}
3320

3321
// Request to get a raw stream message.
3322
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2,712✔
3323
        if c == nil || !s.JetStreamEnabled() {
2,712✔
3324
                return
×
3325
        }
×
3326
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2,712✔
3327
        if err != nil {
2,712✔
3328
                s.Warnf(badAPIRequestT, msg)
×
3329
                return
×
3330
        }
×
3331

3332
        stream := tokenAt(subject, 6)
2,712✔
3333

2,712✔
3334
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
2,712✔
3335

2,712✔
3336
        // If we are in clustered mode we need to be the stream leader to proceed.
2,712✔
3337
        if s.JetStreamIsClustered() {
4,593✔
3338
                // Check to make sure the stream is assigned.
1,881✔
3339
                js, cc := s.getJetStreamCluster()
1,881✔
3340
                if js == nil || cc == nil {
1,881✔
3341
                        return
×
3342
                }
×
3343
                if js.isLeaderless() {
1,881✔
3344
                        resp.Error = NewJSClusterNotAvailError()
×
3345
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3346
                        return
×
3347
                }
×
3348

3349
                js.mu.RLock()
1,881✔
3350
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
1,881✔
3351
                js.mu.RUnlock()
1,881✔
3352

1,881✔
3353
                if isLeader && sa == nil {
1,881✔
3354
                        // We can't find the stream, so mimic what would be the errors below.
×
3355
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3356
                                if doErr {
×
3357
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3358
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3359
                                }
×
3360
                                return
×
3361
                        }
3362
                        // No stream present.
3363
                        resp.Error = NewJSStreamNotFoundError()
×
3364
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3365
                        return
×
3366
                } else if sa == nil {
1,881✔
3367
                        return
×
3368
                }
×
3369

3370
                // Check to see if we are a member of the group and if the group has no leader.
3371
                if js.isGroupLeaderless(sa.Group) {
1,881✔
3372
                        resp.Error = NewJSClusterNotAvailError()
×
3373
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3374
                        return
×
3375
                }
×
3376

3377
                // We have the stream assigned and a leader, so only the stream leader should answer.
3378
                if !acc.JetStreamIsStreamLeader(stream) {
3,142✔
3379
                        return
1,261✔
3380
                }
1,261✔
3381
        }
3382

3383
        if errorOnRequiredApiLevel(hdr) {
1,452✔
3384
                resp.Error = NewJSRequiredApiLevelError()
1✔
3385
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3386
                return
1✔
3387
        }
1✔
3388

3389
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,453✔
3390
                if doErr {
3✔
3391
                        resp.Error = NewJSNotEnabledForAccountError()
×
3392
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3393
                }
×
3394
                return
3✔
3395
        }
3396
        if isEmptyRequest(msg) {
1,447✔
3397
                resp.Error = NewJSBadRequestError()
×
3398
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3399
                return
×
3400
        }
×
3401
        var req JSApiMsgGetRequest
1,447✔
3402
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,447✔
3403
                resp.Error = NewJSInvalidJSONError(err)
×
3404
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3405
                return
×
3406
        }
×
3407

3408
        // This version does not support batch.
3409
        if req.Batch > 0 || req.MaxBytes > 0 {
1,448✔
3410
                resp.Error = NewJSBadRequestError()
1✔
3411
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3412
                return
1✔
3413
        }
1✔
3414

3415
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3416
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3417
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,446✔
3418
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,446✔
3419
                (req.Seq > 0 && req.StartTime != nil) ||
1,446✔
3420
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,446✔
3421
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,450✔
3422
                resp.Error = NewJSBadRequestError()
4✔
3423
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3424
                return
4✔
3425
        }
4✔
3426

3427
        mset, err := acc.lookupStream(stream)
1,442✔
3428
        if err != nil {
1,442✔
3429
                resp.Error = NewJSStreamNotFoundError()
×
3430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3431
                return
×
3432
        }
×
3433
        if mset.offlineReason != _EMPTY_ {
1,442✔
3434
                // Just let the request time out.
×
3435
                return
×
3436
        }
×
3437

3438
        var svp StoreMsg
1,442✔
3439
        var sm *StoreMsg
1,442✔
3440

1,442✔
3441
        // Ensure this read request is isolated and doesn't interleave with writes.
1,442✔
3442
        mset.mu.RLock()
1,442✔
3443
        defer mset.mu.RUnlock()
1,442✔
3444

1,442✔
3445
        // If AsOfTime is set, perform this first to get the sequence.
1,442✔
3446
        var seq uint64
1,442✔
3447
        if req.StartTime != nil {
1,448✔
3448
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3449
        } else {
1,442✔
3450
                seq = req.Seq
1,436✔
3451
        }
1,436✔
3452

3453
        if seq > 0 && req.NextFor == _EMPTY_ {
2,031✔
3454
                sm, err = mset.store.LoadMsg(seq, &svp)
589✔
3455
        } else if req.NextFor != _EMPTY_ {
1,544✔
3456
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3457
        } else {
853✔
3458
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
751✔
3459
        }
751✔
3460
        if err != nil {
2,121✔
3461
                resp.Error = NewJSNoMessageFoundError()
679✔
3462
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
679✔
3463
                return
679✔
3464
        }
679✔
3465
        resp.Message = &StoredMsg{
763✔
3466
                Subject:  sm.subj,
763✔
3467
                Sequence: sm.seq,
763✔
3468
                Data:     sm.msg,
763✔
3469
                Time:     time.Unix(0, sm.ts).UTC(),
763✔
3470
        }
763✔
3471
        if !req.NoHeaders {
1,525✔
3472
                resp.Message.Header = sm.hdr
762✔
3473
        }
762✔
3474

3475
        // Don't send response through API layer for this call.
3476
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
763✔
3477
}
3478

3479
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3480
        if c == nil || !s.JetStreamEnabled() {
28✔
3481
                return
×
3482
        }
×
3483

3484
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3485
        if err != nil {
28✔
3486
                s.Warnf(badAPIRequestT, msg)
×
3487
                return
×
3488
        }
×
3489

3490
        stream := streamNameFromSubject(subject)
28✔
3491
        consumer := consumerNameFromSubject(subject)
28✔
3492

28✔
3493
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3494

28✔
3495
        if s.JetStreamIsClustered() {
46✔
3496
                // Check to make sure the stream is assigned.
18✔
3497
                js, cc := s.getJetStreamCluster()
18✔
3498
                if js == nil || cc == nil {
18✔
3499
                        return
×
3500
                }
×
3501

3502
                // First check if the stream and consumer is there.
3503
                js.mu.RLock()
18✔
3504
                sa := js.streamAssignment(acc.Name, stream)
18✔
3505
                if sa == nil {
21✔
3506
                        js.mu.RUnlock()
3✔
3507
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3508
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3509
                        return
3✔
3510
                }
3✔
3511
                if sa.unsupported != nil {
15✔
3512
                        js.mu.RUnlock()
×
3513
                        // Just let the request time out.
×
3514
                        return
×
3515
                }
×
3516

3517
                ca, ok := sa.consumers[consumer]
15✔
3518
                if !ok || ca == nil {
18✔
3519
                        js.mu.RUnlock()
3✔
3520
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3521
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3522
                        return
3✔
3523
                }
3✔
3524
                if ca.unsupported != nil {
12✔
3525
                        js.mu.RUnlock()
×
3526
                        // Just let the request time out.
×
3527
                        return
×
3528
                }
×
3529
                js.mu.RUnlock()
12✔
3530

12✔
3531
                // Then check if we are the leader.
12✔
3532
                mset, err := acc.lookupStream(stream)
12✔
3533
                if err != nil {
12✔
3534
                        return
×
3535
                }
×
3536

3537
                o := mset.lookupConsumer(consumer)
12✔
3538
                if o == nil {
12✔
3539
                        return
×
3540
                }
×
3541
                if !o.isLeader() {
20✔
3542
                        return
8✔
3543
                }
8✔
3544
        }
3545

3546
        if errorOnRequiredApiLevel(hdr) {
15✔
3547
                resp.Error = NewJSRequiredApiLevelError()
1✔
3548
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3549
                return
1✔
3550
        }
1✔
3551

3552
        var req JSApiConsumerUnpinRequest
13✔
3553
        if err := json.Unmarshal(msg, &req); err != nil {
13✔
3554
                resp.Error = NewJSInvalidJSONError(err)
×
3555
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3556
                return
×
3557
        }
×
3558

3559
        if req.Group == _EMPTY_ {
15✔
3560
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
2✔
3561
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3562
                return
2✔
3563
        }
2✔
3564

3565
        if !validGroupName.MatchString(req.Group) {
13✔
3566
                resp.Error = NewJSConsumerInvalidGroupNameError()
2✔
3567
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3568
                return
2✔
3569
        }
2✔
3570

3571
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3572
                if doErr {
×
3573
                        resp.Error = NewJSNotEnabledForAccountError()
×
3574
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3575
                }
×
3576
                return
×
3577
        }
3578

3579
        mset, err := acc.lookupStream(stream)
9✔
3580
        if err != nil {
10✔
3581
                resp.Error = NewJSStreamNotFoundError()
1✔
3582
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3583
                return
1✔
3584
        }
1✔
3585
        if mset.offlineReason != _EMPTY_ {
8✔
3586
                // Just let the request time out.
×
3587
                return
×
3588
        }
×
3589
        o := mset.lookupConsumer(consumer)
8✔
3590
        if o == nil {
9✔
3591
                resp.Error = NewJSConsumerNotFoundError()
1✔
3592
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3593
                return
1✔
3594
        }
1✔
3595
        if o.offlineReason != _EMPTY_ {
7✔
3596
                // Just let the request time out.
×
3597
                return
×
3598
        }
×
3599

3600
        var foundPriority bool
7✔
3601
        for _, group := range o.config().PriorityGroups {
14✔
3602
                if group == req.Group {
12✔
3603
                        foundPriority = true
5✔
3604
                        break
5✔
3605
                }
3606
        }
3607
        if !foundPriority {
9✔
3608
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3609
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3610
                return
2✔
3611
        }
2✔
3612

3613
        o.mu.Lock()
5✔
3614
        o.unassignPinId()
5✔
3615
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3616
        o.mu.Unlock()
5✔
3617
        o.signalNewMessages()
5✔
3618
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3619
}
3620

3621
// Request to purge a stream.
3622
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
140✔
3623
        if c == nil || !s.JetStreamEnabled() {
140✔
3624
                return
×
3625
        }
×
3626
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
140✔
3627
        if err != nil {
140✔
3628
                s.Warnf(badAPIRequestT, msg)
×
3629
                return
×
3630
        }
×
3631

3632
        stream := streamNameFromSubject(subject)
140✔
3633

140✔
3634
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
140✔
3635

140✔
3636
        // If we are in clustered mode we need to be the stream leader to proceed.
140✔
3637
        if s.JetStreamIsClustered() {
244✔
3638
                // Check to make sure the stream is assigned.
104✔
3639
                js, cc := s.getJetStreamCluster()
104✔
3640
                if js == nil || cc == nil {
104✔
3641
                        return
×
3642
                }
×
3643

3644
                js.mu.RLock()
104✔
3645
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
104✔
3646
                js.mu.RUnlock()
104✔
3647

104✔
3648
                if isLeader && sa == nil {
104✔
3649
                        // We can't find the stream, so mimic what would be the errors below.
×
3650
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3651
                                if doErr {
×
3652
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3653
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3654
                                }
×
3655
                                return
×
3656
                        }
3657
                        // No stream present.
3658
                        resp.Error = NewJSStreamNotFoundError()
×
3659
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3660
                        return
×
3661
                } else if sa == nil {
105✔
3662
                        if js.isLeaderless() {
1✔
3663
                                resp.Error = NewJSClusterNotAvailError()
×
3664
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3665
                        }
×
3666
                        return
1✔
3667
                }
3668

3669
                // Check to see if we are a member of the group and if the group has no leader.
3670
                if js.isGroupLeaderless(sa.Group) {
104✔
3671
                        resp.Error = NewJSClusterNotAvailError()
1✔
3672
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3673
                        return
1✔
3674
                }
1✔
3675

3676
                // We have the stream assigned and a leader, so only the stream leader should answer.
3677
                if !acc.JetStreamIsStreamLeader(stream) {
171✔
3678
                        if js.isLeaderless() {
69✔
3679
                                resp.Error = NewJSClusterNotAvailError()
×
3680
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3681
                        }
×
3682
                        return
69✔
3683
                }
3684
        }
3685

3686
        if errorOnRequiredApiLevel(hdr) {
70✔
3687
                resp.Error = NewJSRequiredApiLevelError()
1✔
3688
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3689
                return
1✔
3690
        }
1✔
3691

3692
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
68✔
3693
                if doErr {
×
3694
                        resp.Error = NewJSNotEnabledForAccountError()
×
3695
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3696
                }
×
3697
                return
×
3698
        }
3699

3700
        var purgeRequest *JSApiStreamPurgeRequest
68✔
3701
        if isJSONObjectOrArray(msg) {
102✔
3702
                var req JSApiStreamPurgeRequest
34✔
3703
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3704
                        resp.Error = NewJSInvalidJSONError(err)
×
3705
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3706
                        return
×
3707
                }
×
3708
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3709
                        resp.Error = NewJSBadRequestError()
×
3710
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3711
                        return
×
3712
                }
×
3713
                purgeRequest = &req
34✔
3714
        }
3715

3716
        mset, err := acc.lookupStream(stream)
68✔
3717
        if err != nil {
68✔
3718
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3719
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3720
                return
×
3721
        }
×
3722
        if mset.cfg.Sealed {
70✔
3723
                resp.Error = NewJSStreamSealedError()
2✔
3724
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3725
                return
2✔
3726
        }
2✔
3727
        if mset.cfg.DenyPurge {
67✔
3728
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3729
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3730
                return
1✔
3731
        }
1✔
3732

3733
        if s.JetStreamIsClustered() {
96✔
3734
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
31✔
3735
                return
31✔
3736
        }
31✔
3737

3738
        purged, err := mset.purge(purgeRequest)
34✔
3739
        if err != nil {
34✔
3740
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3741
        } else {
34✔
3742
                resp.Purged = purged
34✔
3743
                resp.Success = true
34✔
3744
        }
34✔
3745
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
34✔
3746
}
3747

3748
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,059✔
3749
        var replicas int
1,059✔
3750
        if cfg != nil {
2,118✔
3751
                replicas = cfg.Replicas
1,059✔
3752
        }
1,059✔
3753
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,059✔
3754
        if apiErr != nil {
1,059✔
3755
                return apiErr
×
3756
        }
×
3757
        jsa.js.mu.RLock()
1,059✔
3758
        defer jsa.js.mu.RUnlock()
1,059✔
3759
        jsa.mu.RLock()
1,059✔
3760
        defer jsa.mu.RUnlock()
1,059✔
3761
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,062✔
3762
                return NewJSMaximumStreamsLimitError()
3✔
3763
        }
3✔
3764
        reserved := jsa.tieredReservation(tier, cfg)
1,056✔
3765
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,063✔
3766
                return NewJSStreamLimitsError(err, Unless(err))
7✔
3767
        }
7✔
3768
        return nil
1,049✔
3769
}
3770

3771
// Request to restore a stream.
3772
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
58✔
3773
        if c == nil || !s.JetStreamIsLeader() {
86✔
3774
                return
28✔
3775
        }
28✔
3776
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
30✔
3777
        if err != nil {
30✔
3778
                s.Warnf(badAPIRequestT, msg)
×
3779
                return
×
3780
        }
×
3781

3782
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
30✔
3783
        if errorOnRequiredApiLevel(hdr) {
31✔
3784
                resp.Error = NewJSRequiredApiLevelError()
1✔
3785
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3786
                return
1✔
3787
        }
1✔
3788
        if !acc.JetStreamEnabled() {
29✔
3789
                resp.Error = NewJSNotEnabledForAccountError()
×
3790
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3791
                return
×
3792
        }
×
3793
        if isEmptyRequest(msg) {
30✔
3794
                resp.Error = NewJSBadRequestError()
1✔
3795
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3796
                return
1✔
3797
        }
1✔
3798

3799
        var req JSApiStreamRestoreRequest
28✔
3800
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
28✔
3801
                resp.Error = NewJSInvalidJSONError(err)
×
3802
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3803
                return
×
3804
        }
×
3805

3806
        stream := streamNameFromSubject(subject)
28✔
3807

28✔
3808
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
30✔
3809
                req.Config.Name = stream
2✔
3810
        }
2✔
3811
        if stream != req.Config.Name {
30✔
3812
                resp.Error = NewJSStreamMismatchError()
2✔
3813
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3814
                return
2✔
3815
        }
2✔
3816

3817
        if s.JetStreamIsClustered() {
39✔
3818
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
13✔
3819
                return
13✔
3820
        }
13✔
3821

3822
        // check stream config at the start of the restore process, not at the end
3823
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
13✔
3824
        if apiErr != nil {
14✔
3825
                resp.Error = apiErr
1✔
3826
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3827
                return
1✔
3828
        }
1✔
3829

3830
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
14✔
3831
                resp.Error = err
2✔
3832
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3833
                return
2✔
3834
        }
2✔
3835

3836
        if _, err := acc.lookupStream(stream); err == nil {
12✔
3837
                resp.Error = NewJSStreamNameExistRestoreFailedError()
2✔
3838
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3839
                return
2✔
3840
        }
2✔
3841

3842
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3843
                if doErr {
×
3844
                        resp.Error = NewJSNotEnabledForAccountError()
×
3845
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3846
                }
×
3847
                return
×
3848
        }
3849

3850
        s.processStreamRestore(ci, acc, &cfg, subject, reply, string(msg))
8✔
3851
}
3852

3853
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3854
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3855

16✔
3856
        streamName := cfg.Name
16✔
3857
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3858

16✔
3859
        start := time.Now().UTC()
16✔
3860
        domain := s.getOpts().JetStreamDomain
16✔
3861
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3862
                TypedEvent: TypedEvent{
16✔
3863
                        Type: JSRestoreCreateAdvisoryType,
16✔
3864
                        ID:   nuid.Next(),
16✔
3865
                        Time: start,
16✔
3866
                },
16✔
3867
                Stream: streamName,
16✔
3868
                Client: ci.forAdvisory(),
16✔
3869
                Domain: domain,
16✔
3870
        })
16✔
3871

16✔
3872
        // Create our internal subscription to accept the snapshot.
16✔
3873
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3874

16✔
3875
        type result struct {
16✔
3876
                err   error
16✔
3877
                reply string
16✔
3878
        }
16✔
3879

16✔
3880
        // For signaling to upper layers.
16✔
3881
        var resultOnce sync.Once
16✔
3882
        var closeOnce sync.Once
16✔
3883
        resultCh := make(chan result, 1)
16✔
3884
        pr, pw := io.Pipe()
16✔
3885

16✔
3886
        setResult := func(err error, reply string) {
32✔
3887
                resultOnce.Do(func() {
32✔
3888
                        resultCh <- result{err, reply}
16✔
3889
                })
16✔
3890
        }
3891
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName))
16✔
3892
        restoreCh := make(chan struct {
16✔
3893
                mset *stream
16✔
3894
                err  error
16✔
3895
        }, 1)
16✔
3896
        closeWithError := func(err error) {
48✔
3897
                closeOnce.Do(func() {
48✔
3898
                        if err != nil {
17✔
3899
                                pw.CloseWithError(err)
1✔
3900
                        } else {
16✔
3901
                                pw.Close()
15✔
3902
                        }
15✔
3903
                })
3904
        }
3905

3906
        s.startGoRoutine(func() {
32✔
3907
                defer s.grWG.Done()
16✔
3908
                mset, err := acc.RestoreStream(cfg, pr)
16✔
3909
                if err != nil {
20✔
3910
                        pr.CloseWithError(err)
4✔
3911
                } else {
16✔
3912
                        pr.Close()
12✔
3913
                }
12✔
3914
                restoreCh <- struct {
16✔
3915
                        mset *stream
16✔
3916
                        err  error
16✔
3917
                }{
16✔
3918
                        mset: mset,
16✔
3919
                        err:  err,
16✔
3920
                }
16✔
3921
        })
3922

3923
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
128✔
3924
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
112✔
3925
                if reply == _EMPTY_ {
113✔
3926
                        sub.client.processUnsub(sub.sid)
1✔
3927
                        setResult(fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName), reply)
1✔
3928
                        return
1✔
3929
                }
1✔
3930
                // Account client messages have \r\n on end. This is an error.
3931
                if len(msg) < LEN_CR_LF {
111✔
3932
                        sub.client.processUnsub(sub.sid)
×
3933
                        setResult(fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName), reply)
×
3934
                        return
×
3935
                }
×
3936
                // Adjust.
3937
                msg = msg[:len(msg)-LEN_CR_LF]
111✔
3938

111✔
3939
                // This means we are complete with our transfer from the client.
111✔
3940
                if len(msg) == 0 {
126✔
3941
                        s.Debugf("Finished streaming restore for stream '%s > %s'", acc.Name, streamName)
15✔
3942
                        closeWithError(nil)
15✔
3943
                        setResult(nil, reply)
15✔
3944
                        return
15✔
3945
                }
15✔
3946

3947
                // Signal activity before and after the blocking write.
3948
                // The pre-write signal refreshes the stall watchdog when the
3949
                // chunk arrives; the post-write signal refreshes it again once
3950
                // RestoreStream has consumed the data. This keeps the idle
3951
                // window between chunks anchored to the end of the previous
3952
                // write instead of its start.
3953
                activeQ.push(0)
96✔
3954

96✔
3955
                if _, err := pw.Write(msg); err != nil {
96✔
3956
                        closeWithError(err)
×
3957
                        sub.client.processUnsub(sub.sid)
×
3958
                        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
×
3959
                        if IsNatsErr(err, JSStorageResourcesExceededErr, JSMemoryResourcesExceededErr) {
×
3960
                                s.resourcesExceededError(cfg.Storage)
×
3961
                        }
×
3962
                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
×
3963
                        if s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp)) == nil {
×
3964
                                reply = _EMPTY_
×
3965
                        }
×
3966
                        setResult(err, reply)
×
3967
                        return
×
3968
                }
3969

3970
                activeQ.push(len(msg))
96✔
3971

96✔
3972
                s.sendInternalAccountMsg(acc, reply, nil)
96✔
3973
        }
3974

3975
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3976
        if err != nil {
16✔
3977
                closeWithError(err)
×
3978
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3979
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3980
                return nil
×
3981
        }
×
3982

3983
        // Mark the subject so the end user knows where to send the snapshot chunks.
3984
        resp.DeliverSubject = restoreSubj
16✔
3985
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3986

16✔
3987
        // Returned to the caller to wait for completion.
16✔
3988
        doneCh := make(chan error, 1)
16✔
3989

16✔
3990
        // Monitor the progress from another Go routine.
16✔
3991
        s.startGoRoutine(func() {
32✔
3992
                defer s.grWG.Done()
16✔
3993
                defer func() {
32✔
3994
                        closeWithError(ErrConnectionClosed)
16✔
3995
                        sub.client.processUnsub(sub.sid)
16✔
3996
                        activeQ.unregister()
16✔
3997
                }()
16✔
3998

3999
                const activityInterval = 5 * time.Second
16✔
4000
                notActive := time.NewTimer(activityInterval)
16✔
4001
                defer notActive.Stop()
16✔
4002

16✔
4003
                total := 0
16✔
4004
                var inputDone bool
16✔
4005
                var replySubj string
16✔
4006
                var inputErr error
16✔
4007
                var restoreDone bool
16✔
4008
                var restoreResult struct {
16✔
4009
                        mset *stream
16✔
4010
                        err  error
16✔
4011
                }
16✔
4012

16✔
4013
                finish := func(reply string, err error, mset *stream) {
32✔
4014
                        end := time.Now().UTC()
16✔
4015

16✔
4016
                        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
4017
                                TypedEvent: TypedEvent{
16✔
4018
                                        Type: JSRestoreCompleteAdvisoryType,
16✔
4019
                                        ID:   nuid.Next(),
16✔
4020
                                        Time: end,
16✔
4021
                                },
16✔
4022
                                Stream: streamName,
16✔
4023
                                Start:  start,
16✔
4024
                                End:    end,
16✔
4025
                                Bytes:  int64(total),
16✔
4026
                                Client: ci.forAdvisory(),
16✔
4027
                                Domain: domain,
16✔
4028
                        })
16✔
4029

16✔
4030
                        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
4031
                        if err != nil {
20✔
4032
                                if IsNatsErr(err, JSStorageResourcesExceededErr, JSMemoryResourcesExceededErr) {
4✔
4033
                                        s.resourcesExceededError(cfg.Storage)
×
4034
                                }
×
4035
                                resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
4036
                                s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
4037
                                        friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
4038
                        } else {
12✔
4039
                                msetCfg := mset.config()
12✔
4040
                                resp.StreamInfo = &StreamInfo{
12✔
4041
                                        Created:   mset.createdTime(),
12✔
4042
                                        State:     mset.state(),
12✔
4043
                                        Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
4044
                                        TimeStamp: time.Now().UTC(),
12✔
4045
                                }
12✔
4046
                                s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
4047
                                        friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
4048
                        }
12✔
4049
                        if reply != _EMPTY_ {
31✔
4050
                                s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp))
15✔
4051
                        }
15✔
4052
                        doneCh <- err
16✔
4053
                }
4054

4055
                for {
240✔
4056
                        select {
224✔
4057
                        case result := <-resultCh:
16✔
4058
                                replySubj = result.reply
16✔
4059
                                inputDone = true
16✔
4060
                                inputErr = result.err
16✔
4061
                                notActive.Stop()
16✔
4062
                                if result.err != nil {
17✔
4063
                                        closeWithError(result.err)
1✔
4064
                                        s.Warnf(result.err.Error())
1✔
4065
                                }
1✔
4066
                                if restoreDone {
17✔
4067
                                        err := inputErr
1✔
4068
                                        if err == nil {
2✔
4069
                                                err = restoreResult.err
1✔
4070
                                        }
1✔
4071
                                        finish(replySubj, err, restoreResult.mset)
1✔
4072
                                        return
1✔
4073
                                }
4074
                        case rr := <-restoreCh:
16✔
4075
                                restoreDone = true
16✔
4076
                                restoreResult = rr
16✔
4077
                                if inputDone {
31✔
4078
                                        err := inputErr
15✔
4079
                                        if err == nil {
29✔
4080
                                                err = rr.err
14✔
4081
                                        }
14✔
4082
                                        finish(replySubj, err, rr.mset)
15✔
4083
                                        return
15✔
4084
                                }
4085
                        case <-activeQ.ch:
192✔
4086
                                if n, ok := activeQ.popOne(); ok {
384✔
4087
                                        total += n
192✔
4088
                                        if !inputDone {
384✔
4089
                                                notActive.Reset(activityInterval)
192✔
4090
                                        }
192✔
4091
                                }
4092
                        case <-notActive.C:
×
4093
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc.Name, streamName)
×
4094
                                closeWithError(err)
×
4095
                                doneCh <- err
×
4096
                                return
×
4097
                        }
4098
                }
4099
        })
4100

4101
        return doneCh
16✔
4102
}
4103

4104
// Process a snapshot request.
4105
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
4106
        if c == nil || !s.JetStreamEnabled() {
29✔
4107
                return
×
4108
        }
×
4109
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
4110
        if err != nil {
29✔
4111
                s.Warnf(badAPIRequestT, msg)
×
4112
                return
×
4113
        }
×
4114

4115
        smsg := string(msg)
29✔
4116
        stream := streamNameFromSubject(subject)
29✔
4117

29✔
4118
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
4119
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
4120
                return
15✔
4121
        }
15✔
4122

4123
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
4124
        if errorOnRequiredApiLevel(hdr) {
15✔
4125
                resp.Error = NewJSRequiredApiLevelError()
1✔
4126
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4127
                return
1✔
4128
        }
1✔
4129
        if !acc.JetStreamEnabled() {
13✔
4130
                resp.Error = NewJSNotEnabledForAccountError()
×
4131
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4132
                return
×
4133
        }
×
4134
        if isEmptyRequest(msg) {
14✔
4135
                resp.Error = NewJSBadRequestError()
1✔
4136
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4137
                return
1✔
4138
        }
1✔
4139

4140
        mset, err := acc.lookupStream(stream)
12✔
4141
        if err != nil {
13✔
4142
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4143
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4144
                return
1✔
4145
        }
1✔
4146

4147
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4148
                if doErr {
×
4149
                        resp.Error = NewJSNotEnabledForAccountError()
×
4150
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4151
                }
×
4152
                return
×
4153
        }
4154

4155
        var req JSApiStreamSnapshotRequest
11✔
4156
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4157
                resp.Error = NewJSInvalidJSONError(err)
×
4158
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4159
                return
×
4160
        }
×
4161
        if !IsValidSubject(req.DeliverSubject) {
12✔
4162
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4163
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4164
                return
1✔
4165
        }
1✔
4166

4167
        // We will do the snapshot in a go routine as well since check msgs may
4168
        // stall this go routine.
4169
        go func() {
20✔
4170
                if req.CheckMsgs {
12✔
4171
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4172
                } else {
10✔
4173
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4174
                }
8✔
4175

4176
                start := time.Now().UTC()
10✔
4177

10✔
4178
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4179
                if err != nil {
10✔
4180
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4181
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4182
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4183
                        return
×
4184
                }
×
4185

4186
                config := mset.config()
10✔
4187
                resp.State = &sr.State
10✔
4188
                resp.Config = &config
10✔
4189

10✔
4190
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4191

10✔
4192
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4193
                        TypedEvent: TypedEvent{
10✔
4194
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4195
                                ID:   nuid.Next(),
10✔
4196
                                Time: time.Now().UTC(),
10✔
4197
                        },
10✔
4198
                        Stream: mset.name(),
10✔
4199
                        State:  sr.State,
10✔
4200
                        Client: ci.forAdvisory(),
10✔
4201
                        Domain: s.getOpts().JetStreamDomain,
10✔
4202
                })
10✔
4203

10✔
4204
                // Now do the real streaming.
10✔
4205
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4206

10✔
4207
                end := time.Now().UTC()
10✔
4208

10✔
4209
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4210
                        TypedEvent: TypedEvent{
10✔
4211
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4212
                                ID:   nuid.Next(),
10✔
4213
                                Time: end,
10✔
4214
                        },
10✔
4215
                        Stream: mset.name(),
10✔
4216
                        Start:  start,
10✔
4217
                        End:    end,
10✔
4218
                        Client: ci.forAdvisory(),
10✔
4219
                        Domain: s.getOpts().JetStreamDomain,
10✔
4220
                })
10✔
4221

10✔
4222
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4223
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4224
                        mset.jsa.account.Name,
10✔
4225
                        mset.name(),
10✔
4226
                        end.Sub(start))
10✔
4227
        }()
4228
}
4229

4230
// Defaults used when streaming a snapshot. The chunk and window sizes are
// applied by streamSnapshot when the request leaves them at zero.
const (
	defaultSnapshotChunkSize  = 128 * 1024      // 128KiB
	defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MiB
	defaultSnapshotAckTimeout = 5 * time.Second
)

// snapshotAckTimeout is how long streamSnapshot waits for a free in-flight
// slot before giving up. It starts out as defaultSnapshotAckTimeout.
var snapshotAckTimeout = defaultSnapshotAckTimeout
4236

4237
// streamSnapshot will stream out our snapshot to the reply subject.
4238
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4239
        chunkSize, wndSize := req.ChunkSize, req.WindowSize
10✔
4240
        if chunkSize == 0 {
12✔
4241
                chunkSize = defaultSnapshotChunkSize
2✔
4242
        }
2✔
4243
        if wndSize == 0 {
20✔
4244
                wndSize = defaultSnapshotWindowSize
10✔
4245
        }
10✔
4246
        chunkSize = min(max(1024, chunkSize), 1024*1024) // Clamp within 1KiB to 1MiB
10✔
4247
        wndSize = min(max(1024, wndSize), 32*1024*1024)  // Clamp within 1KiB to 32MiB
10✔
4248
        wndSize = max(wndSize, chunkSize)                // Guarantee at least one chunk
10✔
4249
        maxInflight := wndSize / chunkSize               // Between 1 and 32,768
10✔
4250

10✔
4251
        // Setup for the chunk stream.
10✔
4252
        reply := req.DeliverSubject
10✔
4253
        r := sr.Reader
10✔
4254
        defer r.Close()
10✔
4255

10✔
4256
        // In case we run into an error, this allows subscription callbacks
10✔
4257
        // to not sit and block endlessly.
10✔
4258
        done := make(chan struct{})
10✔
4259
        defer close(done)
10✔
4260

10✔
4261
        // Check interest for the snapshot deliver subject.
10✔
4262
        inch := make(chan bool, 1)
10✔
4263
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4264
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4265
        hasInterest := <-inch
10✔
4266
        if !hasInterest {
15✔
4267
                // Allow 2 seconds or so for interest to show up.
5✔
4268
                select {
5✔
4269
                case <-inch:
4✔
4270
                case <-time.After(2 * time.Second):
1✔
4271
                }
4272
        }
4273

4274
        // One slot per chunk. Each chunk read takes a slot, each ack will
4275
        // replace it. Smooths out in-flight number of chunks.
4276
        slots := make(chan struct{}, maxInflight)
10✔
4277
        for range maxInflight {
65,674✔
4278
                slots <- struct{}{}
65,664✔
4279
        }
65,664✔
4280

4281
        // We will place sequence number and size of chunk sent in the reply.
4282
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4283
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
34✔
4284
                select {
24✔
4285
                case slots <- struct{}{}:
24✔
4286
                case <-done:
×
4287
                }
4288
        })
4289
        defer mset.unsubscribe(ackSub)
10✔
4290

10✔
4291
        var hdr []byte
10✔
4292
        chunk := make([]byte, chunkSize)
10✔
4293
        for index := 1; ; index++ {
56✔
4294
                select {
46✔
4295
                case <-slots:
46✔
4296
                        // A slot has become available.
4297
                case <-inch:
×
4298
                        // The receiver appears to have gone away.
×
4299
                        hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4300
                        goto done
×
4301
                case err := <-sr.errCh:
×
4302
                        // The snapshotting goroutine has failed for some reason.
×
4303
                        hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4304
                        goto done
×
4305
                case <-time.After(snapshotAckTimeout):
×
4306
                        // It's taking a very long time for the receiver to send us acks,
×
4307
                        // they have probably stalled or there is high loss on the link.
×
4308
                        hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4309
                        goto done
×
4310
                }
4311
                n, err := io.ReadFull(r, chunk)
46✔
4312
                chunk := chunk[:n]
46✔
4313
                if err != nil {
56✔
4314
                        if n > 0 {
20✔
4315
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
10✔
4316
                        }
10✔
4317
                        break
10✔
4318
                }
4319
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
36✔
4320
                if hdr == nil {
41✔
4321
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
5✔
4322
                }
5✔
4323
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
36✔
4324
        }
4325

4326
done:
4327
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4328
}
4329

4330
// ccReqType identifies which form of consumer create request was received.
type ccReqType uint8

// Consumer create request forms. The constants carry the ccReqType type so
// they cannot be mixed up with plain integers.
// NOTE(review): judging by the names, ccNew is the subject form that carries
// the stream and optional consumer name, and the two legacy values are the
// older ephemeral and durable create subjects; confirm against the handler.
const (
	ccNew ccReqType = iota
	ccLegacyEphemeral
	ccLegacyDurable
)
4338

4339
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4340
// filtered subjects can be at the tail end.
4341
// Assumes stream and consumer names are single tokens.
4342
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
11,147✔
4343
        if c == nil || !s.JetStreamEnabled() {
11,147✔
4344
                return
×
4345
        }
×
4346

4347
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,147✔
4348
        if err != nil {
11,148✔
4349
                s.Warnf(badAPIRequestT, msg)
1✔
4350
                return
1✔
4351
        }
1✔
4352

4353
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
11,146✔
4354

11,146✔
4355
        var req CreateConsumerRequest
11,146✔
4356
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11,147✔
4357
                resp.Error = NewJSInvalidJSONError(err)
1✔
4358
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4359
                return
1✔
4360
        }
1✔
4361

4362
        var js *jetStream
11,145✔
4363
        isClustered := s.JetStreamIsClustered()
11,145✔
4364

11,145✔
4365
        // Determine if we should proceed here when we are in clustered mode.
11,145✔
4366
        direct := req.Config.Direct
11,145✔
4367
        if isClustered {
21,395✔
4368
                if direct {
10,742✔
4369
                        // If it's just a direct consumer, check for stream leader.
492✔
4370
                        if !req.Config.Sourcing {
492✔
4371
                                // Check to see if we have this stream and are the stream leader.
×
4372
                                if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
×
4373
                                        return
×
4374
                                }
×
4375
                        } else {
492✔
4376
                                // Otherwise, we either need this to be answered by the stream or meta leader.
492✔
4377
                                var cc *jetStreamCluster
492✔
4378
                                js, cc = s.getJetStreamCluster()
492✔
4379
                                if js == nil || cc == nil {
492✔
4380
                                        return
×
4381
                                }
×
4382
                                js.mu.RLock()
492✔
4383
                                sa := js.streamAssignmentOrInflight(acc.Name, streamNameFromSubject(subject))
492✔
4384
                                if sa == nil {
495✔
4385
                                        js.mu.RUnlock()
3✔
4386
                                        return
3✔
4387
                                }
3✔
4388
                                // If the stream is WQ or Interest, we need the meta leader to answer.
4389
                                if sa.Config.Retention != LimitsPolicy {
549✔
4390
                                        direct = false
60✔
4391
                                }
60✔
4392
                                js.mu.RUnlock()
489✔
4393
                                if direct {
918✔
4394
                                        // Check to see if we have this stream and are the stream leader.
429✔
4395
                                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
755✔
4396
                                                return
326✔
4397
                                        }
326✔
4398
                                } else {
60✔
4399
                                        if js.isLeaderless() {
60✔
4400
                                                resp.Error = NewJSClusterNotAvailError()
×
4401
                                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4402
                                                return
×
4403
                                        }
×
4404
                                        // Make sure we are meta leader.
4405
                                        if !s.JetStreamIsLeader() {
100✔
4406
                                                return
40✔
4407
                                        }
40✔
4408
                                }
4409
                        }
4410
                } else {
9,758✔
4411
                        var cc *jetStreamCluster
9,758✔
4412
                        js, cc = s.getJetStreamCluster()
9,758✔
4413
                        if js == nil || cc == nil {
9,758✔
4414
                                return
×
4415
                        }
×
4416
                        if js.isLeaderless() {
9,758✔
4417
                                resp.Error = NewJSClusterNotAvailError()
×
4418
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4419
                                return
×
4420
                        }
×
4421
                        // Make sure we are meta leader.
4422
                        if !s.JetStreamIsLeader() {
16,348✔
4423
                                return
6,590✔
4424
                        }
6,590✔
4425
                }
4426
        }
4427

4428
        if errorOnRequiredApiLevel(hdr) {
4,189✔
4429
                resp.Error = NewJSRequiredApiLevelError()
3✔
4430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4431
                return
3✔
4432
        }
3✔
4433

4434
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,186✔
4435
                if doErr {
4✔
4436
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4437
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4438
                }
1✔
4439
                return
3✔
4440
        }
4441

4442
        var streamName, consumerName, filteredSubject string
4,180✔
4443
        var rt ccReqType
4,180✔
4444

4,180✔
4445
        if n := numTokens(subject); n < 5 {
4,180✔
4446
                s.Warnf(badAPIRequestT, msg)
×
4447
                return
×
4448
        } else if n == 5 {
4,927✔
4449
                // Legacy ephemeral.
747✔
4450
                rt = ccLegacyEphemeral
747✔
4451
                streamName = streamNameFromSubject(subject)
747✔
4452
                consumerName = req.Config.Name
747✔
4453
        } else {
4,180✔
4454
                // New style and durable legacy.
3,433✔
4455
                if tokenAt(subject, 4) == "DURABLE" {
3,592✔
4456
                        rt = ccLegacyDurable
159✔
4457
                        if n != 7 {
159✔
4458
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4459
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4460
                                return
×
4461
                        }
×
4462
                        streamName = tokenAt(subject, 6)
159✔
4463
                        consumerName = tokenAt(subject, 7)
159✔
4464
                } else {
3,274✔
4465
                        streamName = streamNameFromSubject(subject)
3,274✔
4466
                        consumerName = consumerNameFromSubject(subject)
3,274✔
4467
                        // New has optional filtered subject as part of main subject..
3,274✔
4468
                        if n > 6 {
5,977✔
4469
                                tokens := strings.Split(subject, tsep)
2,703✔
4470
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,703✔
4471
                        }
2,703✔
4472
                }
4473
        }
4474

4475
        if streamName != req.Stream {
4,181✔
4476
                resp.Error = NewJSStreamMismatchError()
1✔
4477
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4478
                return
1✔
4479
        }
1✔
4480

4481
        if consumerName != _EMPTY_ {
7,909✔
4482
                // Check for path like separators in the name.
3,730✔
4483
                if strings.ContainsAny(consumerName, `\/`) {
3,734✔
4484
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4485
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4486
                        return
4✔
4487
                }
4✔
4488
        }
4489

4490
        // Should we expect a durable name
4491
        if rt == ccLegacyDurable {
4,334✔
4492
                if numTokens(subject) < 7 {
159✔
4493
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4494
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4495
                        return
×
4496
                }
×
4497
                // Now check on requirements for durable request.
4498
                if req.Config.Durable == _EMPTY_ {
160✔
4499
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4500
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4501
                        return
1✔
4502
                }
1✔
4503
                if consumerName != req.Config.Durable {
158✔
4504
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4505
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4506
                        return
×
4507
                }
×
4508
                // Durable, so we need to honor the name.
4509
                req.Config.Name = consumerName
158✔
4510
        }
4511
        // If new style and durable set make sure they match.
4512
        if rt == ccNew {
7,444✔
4513
                if req.Config.Durable != _EMPTY_ {
5,982✔
4514
                        if consumerName != req.Config.Durable {
2,712✔
4515
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4516
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4517
                                return
×
4518
                        }
×
4519
                }
4520
                // New style ephemeral so we need to honor the name.
4521
                req.Config.Name = consumerName
3,270✔
4522
        }
4523
        // Check for legacy ephemeral mis-configuration.
4524
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,177✔
4525
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4526
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4527
                return
3✔
4528
        }
3✔
4529

4530
        // in case of multiple filters provided, error if new API is used.
4531
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,172✔
4532
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4533
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4534
                return
1✔
4535
        }
1✔
4536

4537
        // Check for a filter subject.
4538
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,172✔
4539
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4540
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4541
                return
2✔
4542
        }
2✔
4543

4544
        if isClustered && !direct {
7,354✔
4545
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,186✔
4546
                return
3,186✔
4547
        }
3,186✔
4548

4549
        // If we are here we are single server mode.
4550
        if req.Config.Replicas > 1 {
982✔
4551
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4552
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4553
                return
×
4554
        }
×
4555

4556
        stream, err := acc.lookupStream(req.Stream)
982✔
4557
        if err != nil {
986✔
4558
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4559
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4560
                return
4✔
4561
        }
4✔
4562
        if stream.offlineReason != _EMPTY_ {
978✔
4563
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4564
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4565
                return
×
4566
        }
×
4567

4568
        // If the consumer is a direct sourcing consumer, we need to "upgrade"
4569
        // it to be durable without AckNone if not a Limits-based stream.
4570
        if req.Config.Direct && req.Config.Sourcing && req.Config.Name != _EMPTY_ {
1,196✔
4571
                if !isClustered && stream.isInterestRetention() {
236✔
4572
                        req.Config.Direct = false
18✔
4573
                        req.Config.Durable = req.Config.Name
18✔
4574
                        req.Config.AckPolicy = AckFlowControl
18✔
4575
                        req.Config.AckWait = 0
18✔
4576
                        req.Config.MaxDeliver = 0
18✔
4577
                        req.Config.InactiveThreshold = 0
18✔
4578
                } else {
218✔
4579
                        // Otherwise, need to append a randomized suffix since the source uses a stable name.
200✔
4580
                        req.Config.Name = fmt.Sprintf("%s-%s", req.Config.Name, createConsumerName())
200✔
4581
                        consumerName = req.Config.Name
200✔
4582
                }
200✔
4583
        }
4584

4585
        if o := stream.lookupConsumer(consumerName); o != nil {
1,033✔
4586
                if o.offlineReason != _EMPTY_ {
55✔
4587
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4588
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4589
                        return
×
4590
                }
×
4591
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4592
                // it back to whatever the current configured value is.
4593
                o.mu.RLock()
55✔
4594
                req.Config.PauseUntil = o.cfg.PauseUntil
55✔
4595
                // If a durable sourcing consumer is used, we need to reset the deliver policy.
55✔
4596
                if req.Config.Sourcing && req.Config.Durable != _EMPTY_ {
55✔
4597
                        req.Config.DeliverPolicy = o.cfg.DeliverPolicy
×
4598
                        req.Config.OptStartSeq = o.cfg.OptStartSeq
×
4599
                        req.Config.OptStartTime = o.cfg.OptStartTime
×
4600
                }
×
4601
                o.mu.RUnlock()
55✔
4602
        }
4603

4604
        // Initialize/update asset version metadata.
4605
        setStaticConsumerMetadata(&req.Config)
978✔
4606

978✔
4607
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
978✔
4608

978✔
4609
        if err != nil {
1,035✔
4610
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
57✔
4611
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4612
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4613
                        err = errConsumerStoreFailed
×
4614
                }
×
4615
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
57✔
4616
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
57✔
4617
                return
57✔
4618
        }
4619
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
921✔
4620
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
921✔
4621

921✔
4622
        o.mu.RLock()
921✔
4623
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
925✔
4624
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4625
        }
4✔
4626
        o.mu.RUnlock()
921✔
4627
}
4628

4629
// Request for the list of all consumer names.
4630
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
41✔
4631
        if c == nil || !s.JetStreamEnabled() {
41✔
4632
                return
×
4633
        }
×
4634
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
41✔
4635
        if err != nil {
41✔
4636
                s.Warnf(badAPIRequestT, msg)
×
4637
                return
×
4638
        }
×
4639

4640
        var resp = JSApiConsumerNamesResponse{
41✔
4641
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
41✔
4642
                Consumers:   []string{},
41✔
4643
        }
41✔
4644

41✔
4645
        // Determine if we should proceed here when we are in clustered mode.
41✔
4646
        if s.JetStreamIsClustered() {
74✔
4647
                js, cc := s.getJetStreamCluster()
33✔
4648
                if js == nil || cc == nil {
33✔
4649
                        return
×
4650
                }
×
4651
                if js.isLeaderless() {
33✔
4652
                        resp.Error = NewJSClusterNotAvailError()
×
4653
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4654
                        return
×
4655
                }
×
4656
                // Make sure we are meta leader.
4657
                if !s.JetStreamIsLeader() {
55✔
4658
                        return
22✔
4659
                }
22✔
4660
        }
4661

4662
        if errorOnRequiredApiLevel(hdr) {
20✔
4663
                resp.Error = NewJSRequiredApiLevelError()
1✔
4664
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4665
                return
1✔
4666
        }
1✔
4667

4668
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
18✔
4669
                if doErr {
×
4670
                        resp.Error = NewJSNotEnabledForAccountError()
×
4671
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4672
                }
×
4673
                return
×
4674
        }
4675

4676
        var offset int
18✔
4677
        if isJSONObjectOrArray(msg) {
30✔
4678
                var req JSApiConsumersRequest
12✔
4679
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
4680
                        resp.Error = NewJSInvalidJSONError(err)
×
4681
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4682
                        return
×
4683
                }
×
4684
                offset = max(req.Offset, 0)
12✔
4685
        }
4686

4687
        streamName := streamNameFromSubject(subject)
18✔
4688
        var numConsumers int
18✔
4689

18✔
4690
        if s.JetStreamIsClustered() {
29✔
4691
                js, cc := s.getJetStreamCluster()
11✔
4692
                if js == nil || cc == nil {
11✔
4693
                        // TODO(dlc) - Debug or Warn?
×
4694
                        return
×
4695
                }
×
4696
                js.mu.RLock()
11✔
4697
                sas := cc.streams[acc.Name]
11✔
4698
                if sas == nil {
11✔
4699
                        js.mu.RUnlock()
×
4700
                        resp.Error = NewJSStreamNotFoundError()
×
4701
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4702
                        return
×
4703
                }
×
4704
                sa := sas[streamName]
11✔
4705
                if sa == nil || sa.err != nil {
11✔
4706
                        js.mu.RUnlock()
×
4707
                        resp.Error = NewJSStreamNotFoundError()
×
4708
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4709
                        return
×
4710
                }
×
4711
                for consumer := range sa.consumers {
25✔
4712
                        resp.Consumers = append(resp.Consumers, consumer)
14✔
4713
                }
14✔
4714
                if len(resp.Consumers) > 1 {
15✔
4715
                        slices.Sort(resp.Consumers)
4✔
4716
                }
4✔
4717
                numConsumers = len(resp.Consumers)
11✔
4718
                if offset > numConsumers {
11✔
4719
                        offset = numConsumers
×
4720
                }
×
4721
                resp.Consumers = resp.Consumers[offset:]
11✔
4722
                if len(resp.Consumers) > JSApiNamesLimit {
11✔
4723
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4724
                }
×
4725
                js.mu.RUnlock()
11✔
4726

4727
        } else {
7✔
4728
                mset, err := acc.lookupStream(streamName)
7✔
4729
                if err != nil {
7✔
4730
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4731
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4732
                        return
×
4733
                }
×
4734

4735
                obs := mset.getPublicConsumers()
7✔
4736
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
7✔
4737

4738
                numConsumers = len(obs)
7✔
4739
                if offset > numConsumers {
7✔
4740
                        offset = numConsumers
×
4741
                }
×
4742

4743
                for _, o := range obs[offset:] {
12✔
4744
                        resp.Consumers = append(resp.Consumers, o.String())
5✔
4745
                        if len(resp.Consumers) >= JSApiNamesLimit {
5✔
4746
                                break
×
4747
                        }
4748
                }
4749
        }
4750
        resp.Total = numConsumers
18✔
4751
        resp.Limit = JSApiNamesLimit
18✔
4752
        resp.Offset = offset
18✔
4753
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
18✔
4754
}
4755

4756
// Request for the list of all detailed consumer information.
4757
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
4758
        if c == nil || !s.JetStreamEnabled() {
67✔
4759
                return
×
4760
        }
×
4761

4762
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
4763
        if err != nil {
67✔
4764
                s.Warnf(badAPIRequestT, msg)
×
4765
                return
×
4766
        }
×
4767

4768
        var resp = JSApiConsumerListResponse{
67✔
4769
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
67✔
4770
                Consumers:   []*ConsumerInfo{},
67✔
4771
        }
67✔
4772

67✔
4773
        // Determine if we should proceed here when we are in clustered mode.
67✔
4774
        if s.JetStreamIsClustered() {
126✔
4775
                js, cc := s.getJetStreamCluster()
59✔
4776
                if js == nil || cc == nil {
59✔
4777
                        return
×
4778
                }
×
4779
                if js.isLeaderless() {
60✔
4780
                        resp.Error = NewJSClusterNotAvailError()
1✔
4781
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4782
                        return
1✔
4783
                }
1✔
4784
                // Make sure we are meta leader.
4785
                if !s.JetStreamIsLeader() {
100✔
4786
                        return
42✔
4787
                }
42✔
4788
        }
4789

4790
        if errorOnRequiredApiLevel(hdr) {
25✔
4791
                resp.Error = NewJSRequiredApiLevelError()
1✔
4792
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4793
                return
1✔
4794
        }
1✔
4795

4796
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
4797
                if doErr {
×
4798
                        resp.Error = NewJSNotEnabledForAccountError()
×
4799
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4800
                }
×
4801
                return
×
4802
        }
4803

4804
        var offset int
23✔
4805
        if isJSONObjectOrArray(msg) {
34✔
4806
                var req JSApiConsumersRequest
11✔
4807
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
4808
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4809
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4810
                        return
1✔
4811
                }
1✔
4812
                offset = max(req.Offset, 0)
10✔
4813
        }
4814

4815
        streamName := streamNameFromSubject(subject)
22✔
4816

22✔
4817
        // Clustered mode will invoke a scatter and gather.
22✔
4818
        if s.JetStreamIsClustered() {
38✔
4819
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
16✔
4820
                msg = copyBytes(msg)
16✔
4821
                s.startGoRoutine(func() {
32✔
4822
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
16✔
4823
                })
16✔
4824
                return
16✔
4825
        }
4826

4827
        mset, err := acc.lookupStream(streamName)
6✔
4828
        if err != nil {
6✔
4829
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4830
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4831
                return
×
4832
        }
×
4833

4834
        obs := mset.getPublicConsumers()
6✔
4835
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
6✔
4836

4837
        ocnt := len(obs)
6✔
4838
        if offset > ocnt {
6✔
4839
                offset = ocnt
×
4840
        }
×
4841

4842
        var missingNames []string
6✔
4843
        for _, o := range obs[offset:] {
11✔
4844
                if o.offlineReason != _EMPTY_ {
6✔
4845
                        if resp.Offline == nil {
2✔
4846
                                resp.Offline = make(map[string]string, 1)
1✔
4847
                        }
1✔
4848
                        resp.Offline[o.name] = o.offlineReason
1✔
4849
                        missingNames = append(missingNames, o.name)
1✔
4850
                        continue
1✔
4851
                }
4852
                if cinfo := o.info(); cinfo != nil {
8✔
4853
                        resp.Consumers = append(resp.Consumers, cinfo)
4✔
4854
                }
4✔
4855
                if len(resp.Consumers) >= JSApiListLimit {
4✔
4856
                        break
×
4857
                }
4858
        }
4859
        resp.Total = ocnt
6✔
4860
        resp.Limit = JSApiListLimit
6✔
4861
        resp.Offset = offset
6✔
4862
        resp.Missing = missingNames
6✔
4863
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
6✔
4864
}
4865

4866
// Request for information about a consumer.
4867
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,478✔
4868
        if c == nil || !s.JetStreamEnabled() {
42,478✔
4869
                return
×
4870
        }
×
4871
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
42,478✔
4872
        if err != nil {
42,478✔
4873
                s.Warnf(badAPIRequestT, msg)
×
4874
                return
×
4875
        }
×
4876

4877
        streamName := streamNameFromSubject(subject)
42,478✔
4878
        consumerName := consumerNameFromSubject(subject)
42,478✔
4879

42,478✔
4880
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,478✔
4881

42,478✔
4882
        if !isEmptyRequest(msg) {
42,479✔
4883
                resp.Error = NewJSNotEmptyRequestError()
1✔
4884
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4885
                return
1✔
4886
        }
1✔
4887

4888
        // If we are in clustered mode we need to be the consumer leader to proceed.
4889
        if s.JetStreamIsClustered() {
84,397✔
4890
                // Check to make sure the consumer is assigned.
41,920✔
4891
                js, cc := s.getJetStreamCluster()
41,920✔
4892
                if js == nil || cc == nil {
41,920✔
4893
                        return
×
4894
                }
×
4895

4896
                js.mu.RLock()
41,920✔
4897
                meta := cc.meta
41,920✔
4898
                js.mu.RUnlock()
41,920✔
4899

41,920✔
4900
                if meta == nil {
41,920✔
4901
                        return
×
4902
                }
×
4903

4904
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4905
                ourID := meta.ID()
41,920✔
4906
                groupLeaderless := meta.Leaderless()
41,920✔
4907
                groupCreated := meta.Created()
41,920✔
4908

41,920✔
4909
                js.mu.RLock()
41,920✔
4910
                isLeader, sa, ca := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, streamName), js.consumerAssignmentOrInflight(acc.Name, streamName, consumerName)
41,920✔
4911
                var rg *raftGroup
41,920✔
4912
                var offline, isMember bool
41,920✔
4913
                if ca != nil {
46,188✔
4914
                        if rg = ca.Group; rg != nil {
8,536✔
4915
                                offline = s.allPeersOffline(rg)
4,268✔
4916
                                isMember = rg.isMember(ourID)
4,268✔
4917
                        }
4,268✔
4918
                        if ca.unsupported != nil && isMember {
4,286✔
4919
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
4920
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
4921
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
4922
                                js.mu.RUnlock()
18✔
4923
                                return
18✔
4924
                        }
18✔
4925
                }
4926
                // Capture consumer leader here.
4927
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,902✔
4928
                // Also capture if we think there is no meta leader.
41,902✔
4929
                var isLeaderLess bool
41,902✔
4930
                if !isLeader {
70,056✔
4931
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
28,154✔
4932
                }
28,154✔
4933
                js.mu.RUnlock()
41,902✔
4934

41,902✔
4935
                if isLeader && ca == nil {
54,364✔
4936
                        // We can't find the consumer, so mimic what would be the errors below.
12,462✔
4937
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,462✔
4938
                                if doErr {
×
4939
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4940
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4941
                                }
×
4942
                                return
×
4943
                        }
4944
                        if sa == nil {
22,465✔
4945
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4946
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4947
                                return
10,003✔
4948
                        }
10,003✔
4949
                        // If we are here the consumer is not present.
4950
                        resp.Error = NewJSConsumerNotFoundError()
2,459✔
4951
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,459✔
4952
                        return
2,459✔
4953
                } else if ca == nil {
54,630✔
4954
                        if isLeaderLess {
25,192✔
4955
                                resp.Error = NewJSClusterNotAvailError()
2✔
4956
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4957
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4958
                        }
2✔
4959
                        return
25,190✔
4960
                } else if isLeader && offline {
4,253✔
4961
                        resp.Error = NewJSConsumerOfflineError()
3✔
4962
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
3✔
4963
                        return
3✔
4964
                }
3✔
4965

4966
                // Check to see if we are a member of the group and if the group has no leader.
4967
                if isMember && js.isGroupLeaderless(ca.Group) {
4,248✔
4968
                        resp.Error = NewJSClusterNotAvailError()
1✔
4969
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4970
                        return
1✔
4971
                }
1✔
4972

4973
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4974
                if !isConsumerLeader {
7,217✔
4975
                        if isLeaderLess {
2,971✔
4976
                                resp.Error = NewJSClusterNotAvailError()
×
4977
                                // Delaying an error response gives the leader a chance to respond before us
×
4978
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4979
                                return
×
4980
                        }
×
4981

4982
                        var node RaftNode
2,971✔
4983
                        var leaderNotPartOfGroup bool
2,971✔
4984

2,971✔
4985
                        // We have a consumer assignment.
2,971✔
4986
                        if isMember {
5,121✔
4987
                                js.mu.RLock()
2,150✔
4988
                                if rg != nil && rg.node != nil {
4,296✔
4989
                                        node = rg.node
2,146✔
4990
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,146✔
4991
                                                leaderNotPartOfGroup = true
×
4992
                                        }
×
4993
                                }
4994
                                js.mu.RUnlock()
2,150✔
4995
                        }
4996

4997
                        // Check if we should ignore all together.
4998
                        if node == nil {
3,796✔
4999
                                // We have been assigned but have not created a node yet. If we are a member return
825✔
5000
                                // our config and defaults for state and no cluster info.
825✔
5001
                                if isMember {
829✔
5002
                                        // Since we access consumerAssignment, need js lock.
4✔
5003
                                        js.mu.RLock()
4✔
5004
                                        resp.ConsumerInfo = &ConsumerInfo{
4✔
5005
                                                Stream:    ca.Stream,
4✔
5006
                                                Name:      ca.Name,
4✔
5007
                                                Created:   ca.Created,
4✔
5008
                                                Config:    setDynamicConsumerMetadata(ca.Config),
4✔
5009
                                                TimeStamp: time.Now().UTC(),
4✔
5010
                                        }
4✔
5011
                                        b := s.jsonResponse(resp)
4✔
5012
                                        js.mu.RUnlock()
4✔
5013
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
4✔
5014
                                }
4✔
5015
                                return
825✔
5016
                        }
5017
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
5018
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,287✔
5019
                                if leaderNotPartOfGroup {
2,141✔
5020
                                        resp.Error = NewJSConsumerOfflineError()
×
5021
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
5022
                                }
×
5023
                                return
2,141✔
5024
                        }
5025
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
5026
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
5027
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
5028
                }
5029
        }
5030

5031
        if errorOnRequiredApiLevel(hdr) {
1,838✔
5032
                resp.Error = NewJSRequiredApiLevelError()
1✔
5033
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5034
                return
1✔
5035
        }
1✔
5036

5037
        if !acc.JetStreamEnabled() {
1,836✔
5038
                resp.Error = NewJSNotEnabledForAccountError()
×
5039
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5040
                return
×
5041
        }
×
5042

5043
        mset, err := acc.lookupStream(streamName)
1,836✔
5044
        if err != nil {
1,836✔
5045
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5046
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5047
                return
×
5048
        }
×
5049

5050
        obs := mset.lookupConsumer(consumerName)
1,836✔
5051
        if obs == nil {
2,053✔
5052
                resp.Error = NewJSConsumerNotFoundError()
217✔
5053
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
217✔
5054
                return
217✔
5055
        }
217✔
5056

5057
        if obs.offlineReason != _EMPTY_ {
1,620✔
5058
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
5059
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
5060
                return
1✔
5061
        }
1✔
5062

5063
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,620✔
5064
                // This consumer returned nil which means it's closed. Respond with not found.
2✔
5065
                resp.Error = NewJSConsumerNotFoundError()
2✔
5066
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
5067
                return
2✔
5068
        }
2✔
5069
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,616✔
5070
}
5071

5072
// Request to delete an Consumer.
5073
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,293✔
5074
        if c == nil || !s.JetStreamEnabled() {
7,301✔
5075
                return
8✔
5076
        }
8✔
5077
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,285✔
5078
        if err != nil {
7,285✔
5079
                s.Warnf(badAPIRequestT, msg)
×
5080
                return
×
5081
        }
×
5082

5083
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,285✔
5084

7,285✔
5085
        // Determine if we should proceed here when we are in clustered mode.
7,285✔
5086
        if s.JetStreamIsClustered() {
14,320✔
5087
                js, cc := s.getJetStreamCluster()
7,035✔
5088
                if js == nil || cc == nil {
7,037✔
5089
                        return
2✔
5090
                }
2✔
5091
                if js.isLeaderless() {
7,034✔
5092
                        resp.Error = NewJSClusterNotAvailError()
1✔
5093
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5094
                        return
1✔
5095
                }
1✔
5096
                // Make sure we are meta leader.
5097
                if !s.JetStreamIsLeader() {
11,704✔
5098
                        return
4,672✔
5099
                }
4,672✔
5100
        }
5101

5102
        if errorOnRequiredApiLevel(hdr) {
2,611✔
5103
                resp.Error = NewJSRequiredApiLevelError()
1✔
5104
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5105
                return
1✔
5106
        }
1✔
5107

5108
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,634✔
5109
                if doErr {
47✔
5110
                        resp.Error = NewJSNotEnabledForAccountError()
22✔
5111
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
22✔
5112
                }
22✔
5113
                return
25✔
5114
        }
5115
        if !isEmptyRequest(msg) {
2,585✔
5116
                resp.Error = NewJSNotEmptyRequestError()
1✔
5117
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5118
                return
1✔
5119
        }
1✔
5120
        stream := streamNameFromSubject(subject)
2,583✔
5121
        consumer := consumerNameFromSubject(subject)
2,583✔
5122

2,583✔
5123
        if s.JetStreamIsClustered() {
4,943✔
5124
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,360✔
5125
                return
2,360✔
5126
        }
2,360✔
5127

5128
        mset, err := acc.lookupStream(stream)
223✔
5129
        if err != nil {
224✔
5130
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
5131
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5132
                return
1✔
5133
        }
1✔
5134

5135
        obs := mset.lookupConsumer(consumer)
222✔
5136
        if obs == nil {
289✔
5137
                resp.Error = NewJSConsumerNotFoundError()
67✔
5138
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
67✔
5139
                return
67✔
5140
        }
67✔
5141
        if err := obs.delete(); err != nil {
155✔
5142
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
5143
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5144
                return
×
5145
        }
×
5146
        resp.Success = true
155✔
5147
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
155✔
5148
}
5149

5150
// Request to pause or unpause a Consumer.
5151
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6,464✔
5152
        if c == nil || !s.JetStreamEnabled() {
6,464✔
5153
                return
×
5154
        }
×
5155
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6,464✔
5156
        if err != nil {
6,464✔
5157
                s.Warnf(badAPIRequestT, msg)
×
5158
                return
×
5159
        }
×
5160

5161
        var req JSApiConsumerPauseRequest
6,464✔
5162
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
6,464✔
5163

6,464✔
5164
        if isJSONObjectOrArray(msg) {
12,919✔
5165
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
6,455✔
5166
                        resp.Error = NewJSInvalidJSONError(err)
×
5167
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5168
                        return
×
5169
                }
×
5170
        }
5171

5172
        // Determine if we should proceed here when we are in clustered mode.
5173
        isClustered := s.JetStreamIsClustered()
6,464✔
5174
        js, cc := s.getJetStreamCluster()
6,464✔
5175
        if isClustered {
11,318✔
5176
                if js == nil || cc == nil {
4,854✔
5177
                        return
×
5178
                }
×
5179
                if js.isLeaderless() {
4,854✔
5180
                        resp.Error = NewJSClusterNotAvailError()
×
5181
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5182
                        return
×
5183
                }
×
5184
                // Make sure we are meta leader.
5185
                if !s.JetStreamIsLeader() {
8,094✔
5186
                        return
3,240✔
5187
                }
3,240✔
5188
        }
5189

5190
        if errorOnRequiredApiLevel(hdr) {
3,225✔
5191
                resp.Error = NewJSRequiredApiLevelError()
1✔
5192
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5193
                return
1✔
5194
        }
1✔
5195

5196
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
3,223✔
5197
                if doErr {
×
5198
                        resp.Error = NewJSNotEnabledForAccountError()
×
5199
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5200
                }
×
5201
                return
×
5202
        }
5203

5204
        stream := streamNameFromSubject(subject)
3,223✔
5205
        consumer := consumerNameFromSubject(subject)
3,223✔
5206

3,223✔
5207
        if isClustered {
4,837✔
5208
                js.mu.RLock()
1,614✔
5209
                sa := js.streamAssignment(acc.Name, stream)
1,614✔
5210
                if sa == nil {
1,614✔
5211
                        js.mu.RUnlock()
×
5212
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5213
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5214
                        return
×
5215
                }
×
5216
                if sa.unsupported != nil {
1,614✔
5217
                        js.mu.RUnlock()
×
5218
                        // Just let the request time out.
×
5219
                        return
×
5220
                }
×
5221

5222
                ca, ok := sa.consumers[consumer]
1,614✔
5223
                if !ok || ca == nil {
1,614✔
5224
                        js.mu.RUnlock()
×
5225
                        resp.Error = NewJSConsumerNotFoundError()
×
5226
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5227
                        return
×
5228
                }
×
5229
                if ca.unsupported != nil {
1,614✔
5230
                        js.mu.RUnlock()
×
5231
                        // Just let the request time out.
×
5232
                        return
×
5233
                }
×
5234

5235
                nca := *ca
1,614✔
5236
                // We're only holding the read lock and release below,
1,614✔
5237
                // we need a copy to prevent concurrent reads/writes.
1,614✔
5238
                ncfg := *ca.Config
1,614✔
5239
                ncfg.Metadata = maps.Clone(ncfg.Metadata)
1,614✔
5240
                nca.Config = &ncfg
1,614✔
5241
                meta := cc.meta
1,614✔
5242
                js.mu.RUnlock()
1,614✔
5243
                pauseUTC := req.PauseUntil.UTC()
1,614✔
5244
                if !pauseUTC.IsZero() {
3,224✔
5245
                        nca.Config.PauseUntil = &pauseUTC
1,610✔
5246
                } else {
1,614✔
5247
                        nca.Config.PauseUntil = nil
4✔
5248
                }
4✔
5249

5250
                // Update asset version metadata due to updating pause/resume.
5251
                // Only PauseUntil is updated above, so reuse config for both.
5252
                setStaticConsumerMetadata(nca.Config)
1,614✔
5253

1,614✔
5254
                eca := encodeAddConsumerAssignment(&nca)
1,614✔
5255
                meta.Propose(eca)
1,614✔
5256

1,614✔
5257
                resp.PauseUntil = pauseUTC
1,614✔
5258
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
3,224✔
5259
                        resp.PauseRemaining = time.Until(pauseUTC)
1,610✔
5260
                }
1,610✔
5261
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,614✔
5262
                return
1,614✔
5263
        }
5264

5265
        mset, err := acc.lookupStream(stream)
1,609✔
5266
        if err != nil {
1,609✔
5267
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5268
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5269
                return
×
5270
        }
×
5271
        if mset.offlineReason != _EMPTY_ {
1,609✔
5272
                // Just let the request time out.
×
5273
                return
×
5274
        }
×
5275

5276
        obs := mset.lookupConsumer(consumer)
1,609✔
5277
        if obs == nil {
1,609✔
5278
                resp.Error = NewJSConsumerNotFoundError()
×
5279
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5280
                return
×
5281
        }
×
5282
        if obs.offlineReason != _EMPTY_ {
1,609✔
5283
                // Just let the request time out.
×
5284
                return
×
5285
        }
×
5286

5287
        // We're only holding the read lock and release below,
5288
        // we need a copy to prevent concurrent reads/writes.
5289
        obs.mu.RLock()
1,609✔
5290
        ncfg := obs.cfg
1,609✔
5291
        ncfg.Metadata = maps.Clone(ncfg.Metadata)
1,609✔
5292
        obs.mu.RUnlock()
1,609✔
5293

1,609✔
5294
        pauseUTC := req.PauseUntil.UTC()
1,609✔
5295
        if !pauseUTC.IsZero() {
3,214✔
5296
                ncfg.PauseUntil = &pauseUTC
1,605✔
5297
        } else {
1,609✔
5298
                ncfg.PauseUntil = nil
4✔
5299
        }
4✔
5300

5301
        // Update asset version metadata due to updating pause/resume.
5302
        setStaticConsumerMetadata(&ncfg)
1,609✔
5303

1,609✔
5304
        if err := obs.updateConfig(&ncfg); err != nil {
1,609✔
5305
                // The only type of error that should be returned here is from o.store,
×
5306
                // so use a store failed error type.
×
5307
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5308
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5309
                return
×
5310
        }
×
5311

5312
        resp.PauseUntil = pauseUTC
1,609✔
5313
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
3,214✔
5314
                resp.PauseRemaining = time.Until(pauseUTC)
1,605✔
5315
        }
1,605✔
5316
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,609✔
5317
}
5318

5319
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5320
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
74,872✔
5321
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
74,872✔
5322
                TypedEvent: TypedEvent{
74,872✔
5323
                        Type: JSAPIAuditType,
74,872✔
5324
                        ID:   nuid.Next(),
74,872✔
5325
                        Time: time.Now().UTC(),
74,872✔
5326
                },
74,872✔
5327
                Server:   s.Name(),
74,872✔
5328
                Client:   ci.forAdvisory(),
74,872✔
5329
                Subject:  subject,
74,872✔
5330
                Request:  request,
74,872✔
5331
                Response: response,
74,872✔
5332
                Domain:   s.getOpts().JetStreamDomain,
74,872✔
5333
        })
74,872✔
5334
}
74,872✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc