• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25478192467

06 May 2026 02:32PM UTC coverage: 82.15% (-0.9%) from 83.018%
25478192467

push

github

web-flow
Ensure invalid subject characters cannot be forwarded from MQTT to other connection types (#8104)

Some characters are invalid in NATS subjects but were making it in due
to missing MQTT topic validation.

76411 of 93014 relevant lines covered (82.15%)

481569.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.93
/server/jetstream_api.go
1
// Copyright 2020-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "maps"
24
        "os"
25
        "path/filepath"
26
        "runtime"
27
        "slices"
28
        "strings"
29
        "sync"
30
        "sync/atomic"
31
        "time"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiStreamCreate is the endpoint to create new streams.
51
        // Will return JSON response.
52
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
53
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
54

55
        // JSApiStreamUpdate is the endpoint to update existing streams.
56
        // Will return JSON response.
57
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
58
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
59

60
        // JSApiStreams is the endpoint to list all stream names for this account.
61
        // Will return JSON response.
62
        JSApiStreams = "$JS.API.STREAM.NAMES"
63
        // JSApiStreamList is the endpoint that will return all detailed stream information
64
        JSApiStreamList = "$JS.API.STREAM.LIST"
65

66
        // JSApiStreamInfo is for obtaining general information about a named stream.
67
        // Will return JSON response.
68
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
69
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
70

71
        // JSApiStreamDelete is the endpoint to delete streams.
72
        // Will return JSON response.
73
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
74
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
75

76
        // JSApiStreamPurge is the endpoint to purge streams.
77
        // Will return JSON response.
78
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
79
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
80

81
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
82
        // Will return a stream of chunks with a nil chunk as EOF to
83
        // the deliver subject. Caller should respond to each chunk
84
        // with a nil body response for ack flow.
85
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
86
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
87

88
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
89
        // Caller should respond to each chunk with a nil body response.
90
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
91
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
92

93
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
94
        // Will return JSON response.
95
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
96
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
97

98
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
99
        // Will return JSON response.
100
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
101
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
102

103
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
104
        // Will return the message similar to how a consumer receives the message, no JSON processing.
105
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
106
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
107
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
108

109
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
110
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
111
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
112
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
113

114
        // jsDirectGetPre is the subject prefix shared by the direct get endpoints.
115
        jsDirectGetPre = "$JS.API.DIRECT.GET"
116

117
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
118
        // This was also the legacy endpoint for ephemeral consumers.
119
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
120
        // Will return JSON response.
121
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
122
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
123
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
124
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
125

126
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
127
        // You need to include the stream and consumer name in the subject.
128
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
129
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
130

131
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
132
        // Will return JSON response.
133
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
134
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
135

136
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
137
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
138
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
139

140
        // JSApiConsumerInfo is for obtaining general information about a consumer.
141
        // Will return JSON response.
142
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
143
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
144

145
        // JSApiConsumerDelete is the endpoint to delete consumers.
146
        // Will return JSON response.
147
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
148
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
149

150
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
151
        // Will return JSON response.
152
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
153
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
154

155
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157

158
        // JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
159
        JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
160

161
        // JSApiConsumerUnpin is the endpoint for unpinning the subscription for a given consumer.
162
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
163
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
164

165
        // jsRequestNextPre is the subject prefix of the consumer next message request subjects.
166
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
167

168
        // For snapshots and restores. The ack will have additional tokens.
169
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
170
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
171

172
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
173
        // Will return JSON response.
174
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
175
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
176

177
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
178
        // Will return JSON response.
179
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
180
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
181

182
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
183
        // Will return JSON response.
184
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
185
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
186

187
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
188
        // Only works from system account.
189
        // Will return JSON response.
190
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
191

192
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
193
        // Only works from system account.
194
        // Will return JSON response.
195
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
196

197
        // JSApiAccountPurge is the endpoint to purge the js content of an account
198
        // Only works from system account.
199
        // Will return JSON response.
200
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
201
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
202

203
        // JSApiServerStreamMove is the endpoint to move streams off a server
204
        // Only works from system account.
205
        // Will return JSON response.
206
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
207
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
208

209
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
210
        // Only works from system account.
211
        // Will return JSON response.
212
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
213
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
214

215
        // The prefix for system level account API.
216
        jsAPIAccountPre = "$JS.API.ACCOUNT."
217

218
        // jsAckT is the template for the ack message stream coming back from a consumer
219
        // when they ACK/NAK, etc a message.
220
        jsAckT      = "$JS.ACK.%s.%s"
221
        jsAckTv2    = "$JS.ACK.%s.%s.%s.%s"
222
        jsAckPre    = "$JS.ACK."
223
        jsAckPreLen = len(jsAckPre)
224

225
        // jsFlowControlPre is the prefix for flow control subjects.
226
        jsFlowControlPre = "$JS.FC."
227
        // jsFlowControl is for FC responses.
228
        jsFlowControl   = "$JS.FC.%s.%s.*"
229
        jsFlowControlV2 = "$JS.FC.%s.%s.%s.%s.*"
230

231
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
232
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
233

234
        // JSMetricPrefix is a prefix for all JetStream metrics.
235
        JSMetricPrefix = "$JS.EVENT.METRIC"
236

237
        // JSMetricConsumerAckPre is a metric containing ack latency.
238
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
239

240
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
241
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
242

243
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been NAK'd (negatively acknowledged).
244
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
245

246
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
247
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
248

249
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
250
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
251

252
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
253
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
254

255
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
256
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
257

258
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
259
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
260

261
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
262
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
263

264
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
265
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
266

267
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
268
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
269

270
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
271
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
272

273
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
274
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
275

276
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
277
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
278

279
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
280
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
281

282
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
283
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
284

285
        // JSAdvisoryDomainLeaderElected notification that a jetstream domain has elected a leader.
286
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
287

288
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
289
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
290

291
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
292
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
293

294
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
295
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
296

297
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
298
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
299

300
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
301
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
302

303
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
304
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
305

306
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
307
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
308

309
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
310
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
311

312
        // JSAuditAdvisory is a notification about JetStream API access.
313
        // FIXME - Add in details about who..
314
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
315
)
316

317
// Headers used in $JS.API.> requests.
318
const (
319
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
320
        JSRequiredApiLevel = "Nats-Required-Api-Level"
321
)
322

323
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
324
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
325

326
func generateJSMappingTable(domain string) map[string]string {
802✔
327
        mappings := map[string]string{}
802✔
328
        // This set of mappings is very very very ugly.
802✔
329
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
802✔
330
        // For optics $KV and $OBJ where made to be independent subject spaces.
802✔
331
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
802✔
332
        // This is very unfortunate!!!
802✔
333
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
802✔
334
        // Especially since the actual API for say KV, does use stream create from JS.
802✔
335
        // To avoid overlaps KV and OBJ views append the prefix to their API.
802✔
336
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
802✔
337
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
802✔
338
        for srcMappingSuffix, to := range map[string]string{
802✔
339
                "INFO":       JSApiAccountInfo,
802✔
340
                "STREAM.>":   "$JS.API.STREAM.>",
802✔
341
                "CONSUMER.>": "$JS.API.CONSUMER.>",
802✔
342
                "DIRECT.>":   "$JS.API.DIRECT.>",
802✔
343
                "META.>":     "$JS.API.META.>",
802✔
344
                "SERVER.>":   "$JS.API.SERVER.>",
802✔
345
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
802✔
346
                "$KV.>":      "$KV.>",
802✔
347
                "$OBJ.>":     "$OBJ.>",
802✔
348
        } {
8,020✔
349
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
7,218✔
350
        }
7,218✔
351
        return mappings
802✔
352
}
353

354
// JSMaxDescriptionLen is the maximum description length for streams and consumers.
355
const JSMaxDescriptionLen = 4 * 1024
356

357
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
358
// It's calculated by summing length of all keys and values.
359
const JSMaxMetadataLen = 128 * 1024
360

361
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
362
// Picked 255 as it seems to be a widely used file name limit
363
const JSMaxNameLen = 255
364

365
// JSDefaultRequestQueueLimit is the default number of entries that we will
366
// put on the global request queue before we react.
367
const JSDefaultRequestQueueLimit = 10_000
368

369
// Responses for API calls.
370

371
// ApiResponse is a standard response from the JetStream JSON API
372
type ApiResponse struct {
373
        Type  string    `json:"type"`
374
        Error *ApiError `json:"error,omitempty"`
375
}
376

377
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
378

379
// When passing back to the clients generalize store failures.
380
var (
381
        errStreamStoreFailed   = errors.New("error creating store for stream")
382
        errConsumerStoreFailed = errors.New("error creating store for consumer")
383
)
384

385
// ToError checks if the response has a error and if it does converts it to an error avoiding
386
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
387
func (r *ApiResponse) ToError() error {
3,606✔
388
        if r.Error == nil {
5,585✔
389
                return nil
1,979✔
390
        }
1,979✔
391

392
        return r.Error
1,627✔
393
}
394

395
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
396

397
// ApiPaged includes variables used to create paged responses from the JSON API
398
type ApiPaged struct {
399
        Total  int `json:"total"`
400
        Offset int `json:"offset"`
401
        Limit  int `json:"limit"`
402
}
403

404
// ApiPagedRequest includes parameters allowing specific pages to be requested from APIs responding with ApiPaged
405
type ApiPagedRequest struct {
406
        Offset int `json:"offset"`
407
}
408

409
// JSApiAccountInfoResponse reports back information on jetstream for this account.
410
type JSApiAccountInfoResponse struct {
411
        ApiResponse
412
        *JetStreamAccountStats
413
}
414

415
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
416

417
// JSApiStreamCreateResponse stream creation.
418
type JSApiStreamCreateResponse struct {
419
        ApiResponse
420
        *StreamInfo
421
        DidCreate bool `json:"did_create,omitempty"`
422
}
423

424
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
425

426
// JSApiStreamDeleteResponse stream removal.
427
type JSApiStreamDeleteResponse struct {
428
        ApiResponse
429
        Success bool `json:"success,omitempty"`
430
}
431

432
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
433

434
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
435
const JSMaxSubjectDetails = 100_000
436

437
// JSApiStreamInfoRequest holds the JSON request fields for stream info: the
// embedded paging offset plus the `deleted_details` and `subjects_filter` options.
// NOTE(review): presumably the request body for JSApiStreamInfo — confirm against the handler.
type JSApiStreamInfoRequest struct {
438
        ApiPagedRequest
439
        DeletedDetails bool   `json:"deleted_details,omitempty"`
440
        SubjectsFilter string `json:"subjects_filter,omitempty"`
441
}
442

443
// JSApiStreamInfoResponse is the stream info response. It embeds the standard
// ApiResponse envelope, the ApiPaged paging fields and the *StreamInfo itself.
type JSApiStreamInfoResponse struct {
444
        ApiResponse
445
        ApiPaged
446
        *StreamInfo
447
}
448

449
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
450

451
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
452
// TODO(dlc) - with header or request support could request chunked response.
453
const JSApiNamesLimit = 1024
454
const JSApiListLimit = 256
455

456
// JSApiStreamNamesRequest is the paged request for the list of stream names,
// with an optional subject filter.
type JSApiStreamNamesRequest struct {
457
        ApiPagedRequest
458
        // These are filters that can be applied to the list.
459
        Subject string `json:"subject,omitempty"`
460
}
461

462
// JSApiStreamNamesResponse list of streams.
463
// A nil request is valid and means all streams.
464
type JSApiStreamNamesResponse struct {
465
        ApiResponse
466
        ApiPaged
467
        Streams []string `json:"streams"`
468
}
469

470
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
471

472
// JSApiStreamListRequest is the paged request for the detailed stream list,
// with an optional subject filter.
type JSApiStreamListRequest struct {
473
        ApiPagedRequest
474
        // These are filters that can be applied to the list.
475
        Subject string `json:"subject,omitempty"`
476
}
477

478
// JSApiStreamListResponse list of detailed stream information.
479
// A nil request is valid and means all streams.
480
type JSApiStreamListResponse struct {
481
        ApiResponse
482
        ApiPaged
483
        Streams []*StreamInfo     `json:"streams"`
484
        Missing []string          `json:"missing,omitempty"`
485
        Offline map[string]string `json:"offline,omitempty"`
486
}
487

488
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
489

490
// JSApiStreamPurgeRequest is optional request information to the purge API.
491
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
492
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
493
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
494
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
495
type JSApiStreamPurgeRequest struct {
496
        // Purge up to but not including sequence.
497
        Sequence uint64 `json:"seq,omitempty"`
498
        // Subject to match against messages for the purge command.
499
        Subject string `json:"filter,omitempty"`
500
        // Number of messages to keep.
501
        Keep uint64 `json:"keep,omitempty"`
502
}
503

504
// JSApiStreamPurgeResponse is the response to a stream purge request. It
// carries a success flag and the `purged` count alongside the standard
// ApiResponse envelope.
// NOTE(review): Purged is presumably the number of messages removed — confirm.
type JSApiStreamPurgeResponse struct {
505
        ApiResponse
506
        Success bool   `json:"success,omitempty"`
507
        Purged  uint64 `json:"purged"`
508
}
509

510
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
511

512
// JSApiConsumerUnpinRequest is the request to unpin a consumer. Group is
// encoded as the `group` JSON field.
// NOTE(review): Group presumably names the group whose pinned client is released — confirm.
type JSApiConsumerUnpinRequest struct {
513
        Group string `json:"group"`
514
}
515

516
// JSApiConsumerUnpinResponse is the response to a consumer unpin request. It
// carries only the standard ApiResponse envelope.
type JSApiConsumerUnpinResponse struct {
517
        ApiResponse
518
}
519

520
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
521

522
// JSApiStreamUpdateResponse for updating a stream.
523
type JSApiStreamUpdateResponse struct {
524
        ApiResponse
525
        *StreamInfo
526
}
527

528
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
529

530
// JSApiMsgDeleteRequest delete message request.
531
type JSApiMsgDeleteRequest struct {
532
        Seq     uint64 `json:"seq"`
533
        NoErase bool   `json:"no_erase,omitempty"`
534
}
535

536
// JSApiMsgDeleteResponse is the response to a delete message request. It
// carries a success flag alongside the standard ApiResponse envelope.
type JSApiMsgDeleteResponse struct {
537
        ApiResponse
538
        Success bool `json:"success,omitempty"`
539
}
540

541
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
542

543
// JSApiStreamSnapshotRequest is the request to snapshot a stream. It names the
// subject the chunks are delivered to and carries optional tuning preferences.
type JSApiStreamSnapshotRequest struct {
544
        // Subject to deliver the chunks to for the snapshot.
545
        DeliverSubject string `json:"deliver_subject"`
546
        // Do not include consumers in the snapshot.
547
        NoConsumers bool `json:"no_consumers,omitempty"`
548
        // Optional chunk size preference. Defaults to 128KB,
549
        // automatically clamped to within the range 1KB to 1MB.
550
        // A smaller chunk size means more in-flight messages
551
        // and more acks needed. Links with good throughput
552
        // but high latency may need to increase this.
553
        ChunkSize int `json:"chunk_size,omitempty"`
554
        // Optional window size preference. Defaults to 8MB,
555
        // automatically clamped to within the range 1KB to 32MB.
556
        // Very slow connections may need to reduce this to
557
        // avoid slow consumer issues.
558
        WindowSize int `json:"window_size,omitempty"`
559
        // Check all message's checksums prior to snapshot.
560
        CheckMsgs bool `json:"jsck,omitempty"`
561
}
562

563
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
564
type JSApiStreamSnapshotResponse struct {
565
        ApiResponse
566
        // Configuration of the given stream.
567
        Config *StreamConfig `json:"config,omitempty"`
568
        // Current State for the given stream.
569
        State *StreamState `json:"state,omitempty"`
570
}
571

572
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
573

574
// JSApiStreamRestoreRequest is the required restore request.
575
type JSApiStreamRestoreRequest struct {
576
        // Configuration of the given stream.
577
        Config StreamConfig `json:"config"`
578
        // Current State for the given stream.
579
        State StreamState `json:"state"`
580
}
581

582
// JSApiStreamRestoreResponse is the direct response to the restore request.
583
type JSApiStreamRestoreResponse struct {
584
        ApiResponse
585
        // Subject to deliver the chunks to for the snapshot restore.
586
        DeliverSubject string `json:"deliver_subject"`
587
}
588

589
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
590

591
// JSApiStreamRemovePeerRequest is the required remove peer request.
592
type JSApiStreamRemovePeerRequest struct {
593
        // Server name or peer ID of the peer to be removed.
594
        Peer string `json:"peer"`
595
}
596

597
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
598
type JSApiStreamRemovePeerResponse struct {
599
        ApiResponse
600
        Success bool `json:"success,omitempty"`
601
}
602

603
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
604

605
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
606
type JSApiStreamLeaderStepDownResponse struct {
607
        ApiResponse
608
        Success bool `json:"success,omitempty"`
609
}
610

611
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
612

613
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
614
type JSApiConsumerLeaderStepDownResponse struct {
615
        ApiResponse
616
        Success bool `json:"success,omitempty"`
617
}
618

619
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
620

621
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
622
type JSApiLeaderStepdownRequest struct {
623
        Placement *Placement `json:"placement,omitempty"`
624
}
625

626
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
627
type JSApiLeaderStepDownResponse struct {
628
        ApiResponse
629
        Success bool `json:"success,omitempty"`
630
}
631

632
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
633

634
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
635
type JSApiMetaServerRemoveRequest struct {
636
        // Server name of the peer to be removed.
637
        Server string `json:"peer"`
638
        // Peer ID of the peer to be removed. If specified this is used
639
        // instead of the server name.
640
        Peer string `json:"peer_id,omitempty"`
641
}
642

643
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
644
type JSApiMetaServerRemoveResponse struct {
645
        ApiResponse
646
        Success bool `json:"success,omitempty"`
647
}
648

649
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
650

651
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another.
652
// The response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType.
653
type JSApiMetaServerStreamMoveRequest struct {
654
        // Server name of the peer to be evacuated.
655
        Server string `json:"server,omitempty"`
656
        // Cluster the server is in
657
        Cluster string `json:"cluster,omitempty"`
658
        // Domain the server is in
659
        Domain string `json:"domain,omitempty"`
660
        // Ephemeral placement tags for the move
661
        Tags []string `json:"tags,omitempty"`
662
}
663

664
// JSApiAccountPurgeResponseType is the type string for a JSApiAccountPurgeResponse.
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"

// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
type JSApiAccountPurgeResponse struct {
	ApiResponse
	// Initiated is omitted from the JSON when false.
	// NOTE(review): presumably reports that the purge was started rather than completed — confirm.
	Initiated bool `json:"initiated,omitempty"`
}
671

672
// JSApiMsgGetRequest is the request to get a message. All fields are optional
// in the JSON encoding.
// NOTE(review): which field combinations are valid is decided by the request
// handler, which is not visible here — confirm before relying on any of them.
type JSApiMsgGetRequest struct {
	Seq     uint64 `json:"seq,omitempty"`
	LastFor string `json:"last_by_subj,omitempty"`
	NextFor string `json:"next_by_subj,omitempty"`

	// Batch support. Used to request more than one msg at a time.
	// Can be used with simple starting seq, but also NextFor with wildcards.
	Batch int `json:"batch,omitempty"`
	// This will make sure we limit how much data we blast out. If not set we will
	// inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
	MaxBytes int `json:"max_bytes,omitempty"`
	// Return messages as of this start time.
	StartTime *time.Time `json:"start_time,omitempty"`

	// Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
	MultiLastFor []string `json:"multi_last,omitempty"`
	// Only return messages up to this sequence. If not set, will be last sequence for the stream.
	UpToSeq uint64 `json:"up_to_seq,omitempty"`
	// Only return messages up to this time.
	UpToTime *time.Time `json:"up_to_time,omitempty"`
	// Only return the message payload, excluding headers if present.
	NoHeaders bool `json:"no_hdr,omitempty"`
}

// JSApiMsgGetResponse is the response to a message get request.
type JSApiMsgGetResponse struct {
	ApiResponse
	// Message is omitted from the JSON when nil.
	Message *StoredMsg `json:"message,omitempty"`
}

// JSApiMsgGetResponseType is the type string for a JSApiMsgGetResponse.
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
703

704
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
const JSWaitQueueDefaultMax = 512

// JSApiConsumerCreateResponse is the response to a consumer create request.
// The embedded *ConsumerInfo may be nil.
// NOTE(review): presumably nil when the response carries an error — confirm.
type JSApiConsumerCreateResponse struct {
	ApiResponse
	*ConsumerInfo
}

// JSApiConsumerCreateResponseType is the type string for a JSApiConsumerCreateResponse.
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"

// JSApiConsumerDeleteResponse is the response to a consumer delete request.
type JSApiConsumerDeleteResponse struct {
	ApiResponse
	// Success is omitted from the JSON when false.
	Success bool `json:"success,omitempty"`
}

// JSApiConsumerDeleteResponseType is the type string for a JSApiConsumerDeleteResponse.
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
720

721
// JSApiConsumerPauseRequest is the request to pause a consumer.
type JSApiConsumerPauseRequest struct {
	// PauseUntil is the time until which the consumer should stay paused.
	// NOTE: omitempty has no effect on a time.Time value with encoding/json,
	// so a zero time is still marshaled.
	PauseUntil time.Time `json:"pause_until,omitempty"`
}

// JSApiConsumerPauseResponseType is the type string for a JSApiConsumerPauseResponse.
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"

// JSApiConsumerPauseResponse is the response to a consumer pause request.
// Paused and PauseUntil are always present in the JSON, while PauseRemaining
// is omitted when zero.
type JSApiConsumerPauseResponse struct {
	ApiResponse
	Paused         bool          `json:"paused"`
	PauseUntil     time.Time     `json:"pause_until"`
	PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}
733

734
// JSApiConsumerInfoResponse is the response to a consumer info request.
// The embedded *ConsumerInfo may be nil.
// NOTE(review): presumably nil when the response carries an error — confirm.
type JSApiConsumerInfoResponse struct {
	ApiResponse
	*ConsumerInfo
}

// JSApiConsumerInfoResponseType is the type string for a JSApiConsumerInfoResponse.
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"

// JSApiConsumersRequest is a paged request.
// NOTE(review): presumably shared by the consumer names and consumer list
// requests — confirm against the handlers.
type JSApiConsumersRequest struct {
	ApiPagedRequest
}

// JSApiConsumerNamesResponse is the paged response to a consumer names request.
type JSApiConsumerNamesResponse struct {
	ApiResponse
	ApiPaged
	// Consumers is always present in the JSON (encoded as null when nil).
	Consumers []string `json:"consumers"`
}

// JSApiConsumerNamesResponseType is the type string for a JSApiConsumerNamesResponse.
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"

// JSApiConsumerListResponse is the paged response to a consumer list request.
// Consumers is always present in the JSON, while Missing and Offline are
// omitted when empty.
// NOTE(review): presumably Missing names consumers whose info could not be
// gathered and Offline maps offline consumers to a reason — confirm.
type JSApiConsumerListResponse struct {
	ApiResponse
	ApiPaged
	Consumers []*ConsumerInfo   `json:"consumers"`
	Missing   []string          `json:"missing,omitempty"`
	Offline   map[string]string `json:"offline,omitempty"`
}

// JSApiConsumerListResponseType is the type string for a JSApiConsumerListResponse.
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
762

763
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
// Every field is optional in the JSON encoding, and the fields of the embedded
// PriorityGroup are part of the request as well.
// NOTE(review): Expires and Heartbeat are time.Duration values, so they are
// presumably encoded as integer nanoseconds — confirm with clients.
type JSApiConsumerGetNextRequest struct {
	Expires   time.Duration `json:"expires,omitempty"`
	Batch     int           `json:"batch,omitempty"`
	MaxBytes  int           `json:"max_bytes,omitempty"`
	NoWait    bool          `json:"no_wait,omitempty"`
	Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
	PriorityGroup
}
772

773
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
type JSApiConsumerResetRequest struct {
	// Seq is omitted from the JSON when zero.
	// NOTE(review): the meaning of a zero/absent sequence is decided by the handler — confirm.
	Seq uint64 `json:"seq,omitempty"`
}

// JSApiConsumerResetResponse is a superset of JSApiConsumerCreateResponse, but including an explicit ResetSeq.
type JSApiConsumerResetResponse struct {
	ApiResponse
	*ConsumerInfo
	// ResetSeq is always present in the JSON, even when zero.
	ResetSeq uint64 `json:"reset_seq"`
}

// JSApiConsumerResetResponseType is the type string for a JSApiConsumerResetResponse.
const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"
786

787
// Structure that holds state for a JetStream API request that is processed
// in a separate long-lived go routine. This is to avoid blocking connections.
//
// jsub is the API handler subscription matched for the request; its callback
// is invoked with the remaining fields. sub, acc, subject and reply are the
// values the dispatcher was called with, msg is a copy of the raw message,
// and pa is a copy of the dispatching client's parsed publish arguments.
type jsAPIRoutedReq struct {
	jsub    *subscription
	sub     *subscription
	acc     *Account
	subject string
	reply   string
	msg     []byte
	pa      pubArg
}
798

799
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
149,585✔
800
        // Ignore system level directives meta stepdown and peer remove requests here.
149,585✔
801
        if subject == JSApiLeaderStepDown ||
149,585✔
802
                subject == JSApiRemoveServer ||
149,585✔
803
                strings.HasPrefix(subject, jsAPIAccountPre) {
150,062✔
804
                return
477✔
805
        }
477✔
806
        // No lock needed, those are immutable.
807
        s, rr := js.srv, js.apiSubs.Match(subject)
149,108✔
808

149,108✔
809
        hdr, msg := c.msgParts(rmsg)
149,108✔
810
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
149,115✔
811
                // Check if this is the system account. We will let these through for the account info only.
7✔
812
                sacc := s.SystemAccount()
7✔
813
                if sacc != acc {
7✔
814
                        return
×
815
                }
×
816
                if subject != JSApiAccountInfo {
11✔
817
                        // Only respond from the initial server entry to the NATS system.
4✔
818
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
819
                                var resp = ApiResponse{
2✔
820
                                        Type:  JSApiSystemResponseType,
2✔
821
                                        Error: NewJSNotEnabledForAccountError(),
2✔
822
                                }
2✔
823
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
824
                        }
2✔
825
                        return
4✔
826
                }
827
        }
828

829
        // Short circuit for no interest.
830
        if len(rr.psubs)+len(rr.qsubs) == 0 {
169,139✔
831
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
20,035✔
832
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
833
                        var resp = ApiResponse{
×
834
                                Type:  JSApiSystemResponseType,
×
835
                                Error: NewJSBadRequestError(),
×
836
                        }
×
837
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
838
                }
×
839
                return
20,035✔
840
        }
841

842
        // We should only have psubs and only 1 per result.
843
        if len(rr.psubs) != 1 {
129,069✔
844
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
845
                if c.kind == CLIENT || c.kind == LEAF {
×
846
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
847
                        var resp = ApiResponse{
×
848
                                Type:  JSApiSystemResponseType,
×
849
                                Error: NewJSBadRequestError(),
×
850
                        }
×
851
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
852
                }
×
853
                return
×
854
        }
855
        jsub := rr.psubs[0]
129,069✔
856

129,069✔
857
        // We need to make sure not to block. We will send the request to a long-lived
129,069✔
858
        // pool of go routines.
129,069✔
859

129,069✔
860
        // Increment inflight. Do this before queueing.
129,069✔
861
        atomic.AddInt64(&js.apiInflight, 1)
129,069✔
862

129,069✔
863
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
129,069✔
864
        // header from the msg body. No other references are needed.
129,069✔
865
        // Check pending and warn if getting backed up.
129,069✔
866
        var queue *ipQueue[*jsAPIRoutedReq]
129,069✔
867
        var limit int64
129,069✔
868
        if js.infoSubs.HasInterest(subject) {
204,050✔
869
                queue = s.jsAPIRoutedInfoReqs
74,981✔
870
                limit = atomic.LoadInt64(&js.infoQueueLimit)
74,981✔
871
        } else {
129,069✔
872
                queue = s.jsAPIRoutedReqs
54,088✔
873
                limit = atomic.LoadInt64(&js.queueLimit)
54,088✔
874
        }
54,088✔
875
        pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
129,069✔
876
        if pending >= int(limit) {
129,349✔
877
                s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending)
280✔
878
                drained := int64(queue.drain())
280✔
879
                atomic.AddInt64(&js.apiInflight, -drained)
280✔
880

280✔
881
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
280✔
882
                        TypedEvent: TypedEvent{
280✔
883
                                Type: JSAPILimitReachedAdvisoryType,
280✔
884
                                ID:   nuid.Next(),
280✔
885
                                Time: time.Now().UTC(),
280✔
886
                        },
280✔
887
                        Server:  s.Name(),
280✔
888
                        Domain:  js.config.Domain,
280✔
889
                        Dropped: drained,
280✔
890
                })
280✔
891
        }
280✔
892
}
893

894
// processJSAPIRoutedRequests is the body of a long-lived worker go routine
// that services the two JetStream API request queues until the server's quit
// channel fires. Requests on the regular queue are preferred over requests on
// the info queue. The go routine signals s.grWG when it exits.
func (s *Server) processJSAPIRoutedRequests() {
	defer s.grWG.Done()

	s.mu.RLock()
	queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs
	// Internal client handed to the API handlers. Its parsed publish
	// arguments are overwritten with those of each request before the
	// handler runs.
	client := &client{srv: s, kind: JETSTREAM}
	s.mu.RUnlock()

	js := s.getJetStream()

	processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) {
		// Only pop one item at a time here, otherwise if the system is recovering
		// from queue buildup, then one worker will pull off all the tasks and the
		// others will be starved of work.
		if r, ok := ipq.popOne(); ok && r != nil {
			client.pa = r.pa
			start := time.Now()
			r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
			// Report handlers that run for too long.
			if dur := time.Since(start); dur >= readLoopReportThreshold {
				s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
			}
			// Balances the increment done by the dispatcher before queueing.
			atomic.AddInt64(&js.apiInflight, -1)
		}
	}

	for {
		// First select case is prioritizing queue, we will only fall through
		// to the second select case that considers infoqueue if queue is empty.
		// This effectively means infos are deprioritized.
		select {
		case <-queue.ch:
			processFromQueue(queue)
		case <-s.quitCh:
			return
		default:
			// Nothing ready on the regular queue: block until either
			// queue has work or the server is shutting down.
			select {
			case <-infoqueue.ch:
				processFromQueue(infoqueue)
			case <-queue.ch:
				processFromQueue(queue)
			case <-s.quitCh:
				return
			}
		}
	}
}
940

941
func (s *Server) setJetStreamExportSubs() error {
4,471✔
942
        js := s.getJetStream()
4,471✔
943
        if js == nil {
4,471✔
944
                return NewJSNotEnabledError()
×
945
        }
×
946

947
        // Start the go routine that will process API requests received by the
948
        // subscription below when they are coming from routes, etc..
949
        const maxProcs = 16
4,471✔
950
        mp := runtime.GOMAXPROCS(0)
4,471✔
951
        // Cap at 16 max for now on larger core setups.
4,471✔
952
        if mp > maxProcs {
4,471✔
953
                mp = maxProcs
×
954
        }
×
955
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue")
4,471✔
956
        s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue")
4,471✔
957
        for i := 0; i < mp; i++ {
22,355✔
958
                s.startGoRoutine(s.processJSAPIRoutedRequests)
17,884✔
959
        }
17,884✔
960

961
        // This is the catch all now for all JetStream API calls.
962
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,471✔
963
                return err
×
964
        }
×
965

966
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,471✔
967
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
968
                return err
×
969
        }
×
970

971
        // API handles themselves.
972
        // infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests.
973
        pairs := []struct {
4,471✔
974
                subject string
4,471✔
975
                handler msgHandler
4,471✔
976
        }{
4,471✔
977
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,471✔
978
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,471✔
979
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,471✔
980
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,471✔
981
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,471✔
982
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,471✔
983
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,471✔
984
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,471✔
985
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,471✔
986
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,471✔
987
                {JSApiMsgGet, s.jsMsgGetRequest},
4,471✔
988
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,471✔
989
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,471✔
990
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,471✔
991
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,471✔
992
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,471✔
993
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,471✔
994
        }
4,471✔
995
        infopairs := []struct {
4,471✔
996
                subject string
4,471✔
997
                handler msgHandler
4,471✔
998
        }{
4,471✔
999
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,471✔
1000
                {JSApiStreams, s.jsStreamNamesRequest},
4,471✔
1001
                {JSApiStreamList, s.jsStreamListRequest},
4,471✔
1002
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,471✔
1003
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,471✔
1004
                {JSApiConsumerList, s.jsConsumerListRequest},
4,471✔
1005
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,471✔
1006
        }
4,471✔
1007

4,471✔
1008
        js.mu.Lock()
4,471✔
1009
        defer js.mu.Unlock()
4,471✔
1010

4,471✔
1011
        // As well as populating js.apiSubs for the dispatch function to use, we
4,471✔
1012
        // will also populate js.infoSubs, so that the dispatch function can
4,471✔
1013
        // decide quickly whether or not the request is an info request or not.
4,471✔
1014
        for _, p := range append(infopairs, pairs...) {
111,775✔
1015
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
107,304✔
1016
                if err := js.apiSubs.Insert(sub); err != nil {
107,304✔
1017
                        return err
×
1018
                }
×
1019
        }
1020
        for _, p := range infopairs {
35,768✔
1021
                if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil {
31,297✔
1022
                        return err
×
1023
                }
×
1024
        }
1025

1026
        return nil
4,471✔
1027
}
1028

1029
// sendAPIResponse records an API call against acc, sends response to reply
// when a reply subject was provided, and then emits the API audit advisory
// for the request/response pair.
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
	acc.trackAPI()
	// Without a reply subject there is nobody to respond to, but the call is
	// still tracked above and audited below.
	if reply != _EMPTY_ {
		s.sendInternalAccountMsg(nil, reply, response)
	}
	s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
1036

1037
// sendAPIErrResponse records a failed API call against acc, sends response to
// reply when a reply subject was provided, and then emits the API audit
// advisory for the request/response pair.
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
	acc.trackAPIErr()
	// Without a reply subject there is nobody to respond to, but the error is
	// still tracked above and audited below.
	if reply != _EMPTY_ {
		s.sendInternalAccountMsg(nil, reply, response)
	}
	s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
1044

1045
// errRespDelay is a delay of 500 milliseconds.
// NOTE(review): presumably the default delay for delayed error responses — confirm against callers.
const errRespDelay = 500 * time.Millisecond

// delayedAPIResponse is a response that must not be sent before its deadline.
// Pending responses are chained through next into a list that is kept ordered
// by deadline.
//
// When noJs is set the response is sent as a raw message to subject with hdr,
// otherwise it is sent as a JetStream API error response using ci, acc,
// subject, reply and request. If rg refers to a Raft group with a node, the
// response is dropped once that node's quit channel fires.
type delayedAPIResponse struct {
	ci       *ClientInfo
	acc      *Account
	subject  string
	reply    string
	request  string
	hdr      []byte
	response string
	rg       *raftGroup
	deadline time.Time
	noJs     bool
	next     *delayedAPIResponse
}
1060

1061
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1062
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
114✔
1063
        // Check if list empty.
114✔
1064
        if *head == nil {
214✔
1065
                *head, *tail = r, r
100✔
1066
                return
100✔
1067
        }
100✔
1068
        // Check if it should be added at the end, which is if after or equal to the tail.
1069
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1070
                (*tail).next, *tail = r, r
12✔
1071
                return
12✔
1072
        }
12✔
1073
        // Find its spot in the list.
1074
        var prev *delayedAPIResponse
2✔
1075
        for c := *head; c != nil; c = c.next {
6✔
1076
                // We insert only if we are stricly before the current `c`.
4✔
1077
                if r.deadline.Before(c.deadline) {
6✔
1078
                        r.next = c
2✔
1079
                        if prev != nil {
3✔
1080
                                prev.next = r
1✔
1081
                        } else {
2✔
1082
                                *head = r
1✔
1083
                        }
1✔
1084
                        return
2✔
1085
                }
1086
                prev = c
2✔
1087
        }
1088
}
1089

1090
// delayedAPIResponder is the body of the go routine that sends delayed API
// responses. Responses pushed to s.delayedAPIResponses are kept in a list
// ordered by deadline, and a single timer is armed for the head of that list.
// The go routine exits only when the server's quit channel fires, and signals
// s.grWG when it does.
func (s *Server) delayedAPIResponder() {
	defer s.grWG.Done()
	var (
		head, tail *delayedAPIResponse // Linked list.
		r          *delayedAPIResponse // Updated by calling next().
		rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
		tm         = time.NewTimer(time.Hour)
	)
	// next selects the head of the list as the response to wait for and
	// re-arms the timer accordingly.
	next := func() {
		r, rqch = nil, nil
		// Check that JetStream is still on. Do not exit the go routine
		// since JS can be enabled/disabled. The go routine will exit
		// only if server is shutdown.
		js := s.getJetStream()
		if js == nil {
			// Reset head and tail here. Also drain the ipQueue.
			head, tail = nil, nil
			s.delayedAPIResponses.drain()
			// Fall back into next "if" that resets timer.
		}
		// If there are no delayed messages then delay the timer for
		// a while.
		if head == nil {
			tm.Reset(time.Hour)
			return
		}
		// Get the first expected message and then reset the timer.
		// js cannot be nil here, since a nil js clears the list above.
		r = head
		js.mu.RLock()
		if r.rg != nil && r.rg.node != nil {
			// If there's an attached Raft group to the delayed response
			// then pull out the quit channel, so that we don't bother
			// sending responses for entities which are now no longer
			// running.
			rqch = r.rg.node.QuitC()
		}
		js.mu.RUnlock()
		tm.Reset(time.Until(r.deadline))
	}
	// pop removes the head of the list, if any.
	pop := func() {
		if head == nil {
			return
		}
		head = head.next
		if head == nil {
			tail = nil
		}
	}
	for {
		select {
		case <-s.delayedAPIResponses.ch:
			v, ok := s.delayedAPIResponses.popOne()
			if !ok {
				continue
			}
			// Add it to the list, and if ends up being the head, set things up.
			addDelayedResponse(&head, &tail, v)
			if v == head {
				next()
			}
		case <-s.quitCh:
			return
		case <-rqch:
			// rqch is nil unless the current response has a Raft group, and
			// receiving from a nil channel blocks forever, so this case only
			// triggers once that group's quit channel fires.
			// If we were the head, drop and setup things for next.
			if r != nil && r == head {
				pop()
			}
			next()
		case <-tm.C:
			if r != nil {
				// If it's not a JS API error, send it as a raw response without additional API/audit tracking.
				if r.noJs {
					s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
				} else {
					s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
				}
				pop()
			}
			next()
		}
	}
}
1172

1173
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
78✔
1174
        s.delayedAPIResponses.push(&delayedAPIResponse{
78✔
1175
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
78✔
1176
        })
78✔
1177
}
78✔
1178

1179
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1180
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1181
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1182
        })
36✔
1183
}
36✔
1184

1185
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
119,592✔
1186
        hdr, msg = c.msgParts(raw)
119,592✔
1187
        var ci ClientInfo
119,592✔
1188

119,592✔
1189
        if len(hdr) > 0 {
239,102✔
1190
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
119,510✔
1191
                        return nil, nil, nil, nil, err
×
1192
                }
×
1193
        }
1194

1195
        if ci.Service != _EMPTY_ {
119,656✔
1196
                acc, _ = s.LookupAccount(ci.Service)
64✔
1197
        } else if ci.Account != _EMPTY_ {
239,038✔
1198
                acc, _ = s.LookupAccount(ci.Account)
119,446✔
1199
        } else {
119,528✔
1200
                // Direct $SYS access.
82✔
1201
                acc = c.acc
82✔
1202
                if acc == nil {
85✔
1203
                        acc = s.SystemAccount()
3✔
1204
                }
3✔
1205
        }
1206
        if acc == nil {
119,602✔
1207
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1208
        }
10✔
1209
        return &ci, acc, hdr, msg, nil
119,582✔
1210
}
1211

1212
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
26,207✔
1213
        decoder := json.NewDecoder(bytes.NewReader(msg))
26,207✔
1214
        decoder.DisallowUnknownFields()
26,207✔
1215

26,207✔
1216
        for {
78,612✔
1217
                if err := decoder.Decode(v); err != nil {
78,612✔
1218
                        if err == io.EOF {
52,408✔
1219
                                return nil
26,201✔
1220
                        }
26,201✔
1221

1222
                        var syntaxErr *json.SyntaxError
6✔
1223
                        if errors.As(err, &syntaxErr) {
6✔
1224
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1225
                        }
×
1226

1227
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1228

6✔
1229
                        if js := s.getJetStream(); js != nil && js.config.Strict {
12✔
1230
                                return err
6✔
1231
                        }
6✔
1232

1233
                        return json.Unmarshal(msg, v)
×
1234
                }
1235
        }
1236
}
1237

1238
func (a *Account) trackAPI() {
59,698✔
1239
        a.mu.RLock()
59,698✔
1240
        jsa := a.js
59,698✔
1241
        a.mu.RUnlock()
59,698✔
1242
        if jsa != nil {
119,307✔
1243
                jsa.usageMu.Lock()
59,609✔
1244
                jsa.usageApi++
59,609✔
1245
                jsa.apiTotal++
59,609✔
1246
                jsa.sendClusterUsageUpdate()
59,609✔
1247
                atomic.AddInt64(&jsa.js.apiTotal, 1)
59,609✔
1248
                jsa.usageMu.Unlock()
59,609✔
1249
        }
59,609✔
1250
}
1251

1252
func (a *Account) trackAPIErr() {
19,134✔
1253
        a.mu.RLock()
19,134✔
1254
        jsa := a.js
19,134✔
1255
        a.mu.RUnlock()
19,134✔
1256
        if jsa != nil {
38,032✔
1257
                jsa.usageMu.Lock()
18,898✔
1258
                jsa.usageApi++
18,898✔
1259
                jsa.apiTotal++
18,898✔
1260
                jsa.usageErr++
18,898✔
1261
                jsa.apiErrors++
18,898✔
1262
                jsa.sendClusterUsageUpdate()
18,898✔
1263
                atomic.AddInt64(&jsa.js.apiTotal, 1)
18,898✔
1264
                atomic.AddInt64(&jsa.js.apiErrors, 1)
18,898✔
1265
                jsa.usageMu.Unlock()
18,898✔
1266
        }
18,898✔
1267
}
1268

1269
// badAPIRequestT is the log format used to warn about a malformed JetStream
// API request. It takes the offending request as its only argument and
// prints it quoted.
const badAPIRequestT = "Malformed JetStream API Request: %q"
1270

1271
// Helper function to check on JetStream being enabled but also on status of leafnodes
1272
// If the local account is not enabled but does have leafnode connectivity we will not
1273
// want to error immediately and let the other side decide.
1274
func (a *Account) checkJetStream() (enabled, shouldError bool) {
50,991✔
1275
        a.mu.RLock()
50,991✔
1276
        defer a.mu.RUnlock()
50,991✔
1277
        return a.js != nil, a.nleafs+a.nrleafs == 0
50,991✔
1278
}
50,991✔
1279

1280
// Request for current usage and limits for this account.
1281
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
511✔
1282
        if c == nil || !s.JetStreamEnabled() {
511✔
1283
                return
×
1284
        }
×
1285

1286
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
511✔
1287
        if err != nil {
512✔
1288
                s.Warnf(badAPIRequestT, msg)
1✔
1289
                return
1✔
1290
        }
1✔
1291

1292
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
510✔
1293

510✔
1294
        // Determine if we should proceed here when we are in clustered mode.
510✔
1295
        if s.JetStreamIsClustered() {
943✔
1296
                js, cc := s.getJetStreamCluster()
433✔
1297
                if js == nil || cc == nil {
433✔
1298
                        return
×
1299
                }
×
1300
                if js.isLeaderless() {
434✔
1301
                        resp.Error = NewJSClusterNotAvailError()
1✔
1302
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1303
                        return
1✔
1304
                }
1✔
1305
                // Make sure we are meta leader.
1306
                if !s.JetStreamIsLeader() {
731✔
1307
                        return
299✔
1308
                }
299✔
1309
        }
1310

1311
        if errorOnRequiredApiLevel(hdr) {
211✔
1312
                resp.Error = NewJSRequiredApiLevelError()
1✔
1313
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1314
                return
1✔
1315
        }
1✔
1316

1317
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
215✔
1318
                if !doErr {
7✔
1319
                        return
1✔
1320
                }
1✔
1321
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1322
        } else {
203✔
1323
                stats := acc.JetStreamUsage()
203✔
1324
                resp.JetStreamAccountStats = &stats
203✔
1325
        }
203✔
1326
        b, err := json.Marshal(resp)
208✔
1327
        if err != nil {
208✔
1328
                return
×
1329
        }
×
1330

1331
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
208✔
1332
}
1333

1334
// Helpers for token extraction.

// streamNameFromSubject returns the token at position 5 of subject, as
// reported by tokenAt.
// NOTE(review): presumably positions are 1-based and an empty string is
// returned when the subject has fewer tokens — confirm against tokenAt.
func streamNameFromSubject(subject string) string {
	return tokenAt(subject, 5)
}
89,132✔
1338

1339
func consumerNameFromSubject(subject string) string {
52,769✔
1340
        return tokenAt(subject, 6)
52,769✔
1341
}
52,769✔
1342

1343
func (s *Server) jsonResponse(v any) string {
79,574✔
1344
        b, err := json.Marshal(v)
79,574✔
1345
        if err != nil {
79,574✔
1346
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1347
                return ""
×
1348
        }
×
1349
        return string(b)
79,574✔
1350
}
1351

1352
// Read lock must be held
1353
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,341✔
1354
        var reservation int64
3,341✔
1355
        for _, sa := range jsa.streams {
7,306✔
1356
                // Don't count the stream toward the limit if it already exists.
3,965✔
1357
                if sa.cfg.Name == cfg.Name {
4,088✔
1358
                        continue
123✔
1359
                }
1360
                if (tier == _EMPTY_ || isSameTier(&sa.cfg, cfg)) && sa.cfg.MaxBytes > 0 && sa.cfg.Storage == cfg.Storage {
3,857✔
1361
                        // If tier is empty, all storage is flat and we should adjust for replicas.
15✔
1362
                        // Otherwise if tiered, storage replication already taken into consideration.
15✔
1363
                        if tier == _EMPTY_ && sa.cfg.Replicas > 1 {
16✔
1364
                                reservation = addSaturate(reservation, mulSaturate(int64(sa.cfg.Replicas), sa.cfg.MaxBytes))
1✔
1365
                        } else {
15✔
1366
                                reservation = addSaturate(reservation, sa.cfg.MaxBytes)
14✔
1367
                        }
14✔
1368
                }
1369
        }
1370
        return reservation
3,341✔
1371
}
1372

1373
// Request to create a stream.
1374
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
12,009✔
1375
        if c == nil || !s.JetStreamEnabled() {
12,279✔
1376
                return
270✔
1377
        }
270✔
1378
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,739✔
1379
        if err != nil {
11,742✔
1380
                s.Warnf(badAPIRequestT, msg)
3✔
1381
                return
3✔
1382
        }
3✔
1383

1384
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
11,736✔
1385

11,736✔
1386
        // Determine if we should proceed here when we are in clustered mode.
11,736✔
1387
        if s.JetStreamIsClustered() {
21,936✔
1388
                js, cc := s.getJetStreamCluster()
10,200✔
1389
                if js == nil || cc == nil {
10,200✔
1390
                        return
×
1391
                }
×
1392
                if js.isLeaderless() {
10,201✔
1393
                        resp.Error = NewJSClusterNotAvailError()
1✔
1394
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1395
                        return
1✔
1396
                }
1✔
1397
                // Make sure we are meta leader.
1398
                if !s.JetStreamIsLeader() {
17,186✔
1399
                        return
6,987✔
1400
                }
6,987✔
1401
        }
1402

1403
        if errorOnRequiredApiLevel(hdr) {
4,749✔
1404
                resp.Error = NewJSRequiredApiLevelError()
1✔
1405
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1406
                return
1✔
1407
        }
1✔
1408

1409
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,755✔
1410
                if doErr {
8✔
1411
                        resp.Error = NewJSNotEnabledForAccountError()
×
1412
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1413
                }
×
1414
                return
8✔
1415
        }
1416

1417
        var cfg StreamConfigRequest
4,739✔
1418
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
4,740✔
1419
                resp.Error = NewJSInvalidJSONError(err)
1✔
1420
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1421
                return
1✔
1422
        }
1✔
1423

1424
        // Initialize asset version metadata.
1425
        setStaticStreamMetadata(&cfg.StreamConfig)
4,738✔
1426

4,738✔
1427
        streamName := streamNameFromSubject(subject)
4,738✔
1428
        if streamName != cfg.Name {
4,739✔
1429
                resp.Error = NewJSStreamMismatchError()
1✔
1430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1431
                return
1✔
1432
        }
1✔
1433

1434
        // Check for path like separators in the name.
1435
        if strings.ContainsAny(streamName, `\/`) {
4,739✔
1436
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1437
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1438
                return
2✔
1439
        }
2✔
1440

1441
        // Can't create a stream with a sealed state.
1442
        if cfg.Sealed {
4,737✔
1443
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1444
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1445
                return
2✔
1446
        }
2✔
1447

1448
        // If we are told to do mirror direct but are not mirroring, error.
1449
        if cfg.MirrorDirect && cfg.Mirror == nil {
4,733✔
1450
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1451
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1452
                return
×
1453
        }
×
1454

1455
        // Hand off to cluster for processing.
1456
        if s.JetStreamIsClustered() {
7,943✔
1457
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
3,210✔
1458
                return
3,210✔
1459
        }
3,210✔
1460

1461
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,531✔
1462
                resp.Error = err
8✔
1463
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
8✔
1464
                return
8✔
1465
        }
8✔
1466

1467
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,515✔
1468
        if err != nil {
1,592✔
1469
                if IsNatsErr(err, JSStreamStoreFailedF) {
77✔
1470
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1471
                        err = errStreamStoreFailed
×
1472
                }
×
1473
                resp.Error = NewJSStreamCreateError(err, Unless(err))
77✔
1474
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
77✔
1475
                return
77✔
1476
        }
1477
        msetCfg := mset.config()
1,438✔
1478
        resp.StreamInfo = &StreamInfo{
1,438✔
1479
                Created:   mset.createdTime(),
1,438✔
1480
                State:     mset.state(),
1,438✔
1481
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,438✔
1482
                TimeStamp: time.Now().UTC(),
1,438✔
1483
                Mirror:    mset.mirrorInfo(),
1,438✔
1484
                Sources:   mset.sourcesInfo(),
1,438✔
1485
        }
1,438✔
1486
        resp.DidCreate = true
1,438✔
1487
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,438✔
1488
}
1489

1490
// Request to update a stream.
1491
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
980✔
1492
        if c == nil || !s.JetStreamEnabled() {
980✔
1493
                return
×
1494
        }
×
1495

1496
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
980✔
1497
        if err != nil {
980✔
1498
                s.Warnf(badAPIRequestT, msg)
×
1499
                return
×
1500
        }
×
1501

1502
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
980✔
1503

980✔
1504
        // Determine if we should proceed here when we are in clustered mode.
980✔
1505
        if s.JetStreamIsClustered() {
1,849✔
1506
                js, cc := s.getJetStreamCluster()
869✔
1507
                if js == nil || cc == nil {
869✔
1508
                        return
×
1509
                }
×
1510
                if js.isLeaderless() {
871✔
1511
                        resp.Error = NewJSClusterNotAvailError()
2✔
1512
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1513
                        return
2✔
1514
                }
2✔
1515
                // Make sure we are meta leader.
1516
                if !s.JetStreamIsLeader() {
1,515✔
1517
                        return
648✔
1518
                }
648✔
1519
        }
1520

1521
        if errorOnRequiredApiLevel(hdr) {
331✔
1522
                resp.Error = NewJSRequiredApiLevelError()
1✔
1523
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1524
                return
1✔
1525
        }
1✔
1526

1527
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
329✔
1528
                if doErr {
×
1529
                        resp.Error = NewJSNotEnabledForAccountError()
×
1530
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1531
                }
×
1532
                return
×
1533
        }
1534
        var ncfg StreamConfigRequest
329✔
1535
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
330✔
1536
                resp.Error = NewJSInvalidJSONError(err)
1✔
1537
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1538
                return
1✔
1539
        }
1✔
1540

1541
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
328✔
1542
        if apiErr != nil {
347✔
1543
                resp.Error = apiErr
19✔
1544
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
19✔
1545
                return
19✔
1546
        }
19✔
1547

1548
        streamName := streamNameFromSubject(subject)
309✔
1549
        if streamName != cfg.Name {
310✔
1550
                resp.Error = NewJSStreamMismatchError()
1✔
1551
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1552
                return
1✔
1553
        }
1✔
1554

1555
        // Handle clustered version here.
1556
        if s.JetStreamIsClustered() {
522✔
1557
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
214✔
1558
                return
214✔
1559
        }
214✔
1560

1561
        mset, err := acc.lookupStream(streamName)
94✔
1562
        if err != nil {
98✔
1563
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1564
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1565
                return
4✔
1566
        }
4✔
1567
        if mset.offlineReason != _EMPTY_ {
90✔
1568
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1569
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1570
                return
×
1571
        }
×
1572

1573
        // Update asset version metadata.
1574
        setStaticStreamMetadata(&cfg)
90✔
1575

90✔
1576
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
103✔
1577
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
13✔
1578
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
13✔
1579
                return
13✔
1580
        }
13✔
1581

1582
        msetCfg := mset.config()
77✔
1583
        resp.StreamInfo = &StreamInfo{
77✔
1584
                Created:   mset.createdTime(),
77✔
1585
                State:     mset.state(),
77✔
1586
                Config:    *setDynamicStreamMetadata(&msetCfg),
77✔
1587
                Domain:    s.getOpts().JetStreamDomain,
77✔
1588
                Mirror:    mset.mirrorInfo(),
77✔
1589
                Sources:   mset.sourcesInfo(),
77✔
1590
                TimeStamp: time.Now().UTC(),
77✔
1591
        }
77✔
1592
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
77✔
1593
}
1594

1595
// Request for the list of all stream names.
1596
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,350✔
1597
        if c == nil || !s.JetStreamEnabled() {
1,350✔
1598
                return
×
1599
        }
×
1600
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,350✔
1601
        if err != nil {
1,350✔
1602
                s.Warnf(badAPIRequestT, msg)
×
1603
                return
×
1604
        }
×
1605

1606
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,350✔
1607

1,350✔
1608
        // Determine if we should proceed here when we are in clustered mode.
1,350✔
1609
        if s.JetStreamIsClustered() {
2,414✔
1610
                js, cc := s.getJetStreamCluster()
1,064✔
1611
                if js == nil || cc == nil {
1,064✔
1612
                        return
×
1613
                }
×
1614
                if js.isLeaderless() {
1,064✔
1615
                        resp.Error = NewJSClusterNotAvailError()
×
1616
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1617
                        return
×
1618
                }
×
1619
                // Make sure we are meta leader.
1620
                if !s.JetStreamIsLeader() {
1,840✔
1621
                        return
776✔
1622
                }
776✔
1623
        }
1624

1625
        if errorOnRequiredApiLevel(hdr) {
575✔
1626
                resp.Error = NewJSRequiredApiLevelError()
1✔
1627
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1628
                return
1✔
1629
        }
1✔
1630

1631
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
574✔
1632
                if doErr {
1✔
1633
                        resp.Error = NewJSNotEnabledForAccountError()
×
1634
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1635
                }
×
1636
                return
1✔
1637
        }
1638

1639
        var offset int
572✔
1640
        var filter string
572✔
1641

572✔
1642
        if isJSONObjectOrArray(msg) {
957✔
1643
                var req JSApiStreamNamesRequest
385✔
1644
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
385✔
1645
                        resp.Error = NewJSInvalidJSONError(err)
×
1646
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1647
                        return
×
1648
                }
×
1649
                offset = max(req.Offset, 0)
385✔
1650
                if req.Subject != _EMPTY_ {
744✔
1651
                        filter = req.Subject
359✔
1652
                }
359✔
1653
        }
1654

1655
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1656
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1657
        var numStreams int
572✔
1658
        if s.JetStreamIsClustered() {
860✔
1659
                js, cc := s.getJetStreamCluster()
288✔
1660
                if js == nil || cc == nil {
288✔
1661
                        // TODO(dlc) - Debug or Warn?
×
1662
                        return
×
1663
                }
×
1664
                js.mu.RLock()
288✔
1665
                for stream, sa := range cc.streams[acc.Name] {
624✔
1666
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
336✔
1667
                                continue
×
1668
                        }
1669
                        if filter != _EMPTY_ {
628✔
1670
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
292✔
1671
                                if len(sa.Config.Subjects) == 0 {
294✔
1672
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1673
                                                resp.Streams = append(resp.Streams, stream)
×
1674
                                        }
×
1675
                                } else {
290✔
1676
                                        for _, subj := range sa.Config.Subjects {
580✔
1677
                                                if SubjectsCollide(filter, subj) {
525✔
1678
                                                        resp.Streams = append(resp.Streams, stream)
235✔
1679
                                                        break
235✔
1680
                                                }
1681
                                        }
1682
                                }
1683
                        } else {
44✔
1684
                                resp.Streams = append(resp.Streams, stream)
44✔
1685
                        }
44✔
1686
                }
1687
                js.mu.RUnlock()
288✔
1688
                if len(resp.Streams) > 1 {
290✔
1689
                        slices.Sort(resp.Streams)
2✔
1690
                }
2✔
1691
                numStreams = len(resp.Streams)
288✔
1692
                if offset > numStreams {
288✔
1693
                        offset = numStreams
×
1694
                }
×
1695
                if offset > 0 {
288✔
1696
                        resp.Streams = resp.Streams[offset:]
×
1697
                }
×
1698
                if len(resp.Streams) > JSApiNamesLimit {
288✔
1699
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1700
                }
×
1701
        } else {
284✔
1702
                msets := acc.filteredStreams(filter)
284✔
1703
                // Since we page results order matters.
284✔
1704
                if len(msets) > 1 {
290✔
1705
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
16✔
1706
                }
1707

1708
                numStreams = len(msets)
284✔
1709
                if offset > numStreams {
284✔
1710
                        offset = numStreams
×
1711
                }
×
1712

1713
                for _, mset := range msets[offset:] {
578✔
1714
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
294✔
1715
                        if len(resp.Streams) >= JSApiNamesLimit {
294✔
1716
                                break
×
1717
                        }
1718
                }
1719
        }
1720
        resp.Total = numStreams
572✔
1721
        resp.Limit = JSApiNamesLimit
572✔
1722
        resp.Offset = offset
572✔
1723

572✔
1724
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
572✔
1725
}
1726

1727
// Request for the list of all detailed stream info.
1728
// TODO(dlc) - combine with above long term
1729
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
71✔
1730
        if c == nil || !s.JetStreamEnabled() {
71✔
1731
                return
×
1732
        }
×
1733
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
71✔
1734
        if err != nil {
71✔
1735
                s.Warnf(badAPIRequestT, msg)
×
1736
                return
×
1737
        }
×
1738

1739
        var resp = JSApiStreamListResponse{
71✔
1740
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
71✔
1741
                Streams:     []*StreamInfo{},
71✔
1742
        }
71✔
1743

71✔
1744
        // Determine if we should proceed here when we are in clustered mode.
71✔
1745
        if s.JetStreamIsClustered() {
135✔
1746
                js, cc := s.getJetStreamCluster()
64✔
1747
                if js == nil || cc == nil {
64✔
1748
                        return
×
1749
                }
×
1750
                if js.isLeaderless() {
65✔
1751
                        resp.Error = NewJSClusterNotAvailError()
1✔
1752
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1753
                        return
1✔
1754
                }
1✔
1755
                // Make sure we are meta leader.
1756
                if !s.JetStreamIsLeader() {
108✔
1757
                        return
45✔
1758
                }
45✔
1759
        }
1760

1761
        if errorOnRequiredApiLevel(hdr) {
26✔
1762
                resp.Error = NewJSRequiredApiLevelError()
1✔
1763
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1764
                return
1✔
1765
        }
1✔
1766

1767
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
24✔
1768
                if doErr {
×
1769
                        resp.Error = NewJSNotEnabledForAccountError()
×
1770
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1771
                }
×
1772
                return
×
1773
        }
1774

1775
        var offset int
24✔
1776
        var filter string
24✔
1777

24✔
1778
        if isJSONObjectOrArray(msg) {
37✔
1779
                var req JSApiStreamListRequest
13✔
1780
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
1781
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1782
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1783
                        return
1✔
1784
                }
1✔
1785
                offset = max(req.Offset, 0)
12✔
1786
                if req.Subject != _EMPTY_ {
14✔
1787
                        filter = req.Subject
2✔
1788
                }
2✔
1789
        }
1790

1791
        // Clustered mode will invoke a scatter and gather.
1792
        if s.JetStreamIsClustered() {
41✔
1793
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
18✔
1794
                msg = copyBytes(msg)
18✔
1795
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
36✔
1796
                return
18✔
1797
        }
1798

1799
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1800
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1801
        var msets []*stream
5✔
1802
        if filter == _EMPTY_ {
9✔
1803
                msets = acc.streams()
4✔
1804
        } else {
5✔
1805
                msets = acc.filteredStreams(filter)
1✔
1806
        }
1✔
1807

1808
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
6✔
1809

1810
        scnt := len(msets)
5✔
1811
        if offset > scnt {
5✔
1812
                offset = scnt
×
1813
        }
×
1814

1815
        var missingNames []string
5✔
1816
        for _, mset := range msets[offset:] {
11✔
1817
                if mset.offlineReason != _EMPTY_ {
7✔
1818
                        if resp.Offline == nil {
2✔
1819
                                resp.Offline = make(map[string]string, 1)
1✔
1820
                        }
1✔
1821
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
1822
                        missingNames = append(missingNames, mset.getCfgName())
1✔
1823
                        continue
1✔
1824
                }
1825

1826
                config := mset.config()
5✔
1827
                resp.Streams = append(resp.Streams, &StreamInfo{
5✔
1828
                        Created:   mset.createdTime(),
5✔
1829
                        State:     mset.state(),
5✔
1830
                        Config:    config,
5✔
1831
                        Domain:    s.getOpts().JetStreamDomain,
5✔
1832
                        Mirror:    mset.mirrorInfo(),
5✔
1833
                        Sources:   mset.sourcesInfo(),
5✔
1834
                        TimeStamp: time.Now().UTC(),
5✔
1835
                })
5✔
1836
                if len(resp.Streams) >= JSApiListLimit {
5✔
1837
                        break
×
1838
                }
1839
        }
1840
        resp.Total = scnt
5✔
1841
        resp.Limit = JSApiListLimit
5✔
1842
        resp.Offset = offset
5✔
1843
        resp.Missing = missingNames
5✔
1844
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
1845
}
1846

1847
// Request for information about a stream.
1848
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
29,138✔
1849
        if c == nil || !s.JetStreamEnabled() {
29,144✔
1850
                return
6✔
1851
        }
6✔
1852
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29,132✔
1853
        if err != nil {
29,137✔
1854
                s.Warnf(badAPIRequestT, msg)
5✔
1855
                return
5✔
1856
        }
5✔
1857

1858
        streamName := streamNameFromSubject(subject)
29,127✔
1859

29,127✔
1860
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
29,127✔
1861

29,127✔
1862
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
29,127✔
1863
        // Make sure the response type is for a create call.
29,127✔
1864
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
29,127✔
1865
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
1866
        }
×
1867

1868
        var clusterWideConsCount int
29,127✔
1869

29,127✔
1870
        js, cc := s.getJetStreamCluster()
29,127✔
1871
        if js == nil {
29,127✔
1872
                return
×
1873
        }
×
1874
        // If we are in clustered mode we need to be the stream leader to proceed.
1875
        if cc != nil {
42,059✔
1876
                // Check to make sure the stream is assigned.
12,932✔
1877
                js.mu.RLock()
12,932✔
1878
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, streamName)
12,932✔
1879
                var offline bool
12,932✔
1880
                if sa != nil {
24,648✔
1881
                        clusterWideConsCount = len(sa.consumers)
11,716✔
1882
                        offline = s.allPeersOffline(sa.Group)
11,716✔
1883
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
11,746✔
1884
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
1885
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
1886
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
1887
                                js.mu.RUnlock()
30✔
1888
                                return
30✔
1889
                        }
30✔
1890
                }
1891
                js.mu.RUnlock()
12,902✔
1892

12,902✔
1893
                if isLeader && sa == nil {
13,163✔
1894
                        // We can't find the stream, so mimic what would be the errors below.
261✔
1895
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
261✔
1896
                                if doErr {
×
1897
                                        resp.Error = NewJSNotEnabledForAccountError()
×
1898
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1899
                                }
×
1900
                                return
×
1901
                        }
1902
                        // No stream present.
1903
                        resp.Error = NewJSStreamNotFoundError()
261✔
1904
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
261✔
1905
                        return
261✔
1906
                } else if sa == nil {
13,596✔
1907
                        if js.isLeaderless() {
955✔
1908
                                resp.Error = NewJSClusterNotAvailError()
×
1909
                                // Delaying an error response gives the leader a chance to respond before us
×
1910
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1911
                        }
×
1912
                        return
955✔
1913
                } else if isLeader && offline {
11,689✔
1914
                        resp.Error = NewJSStreamOfflineError()
3✔
1915
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
3✔
1916
                        return
3✔
1917
                }
3✔
1918

1919
                // Check to see if we are a member of the group and if the group has no leader.
1920
                isLeaderless := js.isGroupLeaderless(sa.Group)
11,683✔
1921

11,683✔
1922
                // We have the stream assigned and a leader, so only the stream leader should answer.
11,683✔
1923
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
21,053✔
1924
                        if js.isLeaderless() {
9,370✔
1925
                                resp.Error = NewJSClusterNotAvailError()
×
1926
                                // Delaying an error response gives the leader a chance to respond before us
×
1927
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
1928
                                return
×
1929
                        }
×
1930

1931
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
1932
                        // while the new members work through the election and catchup process.
1933
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
1934
                        js.mu.RLock()
9,370✔
1935
                        rg := sa.Group
9,370✔
1936
                        var ourID string
9,370✔
1937
                        if cc.meta != nil {
18,740✔
1938
                                ourID = cc.meta.ID()
9,370✔
1939
                        }
9,370✔
1940
                        // We have seen cases where rg is nil at this point,
1941
                        // so check explicitly and bail if that is the case.
1942
                        bail := rg == nil || !rg.isMember(ourID)
9,370✔
1943
                        if !bail {
12,910✔
1944
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
3,540✔
1945
                                // Also, we have seen cases where rg.node is nil at this point,
3,540✔
1946
                                // so check explicitly and bail if that is the case.
3,540✔
1947
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
3,540✔
1948
                        }
3,540✔
1949
                        js.mu.RUnlock()
9,370✔
1950
                        if bail {
18,726✔
1951
                                return
9,356✔
1952
                        }
9,356✔
1953
                }
1954
        }
1955

1956
        if errorOnRequiredApiLevel(hdr) {
18,523✔
1957
                resp.Error = NewJSRequiredApiLevelError()
1✔
1958
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1959
                return
1✔
1960
        }
1✔
1961

1962
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
18,527✔
1963
                if doErr {
7✔
1964
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
1965
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1966
                }
1✔
1967
                return
6✔
1968
        }
1969

1970
        var details bool
18,515✔
1971
        var subjects string
18,515✔
1972
        var offset int
18,515✔
1973
        if isJSONObjectOrArray(msg) {
18,616✔
1974
                var req JSApiStreamInfoRequest
101✔
1975
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
102✔
1976
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1977
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1978
                        return
1✔
1979
                }
1✔
1980
                details, subjects = req.DeletedDetails, req.SubjectsFilter
100✔
1981
                offset = max(req.Offset, 0)
100✔
1982
        }
1983

1984
        mset, err := acc.lookupStream(streamName)
18,514✔
1985
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
18,514✔
1986
        if err != nil {
19,291✔
1987
                if cc != nil {
777✔
1988
                        // This could be inflight, pause for a short bit and try again.
×
1989
                        // This will not be inline, so ok.
×
1990
                        time.Sleep(10 * time.Millisecond)
×
1991
                        mset, err = acc.lookupStream(streamName)
×
1992
                }
×
1993
                // Check again.
1994
                if err != nil {
1,554✔
1995
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
777✔
1996
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
777✔
1997
                        return
777✔
1998
                }
777✔
1999
        }
2000

2001
        if mset.offlineReason != _EMPTY_ {
17,738✔
2002
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
2003
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2004
                return
1✔
2005
        }
1✔
2006

2007
        config := mset.config()
17,736✔
2008
        resp.StreamInfo = &StreamInfo{
17,736✔
2009
                Created:    mset.createdTime(),
17,736✔
2010
                State:      mset.stateWithDetail(details),
17,736✔
2011
                Config:     *setDynamicStreamMetadata(&config),
17,736✔
2012
                Domain:     s.getOpts().JetStreamDomain,
17,736✔
2013
                Cluster:    js.clusterInfo(mset.raftGroup()),
17,736✔
2014
                Mirror:     mset.mirrorInfo(),
17,736✔
2015
                Sources:    mset.sourcesInfo(),
17,736✔
2016
                Alternates: js.streamAlternates(ci, config.Name),
17,736✔
2017
                TimeStamp:  time.Now().UTC(),
17,736✔
2018
        }
17,736✔
2019
        if clusterWideConsCount > 0 {
18,229✔
2020
                resp.StreamInfo.State.Consumers = clusterWideConsCount
493✔
2021
        }
493✔
2022

2023
        // Check if they have asked for subject details.
2024
        if subjects != _EMPTY_ {
17,834✔
2025
                st := mset.store.SubjectsTotals(subjects)
98✔
2026
                if lst := len(st); lst > 0 {
172✔
2027
                        // Common for both cases.
74✔
2028
                        resp.Offset = offset
74✔
2029
                        resp.Limit = JSMaxSubjectDetails
74✔
2030
                        resp.Total = lst
74✔
2031

74✔
2032
                        if offset == 0 && lst <= JSMaxSubjectDetails {
148✔
2033
                                resp.StreamInfo.State.Subjects = st
74✔
2034
                        } else {
74✔
2035
                                // Here we have to filter list due to offset or maximum constraints.
×
2036
                                subjs := make([]string, 0, len(st))
×
2037
                                for subj := range st {
×
2038
                                        subjs = append(subjs, subj)
×
2039
                                }
×
2040
                                // Sort it
2041
                                slices.Sort(subjs)
×
2042

×
2043
                                if offset > len(subjs) {
×
2044
                                        offset = len(subjs)
×
2045
                                }
×
2046

2047
                                end := offset + JSMaxSubjectDetails
×
2048
                                if end > len(subjs) {
×
2049
                                        end = len(subjs)
×
2050
                                }
×
2051
                                actualSize := end - offset
×
2052
                                var sd map[string]uint64
×
2053

×
2054
                                if actualSize > 0 {
×
2055
                                        sd = make(map[string]uint64, actualSize)
×
2056
                                        for _, ss := range subjs[offset:end] {
×
2057
                                                sd[ss] = st[ss]
×
2058
                                        }
×
2059
                                }
2060
                                resp.StreamInfo.State.Subjects = sd
×
2061
                        }
2062
                }
2063
        }
2064
        // Check for out of band catchups.
2065
        if mset.hasCatchupPeers() {
17,736✔
2066
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2067
        }
×
2068

2069
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
17,736✔
2070
}
2071

2072
// Request to have a stream leader stepdown.
2073
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
279✔
2074
        if c == nil || !s.JetStreamEnabled() {
279✔
2075
                return
×
2076
        }
×
2077
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
279✔
2078
        if err != nil {
279✔
2079
                s.Warnf(badAPIRequestT, msg)
×
2080
                return
×
2081
        }
×
2082

2083
        // Have extra token for this one.
2084
        name := tokenAt(subject, 6)
279✔
2085

279✔
2086
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
279✔
2087

279✔
2088
        // If we are not in clustered mode this is a failed request.
279✔
2089
        if !s.JetStreamIsClustered() {
280✔
2090
                resp.Error = NewJSClusterRequiredError()
1✔
2091
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2092
                return
1✔
2093
        }
1✔
2094

2095
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2096
        js, cc := s.getJetStreamCluster()
278✔
2097
        if js == nil || cc == nil {
278✔
2098
                return
×
2099
        }
×
2100
        if js.isLeaderless() {
278✔
2101
                resp.Error = NewJSClusterNotAvailError()
×
2102
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2103
                return
×
2104
        }
×
2105

2106
        js.mu.RLock()
278✔
2107
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, name)
278✔
2108
        js.mu.RUnlock()
278✔
2109

278✔
2110
        if isLeader && sa == nil {
278✔
2111
                resp.Error = NewJSStreamNotFoundError()
×
2112
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2113
                return
×
2114
        } else if sa == nil {
279✔
2115
                return
1✔
2116
        }
1✔
2117

2118
        if errorOnRequiredApiLevel(hdr) {
277✔
2119
                resp.Error = NewJSRequiredApiLevelError()
×
2120
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2121
                return
×
2122
        }
×
2123

2124
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
277✔
2125
                if doErr {
×
2126
                        resp.Error = NewJSNotEnabledForAccountError()
×
2127
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2128
                }
×
2129
                return
×
2130
        }
2131

2132
        // Check to see if we are a member of the group and if the group has no leader.
2133
        if js.isGroupLeaderless(sa.Group) {
277✔
2134
                resp.Error = NewJSClusterNotAvailError()
×
2135
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2136
                return
×
2137
        }
×
2138

2139
        // We have the stream assigned and a leader, so only the stream leader should answer.
2140
        if !acc.JetStreamIsStreamLeader(name) {
490✔
2141
                return
213✔
2142
        }
213✔
2143

2144
        mset, err := acc.lookupStream(name)
64✔
2145
        if err != nil {
64✔
2146
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2147
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2148
                return
×
2149
        }
×
2150

2151
        if mset == nil {
64✔
2152
                resp.Success = true
×
2153
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2154
                return
×
2155
        }
×
2156

2157
        node := mset.raftNode()
64✔
2158
        if node == nil {
64✔
2159
                resp.Success = true
×
2160
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2161
                return
×
2162
        }
×
2163

2164
        var preferredLeader string
64✔
2165
        if isJSONObjectOrArray(msg) {
77✔
2166
                var req JSApiLeaderStepdownRequest
13✔
2167
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2168
                        resp.Error = NewJSInvalidJSONError(err)
×
2169
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2170
                        return
×
2171
                }
×
2172
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
18✔
2173
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2174
                        return
5✔
2175
                }
5✔
2176
        }
2177

2178
        // Call actual stepdown.
2179
        err = node.StepDown(preferredLeader)
59✔
2180
        if err != nil {
59✔
2181
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2182
        } else {
59✔
2183
                resp.Success = true
59✔
2184
        }
59✔
2185
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
59✔
2186
}
2187

2188
// Request to have a consumer leader stepdown.
2189
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
227✔
2190
        if c == nil || !s.JetStreamEnabled() {
227✔
2191
                return
×
2192
        }
×
2193
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
227✔
2194
        if err != nil {
227✔
2195
                s.Warnf(badAPIRequestT, msg)
×
2196
                return
×
2197
        }
×
2198

2199
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
227✔
2200

227✔
2201
        // If we are not in clustered mode this is a failed request.
227✔
2202
        if !s.JetStreamIsClustered() {
228✔
2203
                resp.Error = NewJSClusterRequiredError()
1✔
2204
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2205
                return
1✔
2206
        }
1✔
2207

2208
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2209
        js, cc := s.getJetStreamCluster()
226✔
2210
        if js == nil || cc == nil {
226✔
2211
                return
×
2212
        }
×
2213
        if js.isLeaderless() {
226✔
2214
                resp.Error = NewJSClusterNotAvailError()
×
2215
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2216
                return
×
2217
        }
×
2218

2219
        // Have extra token for this one.
2220
        stream := tokenAt(subject, 6)
226✔
2221
        consumer := tokenAt(subject, 7)
226✔
2222

226✔
2223
        js.mu.RLock()
226✔
2224
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
226✔
2225
        js.mu.RUnlock()
226✔
2226

226✔
2227
        if isLeader && sa == nil {
226✔
2228
                resp.Error = NewJSStreamNotFoundError()
×
2229
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2230
                return
×
2231
        } else if sa == nil {
226✔
2232
                return
×
2233
        }
×
2234

2235
        if errorOnRequiredApiLevel(hdr) {
226✔
2236
                resp.Error = NewJSRequiredApiLevelError()
×
2237
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2238
                return
×
2239
        }
×
2240

2241
        var ca *consumerAssignment
226✔
2242
        if sa.consumers != nil {
452✔
2243
                ca = sa.consumers[consumer]
226✔
2244
        }
226✔
2245
        if ca == nil {
226✔
2246
                resp.Error = NewJSConsumerNotFoundError()
×
2247
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2248
                return
×
2249
        }
×
2250
        // Check to see if we are a member of the group and if the group has no leader.
2251
        if js.isGroupLeaderless(ca.Group) {
226✔
2252
                resp.Error = NewJSClusterNotAvailError()
×
2253
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2254
                return
×
2255
        }
×
2256

2257
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
389✔
2258
                return
163✔
2259
        }
163✔
2260

2261
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
63✔
2262
                if doErr {
×
2263
                        resp.Error = NewJSNotEnabledForAccountError()
×
2264
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2265
                }
×
2266
                return
×
2267
        }
2268

2269
        mset, err := acc.lookupStream(stream)
63✔
2270
        if err != nil {
63✔
2271
                resp.Error = NewJSStreamNotFoundError()
×
2272
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2273
                return
×
2274
        }
×
2275
        o := mset.lookupConsumer(consumer)
63✔
2276
        if o == nil {
63✔
2277
                resp.Error = NewJSConsumerNotFoundError()
×
2278
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2279
                return
×
2280
        }
×
2281

2282
        n := o.raftNode()
63✔
2283
        if n == nil {
63✔
2284
                resp.Success = true
×
2285
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2286
                return
×
2287
        }
×
2288

2289
        var preferredLeader string
63✔
2290
        if isJSONObjectOrArray(msg) {
76✔
2291
                var req JSApiLeaderStepdownRequest
13✔
2292
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2293
                        resp.Error = NewJSInvalidJSONError(err)
×
2294
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2295
                        return
×
2296
                }
×
2297
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
18✔
2298
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2299
                        return
5✔
2300
                }
5✔
2301
        }
2302

2303
        // Call actual stepdown.
2304
        err = n.StepDown(preferredLeader)
58✔
2305
        if err != nil {
58✔
2306
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2307
        } else {
58✔
2308
                resp.Success = true
58✔
2309
        }
58✔
2310
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
58✔
2311
}
2312

2313
// Request to remove a peer from a clustered stream.
2314
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
75✔
2315
        if c == nil || !s.JetStreamEnabled() {
75✔
2316
                return
×
2317
        }
×
2318
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
75✔
2319
        if err != nil {
75✔
2320
                s.Warnf(badAPIRequestT, msg)
×
2321
                return
×
2322
        }
×
2323

2324
        // Have extra token for this one.
2325
        name := tokenAt(subject, 6)
75✔
2326

75✔
2327
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
75✔
2328

75✔
2329
        // If we are not in clustered mode this is a failed request.
75✔
2330
        if !s.JetStreamIsClustered() {
76✔
2331
                resp.Error = NewJSClusterRequiredError()
1✔
2332
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2333
                return
1✔
2334
        }
1✔
2335

2336
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2337
        js, cc := s.getJetStreamCluster()
74✔
2338
        if js == nil || cc == nil {
74✔
2339
                return
×
2340
        }
×
2341
        if js.isLeaderless() {
74✔
2342
                resp.Error = NewJSClusterNotAvailError()
×
2343
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2344
                return
×
2345
        }
×
2346

2347
        js.mu.RLock()
74✔
2348
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, name)
74✔
2349
        js.mu.RUnlock()
74✔
2350

74✔
2351
        // Make sure we are meta leader.
74✔
2352
        if !isLeader {
136✔
2353
                return
62✔
2354
        }
62✔
2355

2356
        if errorOnRequiredApiLevel(hdr) {
12✔
2357
                resp.Error = NewJSRequiredApiLevelError()
×
2358
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2359
                return
×
2360
        }
×
2361

2362
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12✔
2363
                if doErr {
×
2364
                        resp.Error = NewJSNotEnabledForAccountError()
×
2365
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2366
                }
×
2367
                return
×
2368
        }
2369
        if isEmptyRequest(msg) {
12✔
2370
                resp.Error = NewJSBadRequestError()
×
2371
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2372
                return
×
2373
        }
×
2374

2375
        var req JSApiStreamRemovePeerRequest
12✔
2376
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2377
                resp.Error = NewJSInvalidJSONError(err)
×
2378
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2379
                return
×
2380
        }
×
2381
        if req.Peer == _EMPTY_ {
12✔
2382
                resp.Error = NewJSBadRequestError()
×
2383
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2384
                return
×
2385
        }
×
2386

2387
        if sa == nil {
12✔
2388
                // No stream present.
×
2389
                resp.Error = NewJSStreamNotFoundError()
×
2390
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2391
                return
×
2392
        }
×
2393

2394
        js.mu.RLock()
12✔
2395
        rg := sa.Group
12✔
2396

12✔
2397
        // Check to see if we are a member of the group.
12✔
2398
        // Peer here is either a peer ID or a server name, convert to node name.
12✔
2399
        nodeName := getHash(req.Peer)
12✔
2400
        isMember := rg.isMember(nodeName)
12✔
2401
        if !isMember {
14✔
2402
                nodeName = req.Peer
2✔
2403
                isMember = rg.isMember(nodeName)
2✔
2404
        }
2✔
2405
        js.mu.RUnlock()
12✔
2406

12✔
2407
        // Make sure we are a member.
12✔
2408
        if !isMember {
13✔
2409
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2410
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2411
                return
1✔
2412
        }
1✔
2413

2414
        // If we are here we have a valid peer member set for removal.
2415
        if !js.removePeerFromStream(sa, nodeName) {
13✔
2416
                resp.Error = NewJSPeerRemapError()
2✔
2417
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2418
                return
2✔
2419
        }
2✔
2420

2421
        resp.Success = true
9✔
2422
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
2423
}
2424

2425
// Request to have the metaleader remove a peer from the system.
2426
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
13✔
2427
        if c == nil || !s.JetStreamEnabled() {
13✔
2428
                return
×
2429
        }
×
2430

2431
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
13✔
2432
        if err != nil {
13✔
2433
                s.Warnf(badAPIRequestT, msg)
×
2434
                return
×
2435
        }
×
2436
        if acc != s.SystemAccount() {
13✔
2437
                return
×
2438
        }
×
2439

2440
        js, cc := s.getJetStreamCluster()
13✔
2441
        if js == nil || cc == nil {
13✔
2442
                return
×
2443
        }
×
2444

2445
        js.mu.RLock()
13✔
2446
        isLeader := cc.isLeader()
13✔
2447
        meta := cc.meta
13✔
2448
        js.mu.RUnlock()
13✔
2449

13✔
2450
        // Extra checks here but only leader is listening.
13✔
2451
        if !isLeader {
13✔
2452
                return
×
2453
        }
×
2454

2455
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
13✔
2456
        if errorOnRequiredApiLevel(hdr) {
13✔
2457
                resp.Error = NewJSRequiredApiLevelError()
×
2458
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2459
                return
×
2460
        }
×
2461

2462
        if isEmptyRequest(msg) {
13✔
2463
                resp.Error = NewJSBadRequestError()
×
2464
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2465
                return
×
2466
        }
×
2467

2468
        var req JSApiMetaServerRemoveRequest
13✔
2469
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2470
                resp.Error = NewJSInvalidJSONError(err)
×
2471
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2472
                return
×
2473
        }
×
2474

2475
        js.mu.Lock()
13✔
2476
        defer js.mu.Unlock()
13✔
2477

13✔
2478
        // Another peer-remove is already in progress, don't allow multiple concurrent changes.
13✔
2479
        if cc.peerRemoveReply != nil {
15✔
2480
                resp.Error = NewJSClusterServerMemberChangeInflightError()
2✔
2481
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2482
                return
2✔
2483
        }
2✔
2484

2485
        var found string
11✔
2486
        for _, p := range meta.Peers() {
41✔
2487
                // If Peer is specified, it takes precedence
30✔
2488
                if req.Peer != _EMPTY_ {
35✔
2489
                        if p.ID == req.Peer {
6✔
2490
                                found = req.Peer
1✔
2491
                                break
1✔
2492
                        }
2493
                        continue
4✔
2494
                }
2495
                si, ok := s.nodeToInfo.Load(p.ID)
25✔
2496
                if ok && si.(nodeInfo).name == req.Server {
31✔
2497
                        found = p.ID
6✔
2498
                        break
6✔
2499
                }
2500
        }
2501

2502
        if found == _EMPTY_ {
15✔
2503
                resp.Error = NewJSClusterServerNotMemberError()
4✔
2504
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2505
                return
4✔
2506
        }
4✔
2507

2508
        if err := meta.ProposeRemovePeer(found); err != nil {
7✔
2509
                if err == errMembershipChange {
×
2510
                        resp.Error = NewJSClusterServerMemberChangeInflightError()
×
2511
                } else {
×
2512
                        resp.Error = NewJSRaftGeneralError(err)
×
2513
                }
×
2514
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2515
                return
×
2516
        }
2517

2518
        if cc.peerRemoveReply == nil {
14✔
2519
                cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
7✔
2520
        }
7✔
2521
        // Only copy the request, the subject and reply are already copied.
2522
        cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
7✔
2523
}
2524

2525
func (s *Server) peerSetToNames(ps []string) []string {
172✔
2526
        names := make([]string, len(ps))
172✔
2527
        for i := 0; i < len(ps); i++ {
640✔
2528
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
468✔
2529
                        names[i] = ps[i]
×
2530
                } else {
468✔
2531
                        names[i] = si.(nodeInfo).name
468✔
2532
                }
468✔
2533
        }
2534
        return names
172✔
2535
}
2536

2537
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2538
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2539
        js.mu.RLock()
29✔
2540
        defer js.mu.RUnlock()
29✔
2541
        if cc := js.cluster; cc != nil {
58✔
2542
                for _, p := range cc.meta.Peers() {
145✔
2543
                        si, ok := s.nodeToInfo.Load(p.ID)
116✔
2544
                        if ok && si.(nodeInfo).name == serverName {
145✔
2545
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2546
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2547
                                                return p.ID
29✔
2548
                                        }
29✔
2549
                                }
2550
                        }
2551
                }
2552
        }
2553
        return _EMPTY_
×
2554
}
2555

2556
// Request to have the metaleader move a stream on a peer to another
2557
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2558
        if c == nil || !s.JetStreamEnabled() {
33✔
2559
                return
×
2560
        }
×
2561

2562
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2563
        if err != nil {
33✔
2564
                s.Warnf(badAPIRequestT, msg)
×
2565
                return
×
2566
        }
×
2567

2568
        js, cc := s.getJetStreamCluster()
33✔
2569
        if js == nil || cc == nil {
33✔
2570
                return
×
2571
        }
×
2572

2573
        // Extra checks here but only leader is listening.
2574
        js.mu.RLock()
33✔
2575
        isLeader := cc.isLeader()
33✔
2576
        js.mu.RUnlock()
33✔
2577

33✔
2578
        if !isLeader {
33✔
2579
                return
×
2580
        }
×
2581

2582
        accName := tokenAt(subject, 6)
33✔
2583
        streamName := tokenAt(subject, 7)
33✔
2584

33✔
2585
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2586
                return
×
2587
        }
×
2588

2589
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2590
        if errorOnRequiredApiLevel(hdr) {
33✔
2591
                resp.Error = NewJSRequiredApiLevelError()
×
2592
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2593
                return
×
2594
        }
×
2595

2596
        var req JSApiMetaServerStreamMoveRequest
33✔
2597
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2598
                resp.Error = NewJSInvalidJSONError(err)
×
2599
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2600
                return
×
2601
        }
×
2602

2603
        srcPeer := _EMPTY_
33✔
2604
        if req.Server != _EMPTY_ {
62✔
2605
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2606
        }
29✔
2607

2608
        targetAcc, ok := s.accounts.Load(accName)
33✔
2609
        if !ok {
33✔
2610
                resp.Error = NewJSNoAccountError()
×
2611
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2612
                return
×
2613
        }
×
2614

2615
        var streamFound bool
33✔
2616
        cfg := StreamConfig{}
33✔
2617
        currPeers := []string{}
33✔
2618
        currCluster := _EMPTY_
33✔
2619
        js.mu.Lock()
33✔
2620
        streams, ok := cc.streams[accName]
33✔
2621
        if ok {
66✔
2622
                sa, ok := streams[streamName]
33✔
2623
                if ok {
66✔
2624
                        cfg = *sa.Config.clone()
33✔
2625
                        streamFound = true
33✔
2626
                        currPeers = sa.Group.Peers
33✔
2627
                        currCluster = sa.Group.Cluster
33✔
2628
                }
33✔
2629
        }
2630
        js.mu.Unlock()
33✔
2631

33✔
2632
        if !streamFound {
33✔
2633
                resp.Error = NewJSStreamNotFoundError()
×
2634
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2635
                return
×
2636
        }
×
2637

2638
        // if server was picked, make sure src peer exists and move it to first position.
2639
        // removal will drop peers from the left
2640
        if req.Server != _EMPTY_ {
62✔
2641
                if srcPeer == _EMPTY_ {
29✔
2642
                        resp.Error = NewJSClusterServerNotMemberError()
×
2643
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2644
                        return
×
2645
                }
×
2646
                var peerFound bool
29✔
2647
                for i := 0; i < len(currPeers); i++ {
84✔
2648
                        if currPeers[i] == srcPeer {
84✔
2649
                                copy(currPeers[1:], currPeers[:i])
29✔
2650
                                currPeers[0] = srcPeer
29✔
2651
                                peerFound = true
29✔
2652
                                break
29✔
2653
                        }
2654
                }
2655
                if !peerFound {
29✔
2656
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2657
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2658
                        return
×
2659
                }
×
2660
        }
2661

2662
        // make sure client is scoped to requested account
2663
        ciNew := *(ci)
33✔
2664
        ciNew.Account = accName
33✔
2665

33✔
2666
        // backup placement such that peers can be looked up with modified tag list
33✔
2667
        var origPlacement *Placement
33✔
2668
        if cfg.Placement != nil {
33✔
2669
                tmp := *cfg.Placement
×
2670
                origPlacement = &tmp
×
2671
        }
×
2672

2673
        if len(req.Tags) > 0 {
60✔
2674
                if cfg.Placement == nil {
54✔
2675
                        cfg.Placement = &Placement{}
27✔
2676
                }
27✔
2677
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2678
        }
2679

2680
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2681
        if len(peers) <= cfg.Replicas {
35✔
2682
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2683
                peers = nil
2✔
2684

2✔
2685
                clusters := map[string]struct{}{}
2✔
2686
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2687
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2688
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2689
                        }
8✔
2690
                        return true
16✔
2691
                })
2692
                errs := &selectPeerError{}
2✔
2693
                errs.accumulate(e)
2✔
2694
                for cluster := range clusters {
4✔
2695
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2696
                        if len(newPeers) >= cfg.Replicas {
4✔
2697
                                peers = append([]string{}, currPeers...)
2✔
2698
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2699
                                break
2✔
2700
                        }
2701
                        errs.accumulate(e)
×
2702
                }
2703
                if peers == nil {
2✔
2704
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2705
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2706
                        return
×
2707
                }
×
2708
        }
2709

2710
        cfg.Placement = origPlacement
33✔
2711

33✔
2712
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2713
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2714

33✔
2715
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2716
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2717
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2718
}
2719

2720
// Request to have the metaleader move a stream on a peer to another
2721
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2722
        if c == nil || !s.JetStreamEnabled() {
4✔
2723
                return
×
2724
        }
×
2725

2726
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
4✔
2727
        if err != nil {
4✔
2728
                s.Warnf(badAPIRequestT, msg)
×
2729
                return
×
2730
        }
×
2731

2732
        js, cc := s.getJetStreamCluster()
4✔
2733
        if js == nil || cc == nil {
4✔
2734
                return
×
2735
        }
×
2736

2737
        // Extra checks here but only leader is listening.
2738
        js.mu.RLock()
4✔
2739
        isLeader := cc.isLeader()
4✔
2740
        js.mu.RUnlock()
4✔
2741

4✔
2742
        if !isLeader {
4✔
2743
                return
×
2744
        }
×
2745

2746
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2747
        if errorOnRequiredApiLevel(hdr) {
4✔
2748
                resp.Error = NewJSRequiredApiLevelError()
×
2749
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2750
                return
×
2751
        }
×
2752

2753
        accName := tokenAt(subject, 6)
4✔
2754
        streamName := tokenAt(subject, 7)
4✔
2755

4✔
2756
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2757
                return
×
2758
        }
×
2759

2760
        targetAcc, ok := s.accounts.Load(accName)
4✔
2761
        if !ok {
4✔
2762
                resp.Error = NewJSNoAccountError()
×
2763
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2764
                return
×
2765
        }
×
2766

2767
        streamFound := false
4✔
2768
        cfg := StreamConfig{}
4✔
2769
        currPeers := []string{}
4✔
2770
        js.mu.Lock()
4✔
2771
        streams, ok := cc.streams[accName]
4✔
2772
        if ok {
8✔
2773
                sa, ok := streams[streamName]
4✔
2774
                if ok {
8✔
2775
                        cfg = *sa.Config.clone()
4✔
2776
                        streamFound = true
4✔
2777
                        currPeers = sa.Group.Peers
4✔
2778
                }
4✔
2779
        }
2780
        js.mu.Unlock()
4✔
2781

4✔
2782
        if !streamFound {
4✔
2783
                resp.Error = NewJSStreamNotFoundError()
×
2784
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2785
                return
×
2786
        }
×
2787

2788
        if len(currPeers) <= cfg.Replicas {
4✔
2789
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2790
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2791
                return
×
2792
        }
×
2793

2794
        // make sure client is scoped to requested account
2795
        ciNew := *(ci)
4✔
2796
        ciNew.Account = accName
4✔
2797

4✔
2798
        peers := currPeers[:cfg.Replicas]
4✔
2799

4✔
2800
        // Remove placement in case tags don't match
4✔
2801
        // This can happen if the move was initiated by modifying the tags.
4✔
2802
        // This is an account operation.
4✔
2803
        // This can NOT happen when the move was initiated by the system account.
4✔
2804
        // There move honors the original tag list.
4✔
2805
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2806
        FOR_TAGCHECK:
1✔
2807
                for _, peer := range peers {
2✔
2808
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2809
                        if !ok {
1✔
2810
                                // can't verify tags, do the safe thing and error
×
2811
                                resp.Error = NewJSStreamGeneralError(
×
2812
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2813
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2814
                                return
×
2815
                        }
×
2816
                        nodeTags := si.(nodeInfo).tags
1✔
2817
                        for _, tag := range cfg.Placement.Tags {
2✔
2818
                                if !nodeTags.Contains(tag) {
2✔
2819
                                        // clear placement as tags don't match
1✔
2820
                                        cfg.Placement = nil
1✔
2821
                                        break FOR_TAGCHECK
1✔
2822
                                }
2823
                        }
2824

2825
                }
2826
        }
2827

2828
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2829
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2830

4✔
2831
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2832
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2833
}
2834

2835
// Request to have an account purged
2836
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7✔
2837
        if c == nil || !s.JetStreamEnabled() {
7✔
2838
                return
×
2839
        }
×
2840

2841
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7✔
2842
        if err != nil {
7✔
2843
                s.Warnf(badAPIRequestT, msg)
×
2844
                return
×
2845
        }
×
2846
        if acc != s.SystemAccount() {
7✔
2847
                return
×
2848
        }
×
2849

2850
        js := s.getJetStream()
7✔
2851
        if js == nil {
7✔
2852
                return
×
2853
        }
×
2854

2855
        accName := tokenAt(subject, 5)
7✔
2856

7✔
2857
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
7✔
2858

7✔
2859
        // Check for path like separators in the name.
7✔
2860
        if strings.ContainsAny(accName, `\/`) {
8✔
2861
                resp.Error = NewJSStreamGeneralError(errors.New("account name can not contain path separators"))
1✔
2862
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2863
                return
1✔
2864
        }
1✔
2865

2866
        if !s.JetStreamIsClustered() {
8✔
2867
                var streams []*stream
2✔
2868
                var ac *Account
2✔
2869
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2870
                        streams = ac.streams()
1✔
2871
                }
1✔
2872

2873
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2874
                        accName, len(streams), ac != nil)
2✔
2875

2✔
2876
                for _, mset := range streams {
3✔
2877
                        err := mset.delete()
1✔
2878
                        if err != nil {
1✔
2879
                                resp.Error = NewJSStreamDeleteError(err)
×
2880
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2881
                                return
×
2882
                        }
×
2883
                }
2884
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2885
                        resp.Error = NewJSStreamGeneralError(err)
×
2886
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2887
                        return
×
2888
                }
×
2889
                resp.Initiated = true
2✔
2890
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2891
                return
2✔
2892
        }
2893

2894
        _, cc := s.getJetStreamCluster()
4✔
2895

4✔
2896
        js.mu.RLock()
4✔
2897
        isLeader := cc.isLeader()
4✔
2898
        meta := cc.meta
4✔
2899
        js.mu.RUnlock()
4✔
2900

4✔
2901
        if !isLeader {
4✔
2902
                return
×
2903
        }
×
2904

2905
        if errorOnRequiredApiLevel(hdr) {
4✔
2906
                resp.Error = NewJSRequiredApiLevelError()
×
2907
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2908
                return
×
2909
        }
×
2910

2911
        if js.isMetaRecovering() {
4✔
2912
                // While in recovery mode, the data structures are not fully initialized
×
2913
                resp.Error = NewJSClusterNotAvailError()
×
2914
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2915
                return
×
2916
        }
×
2917

2918
        js.mu.Lock()
4✔
2919
        ns, nc := 0, 0
4✔
2920
        for osa := range js.streamAssignmentsOrInflightSeq(accName) {
12✔
2921
                for oca := range js.consumerAssignmentsOrInflightSeq(accName, osa.Config.Name) {
20✔
2922
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
12✔
2923
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2924
                        cc.trackInflightConsumerProposal(accName, osa.Config.Name, ca, true)
12✔
2925
                        nc++
12✔
2926
                }
12✔
2927
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
8✔
2928
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2929
                cc.trackInflightStreamProposal(accName, sa, true)
8✔
2930
                ns++
8✔
2931
        }
2932
        js.mu.Unlock()
4✔
2933

4✔
2934
        hasAccount := ns > 0
4✔
2935
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2936

4✔
2937
        resp.Initiated = true
4✔
2938
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2939
}
2940

2941
// Request to have the meta leader stepdown.
2942
// These will only be received by the meta leader, so less checking needed.
2943
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
23✔
2944
        if c == nil || !s.JetStreamEnabled() {
23✔
2945
                return
×
2946
        }
×
2947

2948
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
23✔
2949
        if err != nil {
23✔
2950
                s.Warnf(badAPIRequestT, msg)
×
2951
                return
×
2952
        }
×
2953

2954
        // This should only be coming from the System Account.
2955
        if acc != s.SystemAccount() {
24✔
2956
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2957
                return
1✔
2958
        }
1✔
2959

2960
        js, cc := s.getJetStreamCluster()
22✔
2961
        if js == nil || cc == nil {
22✔
2962
                return
×
2963
        }
×
2964

2965
        // Extra checks here but only leader is listening.
2966
        js.mu.RLock()
22✔
2967
        isLeader := cc.isLeader()
22✔
2968
        meta := cc.meta
22✔
2969
        js.mu.RUnlock()
22✔
2970

22✔
2971
        if !isLeader {
22✔
2972
                return
×
2973
        }
×
2974

2975
        var preferredLeader string
22✔
2976
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
22✔
2977
        if errorOnRequiredApiLevel(hdr) {
22✔
2978
                resp.Error = NewJSRequiredApiLevelError()
×
2979
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2980
                return
×
2981
        }
×
2982

2983
        if isJSONObjectOrArray(msg) {
36✔
2984
                var req JSApiLeaderStepdownRequest
14✔
2985
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
2986
                        resp.Error = NewJSInvalidJSONError(err)
×
2987
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2988
                        return
×
2989
                }
×
2990
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
20✔
2991
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2992
                        return
6✔
2993
                }
6✔
2994
        }
2995

2996
        // Call actual stepdown.
2997
        err = meta.StepDown(preferredLeader)
16✔
2998
        if err != nil {
16✔
2999
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
3000
        } else {
16✔
3001
                resp.Success = true
16✔
3002
        }
16✔
3003
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16✔
3004
}
3005

3006
// isJSONObjectOrArray reports whether the first byte of req that is not JSON
// whitespace (space, tab, newline, carriage return) is '{' or '['.
//
// Valid JSON can also be a bare string, number or literal, but callers only
// care about objects and arrays. No further validation is performed: this is
// merely a hint as to whether parsing the request as JSON should be attempted.
// Empty or whitespace-only input yields false.
func isJSONObjectOrArray(req []byte) bool {
	for _, b := range req {
		switch b {
		case ' ', '\t', '\n', '\r':
			// Leading whitespace, keep scanning.
			continue
		case '{', '[':
			return true
		default:
			return false
		}
	}
	// Nothing but whitespace (or nothing at all).
	return false
}
3024

3025
// isEmptyRequest reports whether req carries no request payload: either it
// has zero length, or it decodes as a JSON object with no members (for
// example "{}" or "{ }"). Anything that is not valid JSON, or is valid JSON
// but not an object (array, string, number, null, ...), is not empty.
func isEmptyRequest(req []byte) bool {
	// Fast paths for the two most common spellings of "nothing".
	if len(req) == 0 || string(req) == "{}" {
		return true
	}
	// No simple match, but it could still be an empty object with whitespace.
	var decoded any
	if json.Unmarshal(req, &decoded) != nil {
		return false
	}
	obj, isObject := decoded.(map[string]any)
	return isObject && len(obj) == 0
}
3043

3044
// getStepDownPreferredPlacement attempts to work out what the best placement is
3045
// for a stepdown request. The preferred server name always takes precedence, but
3046
// if not specified, the placement will be used to filter by cluster. The caller
3047
// should check for return API errors and return those to the requestor if needed.
3048
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
40✔
3049
        if placement == nil {
42✔
3050
                return _EMPTY_, nil
2✔
3051
        }
2✔
3052
        var preferredLeader string
38✔
3053
        if placement.Preferred != _EMPTY_ {
56✔
3054
                for _, p := range group.Peers() {
70✔
3055
                        si, ok := s.nodeToInfo.Load(p.ID)
52✔
3056
                        if !ok || si == nil {
52✔
3057
                                continue
×
3058
                        }
3059
                        if si.(nodeInfo).name == placement.Preferred {
66✔
3060
                                preferredLeader = p.ID
14✔
3061
                                break
14✔
3062
                        }
3063
                }
3064
                if preferredLeader == group.ID() {
22✔
3065
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3066
                }
4✔
3067
                if preferredLeader == _EMPTY_ {
18✔
3068
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3069
                }
4✔
3070
        } else {
20✔
3071
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3072
                ourID := group.ID()
20✔
3073
                for _, p := range group.Peers() {
116✔
3074
                        if p == nil {
96✔
3075
                                continue // ... shouldn't happen.
×
3076
                        }
3077
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3078
                        if !ok || si == nil {
96✔
3079
                                continue
×
3080
                        }
3081
                        ni := si.(nodeInfo)
96✔
3082
                        if ni.offline || p.ID == ourID {
116✔
3083
                                continue
20✔
3084
                        }
3085
                        possiblePeers[p] = ni
76✔
3086
                }
3087
                // If cluster is specified, filter out anything not matching the cluster name.
3088
                if placement.Cluster != _EMPTY_ {
31✔
3089
                        for p, si := range possiblePeers {
51✔
3090
                                if si.cluster != placement.Cluster {
66✔
3091
                                        delete(possiblePeers, p)
26✔
3092
                                }
26✔
3093
                        }
3094
                }
3095
                // If tags are specified, filter out anything not matching all supplied tags.
3096
                if len(placement.Tags) > 0 {
32✔
3097
                        for p, si := range possiblePeers {
55✔
3098
                                matchesAll := true
43✔
3099
                                for _, tag := range placement.Tags {
93✔
3100
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3101
                                                break
32✔
3102
                                        }
3103
                                }
3104
                                if !matchesAll {
75✔
3105
                                        delete(possiblePeers, p)
32✔
3106
                                }
32✔
3107
                        }
3108
                }
3109
                // If there are no possible peers, return an error.
3110
                if len(possiblePeers) == 0 {
28✔
3111
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3112
                }
8✔
3113
                // Take advantage of random map iteration order to select the preferred.
3114
                for p := range possiblePeers {
24✔
3115
                        preferredLeader = p.ID
12✔
3116
                        break
12✔
3117
                }
3118
        }
3119
        return preferredLeader, nil
22✔
3120
}
3121

3122
// Request to delete a stream.
3123
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
571✔
3124
        if c == nil || !s.JetStreamEnabled() {
571✔
3125
                return
×
3126
        }
×
3127
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
571✔
3128
        if err != nil {
571✔
3129
                s.Warnf(badAPIRequestT, msg)
×
3130
                return
×
3131
        }
×
3132

3133
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
571✔
3134

571✔
3135
        // Determine if we should proceed here when we are in clustered mode.
571✔
3136
        if s.JetStreamIsClustered() {
1,093✔
3137
                js, cc := s.getJetStreamCluster()
522✔
3138
                if js == nil || cc == nil {
522✔
3139
                        return
×
3140
                }
×
3141
                if js.isLeaderless() {
523✔
3142
                        resp.Error = NewJSClusterNotAvailError()
1✔
3143
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3144
                        return
1✔
3145
                }
1✔
3146
                // Make sure we are meta leader.
3147
                if !s.JetStreamIsLeader() {
925✔
3148
                        return
404✔
3149
                }
404✔
3150
        }
3151

3152
        if errorOnRequiredApiLevel(hdr) {
167✔
3153
                resp.Error = NewJSRequiredApiLevelError()
1✔
3154
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3155
                return
1✔
3156
        }
1✔
3157

3158
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
165✔
3159
                if doErr {
×
3160
                        resp.Error = NewJSNotEnabledForAccountError()
×
3161
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3162
                }
×
3163
                return
×
3164
        }
3165

3166
        if !isEmptyRequest(msg) {
166✔
3167
                resp.Error = NewJSNotEmptyRequestError()
1✔
3168
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3169
                return
1✔
3170
        }
1✔
3171
        stream := streamNameFromSubject(subject)
164✔
3172

164✔
3173
        // Clustered.
164✔
3174
        if s.JetStreamIsClustered() {
281✔
3175
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
117✔
3176
                return
117✔
3177
        }
117✔
3178

3179
        mset, err := acc.lookupStream(stream)
47✔
3180
        if err != nil {
52✔
3181
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3182
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3183
                return
5✔
3184
        }
5✔
3185

3186
        if err := mset.delete(); err != nil {
42✔
3187
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3188
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3189
                return
×
3190
        }
×
3191
        resp.Success = true
42✔
3192
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
42✔
3193
}
3194

3195
// Request to delete a message.
3196
// This expects a stream sequence number as the msg body.
3197
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,174✔
3198
        if c == nil || !s.JetStreamEnabled() {
1,183✔
3199
                return
9✔
3200
        }
9✔
3201
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,165✔
3202
        if err != nil {
1,165✔
3203
                s.Warnf(badAPIRequestT, msg)
×
3204
                return
×
3205
        }
×
3206

3207
        stream := tokenAt(subject, 6)
1,165✔
3208

1,165✔
3209
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
1,165✔
3210

1,165✔
3211
        // If we are in clustered mode we need to be the stream leader to proceed.
1,165✔
3212
        if s.JetStreamIsClustered() {
1,566✔
3213
                // Check to make sure the stream is assigned.
401✔
3214
                js, cc := s.getJetStreamCluster()
401✔
3215
                if js == nil || cc == nil {
408✔
3216
                        return
7✔
3217
                }
7✔
3218
                if js.isLeaderless() {
395✔
3219
                        resp.Error = NewJSClusterNotAvailError()
1✔
3220
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3221
                        return
1✔
3222
                }
1✔
3223

3224
                js.mu.RLock()
393✔
3225
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
393✔
3226
                js.mu.RUnlock()
393✔
3227

393✔
3228
                if isLeader && sa == nil {
393✔
3229
                        // We can't find the stream, so mimic what would be the errors below.
×
3230
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3231
                                if doErr {
×
3232
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3233
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3234
                                }
×
3235
                                return
×
3236
                        }
3237
                        // No stream present.
3238
                        resp.Error = NewJSStreamNotFoundError()
×
3239
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3240
                        return
×
3241
                } else if sa == nil {
393✔
3242
                        return
×
3243
                }
×
3244

3245
                // Check to see if we are a member of the group and if the group has no leader.
3246
                if js.isGroupLeaderless(sa.Group) {
393✔
3247
                        resp.Error = NewJSClusterNotAvailError()
×
3248
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3249
                        return
×
3250
                }
×
3251

3252
                // We have the stream assigned and a leader, so only the stream leader should answer.
3253
                if !acc.JetStreamIsStreamLeader(stream) {
650✔
3254
                        return
257✔
3255
                }
257✔
3256
        }
3257

3258
        if errorOnRequiredApiLevel(hdr) {
901✔
3259
                resp.Error = NewJSRequiredApiLevelError()
1✔
3260
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3261
                return
1✔
3262
        }
1✔
3263

3264
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,052✔
3265
                if doErr {
304✔
3266
                        resp.Error = NewJSNotEnabledForAccountError()
151✔
3267
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
151✔
3268
                }
151✔
3269
                return
153✔
3270
        }
3271
        if isEmptyRequest(msg) {
746✔
3272
                resp.Error = NewJSBadRequestError()
×
3273
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3274
                return
×
3275
        }
×
3276
        var req JSApiMsgDeleteRequest
746✔
3277
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
746✔
3278
                resp.Error = NewJSInvalidJSONError(err)
×
3279
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3280
                return
×
3281
        }
×
3282

3283
        mset, err := acc.lookupStream(stream)
746✔
3284
        if err != nil {
748✔
3285
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3286
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3287
                return
2✔
3288
        }
2✔
3289
        if mset.cfg.Sealed {
746✔
3290
                resp.Error = NewJSStreamSealedError()
2✔
3291
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3292
                return
2✔
3293
        }
2✔
3294
        if mset.cfg.DenyDelete {
743✔
3295
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3296
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3297
                return
1✔
3298
        }
1✔
3299

3300
        if s.JetStreamIsClustered() {
875✔
3301
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
134✔
3302
                return
134✔
3303
        }
134✔
3304

3305
        var removed bool
607✔
3306
        if req.NoErase {
1,212✔
3307
                removed, err = mset.removeMsg(req.Seq)
605✔
3308
        } else {
607✔
3309
                removed, err = mset.eraseMsg(req.Seq)
2✔
3310
        }
2✔
3311
        if err != nil {
607✔
3312
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3313
        } else if !removed {
607✔
3314
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3315
        } else {
607✔
3316
                resp.Success = true
607✔
3317
        }
607✔
3318
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
607✔
3319
}
3320

3321
// Request to get a raw stream message.
3322
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
3,118✔
3323
        if c == nil || !s.JetStreamEnabled() {
3,118✔
3324
                return
×
3325
        }
×
3326
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
3,118✔
3327
        if err != nil {
3,118✔
3328
                s.Warnf(badAPIRequestT, msg)
×
3329
                return
×
3330
        }
×
3331

3332
        stream := tokenAt(subject, 6)
3,118✔
3333

3,118✔
3334
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
3,118✔
3335

3,118✔
3336
        // If we are in clustered mode we need to be the stream leader to proceed.
3,118✔
3337
        if s.JetStreamIsClustered() {
5,034✔
3338
                // Check to make sure the stream is assigned.
1,916✔
3339
                js, cc := s.getJetStreamCluster()
1,916✔
3340
                if js == nil || cc == nil {
1,916✔
3341
                        return
×
3342
                }
×
3343
                if js.isLeaderless() {
1,916✔
3344
                        resp.Error = NewJSClusterNotAvailError()
×
3345
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3346
                        return
×
3347
                }
×
3348

3349
                js.mu.RLock()
1,916✔
3350
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
1,916✔
3351
                js.mu.RUnlock()
1,916✔
3352

1,916✔
3353
                if isLeader && sa == nil {
1,916✔
3354
                        // We can't find the stream, so mimic what would be the errors below.
×
3355
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3356
                                if doErr {
×
3357
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3358
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3359
                                }
×
3360
                                return
×
3361
                        }
3362
                        // No stream present.
3363
                        resp.Error = NewJSStreamNotFoundError()
×
3364
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3365
                        return
×
3366
                } else if sa == nil {
1,916✔
3367
                        return
×
3368
                }
×
3369

3370
                // Check to see if we are a member of the group and if the group has no leader.
3371
                if js.isGroupLeaderless(sa.Group) {
1,916✔
3372
                        resp.Error = NewJSClusterNotAvailError()
×
3373
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3374
                        return
×
3375
                }
×
3376

3377
                // We have the stream assigned and a leader, so only the stream leader should answer.
3378
                if !acc.JetStreamIsStreamLeader(stream) {
3,201✔
3379
                        return
1,285✔
3380
                }
1,285✔
3381
        }
3382

3383
        if errorOnRequiredApiLevel(hdr) {
1,834✔
3384
                resp.Error = NewJSRequiredApiLevelError()
1✔
3385
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3386
                return
1✔
3387
        }
1✔
3388

3389
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,835✔
3390
                if doErr {
3✔
3391
                        resp.Error = NewJSNotEnabledForAccountError()
×
3392
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3393
                }
×
3394
                return
3✔
3395
        }
3396
        if isEmptyRequest(msg) {
1,829✔
3397
                resp.Error = NewJSBadRequestError()
×
3398
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3399
                return
×
3400
        }
×
3401
        var req JSApiMsgGetRequest
1,829✔
3402
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,829✔
3403
                resp.Error = NewJSInvalidJSONError(err)
×
3404
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3405
                return
×
3406
        }
×
3407

3408
        // This version does not support batch.
3409
        if req.Batch > 0 || req.MaxBytes > 0 {
1,830✔
3410
                resp.Error = NewJSBadRequestError()
1✔
3411
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3412
                return
1✔
3413
        }
1✔
3414

3415
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3416
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3417
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,828✔
3418
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,828✔
3419
                (req.Seq > 0 && req.StartTime != nil) ||
1,828✔
3420
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,828✔
3421
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,832✔
3422
                resp.Error = NewJSBadRequestError()
4✔
3423
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3424
                return
4✔
3425
        }
4✔
3426

3427
        mset, err := acc.lookupStream(stream)
1,824✔
3428
        if err != nil {
1,824✔
3429
                resp.Error = NewJSStreamNotFoundError()
×
3430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3431
                return
×
3432
        }
×
3433
        if mset.offlineReason != _EMPTY_ {
1,824✔
3434
                // Just let the request time out.
×
3435
                return
×
3436
        }
×
3437

3438
        var svp StoreMsg
1,824✔
3439
        var sm *StoreMsg
1,824✔
3440

1,824✔
3441
        // Ensure this read request is isolated and doesn't interleave with writes.
1,824✔
3442
        mset.mu.RLock()
1,824✔
3443
        defer mset.mu.RUnlock()
1,824✔
3444

1,824✔
3445
        // If AsOfTime is set, perform this first to get the sequence.
1,824✔
3446
        var seq uint64
1,824✔
3447
        if req.StartTime != nil {
1,830✔
3448
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3449
        } else {
1,824✔
3450
                seq = req.Seq
1,818✔
3451
        }
1,818✔
3452

3453
        if seq > 0 && req.NextFor == _EMPTY_ {
2,421✔
3454
                sm, err = mset.store.LoadMsg(seq, &svp)
597✔
3455
        } else if req.NextFor != _EMPTY_ {
1,926✔
3456
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3457
        } else {
1,227✔
3458
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
1,125✔
3459
        }
1,125✔
3460
        if err != nil {
2,785✔
3461
                resp.Error = NewJSNoMessageFoundError()
961✔
3462
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
961✔
3463
                return
961✔
3464
        }
961✔
3465
        resp.Message = &StoredMsg{
863✔
3466
                Subject:  sm.subj,
863✔
3467
                Sequence: sm.seq,
863✔
3468
                Data:     sm.msg,
863✔
3469
                Time:     time.Unix(0, sm.ts).UTC(),
863✔
3470
        }
863✔
3471
        if !req.NoHeaders {
1,725✔
3472
                resp.Message.Header = sm.hdr
862✔
3473
        }
862✔
3474

3475
        // Don't send response through API layer for this call.
3476
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
863✔
3477
}
3478

3479
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3480
        if c == nil || !s.JetStreamEnabled() {
28✔
3481
                return
×
3482
        }
×
3483

3484
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3485
        if err != nil {
28✔
3486
                s.Warnf(badAPIRequestT, msg)
×
3487
                return
×
3488
        }
×
3489

3490
        stream := streamNameFromSubject(subject)
28✔
3491
        consumer := consumerNameFromSubject(subject)
28✔
3492

28✔
3493
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3494

28✔
3495
        if s.JetStreamIsClustered() {
46✔
3496
                // Check to make sure the stream is assigned.
18✔
3497
                js, cc := s.getJetStreamCluster()
18✔
3498
                if js == nil || cc == nil {
18✔
3499
                        return
×
3500
                }
×
3501

3502
                // First check if the stream and consumer is there.
3503
                js.mu.RLock()
18✔
3504
                sa := js.streamAssignment(acc.Name, stream)
18✔
3505
                if sa == nil {
21✔
3506
                        js.mu.RUnlock()
3✔
3507
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3508
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3509
                        return
3✔
3510
                }
3✔
3511
                if sa.unsupported != nil {
15✔
3512
                        js.mu.RUnlock()
×
3513
                        // Just let the request time out.
×
3514
                        return
×
3515
                }
×
3516

3517
                ca, ok := sa.consumers[consumer]
15✔
3518
                if !ok || ca == nil {
18✔
3519
                        js.mu.RUnlock()
3✔
3520
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3521
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3522
                        return
3✔
3523
                }
3✔
3524
                if ca.unsupported != nil {
12✔
3525
                        js.mu.RUnlock()
×
3526
                        // Just let the request time out.
×
3527
                        return
×
3528
                }
×
3529
                js.mu.RUnlock()
12✔
3530

12✔
3531
                // Then check if we are the leader.
12✔
3532
                mset, err := acc.lookupStream(stream)
12✔
3533
                if err != nil {
12✔
3534
                        return
×
3535
                }
×
3536

3537
                o := mset.lookupConsumer(consumer)
12✔
3538
                if o == nil {
12✔
3539
                        return
×
3540
                }
×
3541
                if !o.isLeader() {
20✔
3542
                        return
8✔
3543
                }
8✔
3544
        }
3545

3546
        if errorOnRequiredApiLevel(hdr) {
15✔
3547
                resp.Error = NewJSRequiredApiLevelError()
1✔
3548
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3549
                return
1✔
3550
        }
1✔
3551

3552
        var req JSApiConsumerUnpinRequest
13✔
3553
        if err := json.Unmarshal(msg, &req); err != nil {
13✔
3554
                resp.Error = NewJSInvalidJSONError(err)
×
3555
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3556
                return
×
3557
        }
×
3558

3559
        if req.Group == _EMPTY_ {
15✔
3560
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
2✔
3561
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3562
                return
2✔
3563
        }
2✔
3564

3565
        if !validGroupName.MatchString(req.Group) {
13✔
3566
                resp.Error = NewJSConsumerInvalidGroupNameError()
2✔
3567
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3568
                return
2✔
3569
        }
2✔
3570

3571
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3572
                if doErr {
×
3573
                        resp.Error = NewJSNotEnabledForAccountError()
×
3574
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3575
                }
×
3576
                return
×
3577
        }
3578

3579
        mset, err := acc.lookupStream(stream)
9✔
3580
        if err != nil {
10✔
3581
                resp.Error = NewJSStreamNotFoundError()
1✔
3582
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3583
                return
1✔
3584
        }
1✔
3585
        if mset.offlineReason != _EMPTY_ {
8✔
3586
                // Just let the request time out.
×
3587
                return
×
3588
        }
×
3589
        o := mset.lookupConsumer(consumer)
8✔
3590
        if o == nil {
9✔
3591
                resp.Error = NewJSConsumerNotFoundError()
1✔
3592
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3593
                return
1✔
3594
        }
1✔
3595
        if o.offlineReason != _EMPTY_ {
7✔
3596
                // Just let the request time out.
×
3597
                return
×
3598
        }
×
3599

3600
        var foundPriority bool
7✔
3601
        for _, group := range o.config().PriorityGroups {
14✔
3602
                if group == req.Group {
12✔
3603
                        foundPriority = true
5✔
3604
                        break
5✔
3605
                }
3606
        }
3607
        if !foundPriority {
9✔
3608
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3609
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3610
                return
2✔
3611
        }
2✔
3612

3613
        o.mu.Lock()
5✔
3614
        o.unassignPinId()
5✔
3615
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3616
        o.mu.Unlock()
5✔
3617
        o.signalNewMessages()
5✔
3618
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3619
}
3620

3621
// Request to purge a stream.
3622
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
147✔
3623
        if c == nil || !s.JetStreamEnabled() {
147✔
3624
                return
×
3625
        }
×
3626
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
147✔
3627
        if err != nil {
147✔
3628
                s.Warnf(badAPIRequestT, msg)
×
3629
                return
×
3630
        }
×
3631

3632
        stream := streamNameFromSubject(subject)
147✔
3633

147✔
3634
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
147✔
3635

147✔
3636
        // If we are in clustered mode we need to be the stream leader to proceed.
147✔
3637
        if s.JetStreamIsClustered() {
257✔
3638
                // Check to make sure the stream is assigned.
110✔
3639
                js, cc := s.getJetStreamCluster()
110✔
3640
                if js == nil || cc == nil {
110✔
3641
                        return
×
3642
                }
×
3643

3644
                js.mu.RLock()
110✔
3645
                isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, stream)
110✔
3646
                js.mu.RUnlock()
110✔
3647

110✔
3648
                if isLeader && sa == nil {
110✔
3649
                        // We can't find the stream, so mimic what would be the errors below.
×
3650
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3651
                                if doErr {
×
3652
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3653
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3654
                                }
×
3655
                                return
×
3656
                        }
3657
                        // No stream present.
3658
                        resp.Error = NewJSStreamNotFoundError()
×
3659
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3660
                        return
×
3661
                } else if sa == nil {
111✔
3662
                        if js.isLeaderless() {
1✔
3663
                                resp.Error = NewJSClusterNotAvailError()
×
3664
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3665
                        }
×
3666
                        return
1✔
3667
                }
3668

3669
                // Check to see if we are a member of the group and if the group has no leader.
3670
                if js.isGroupLeaderless(sa.Group) {
109✔
3671
                        resp.Error = NewJSClusterNotAvailError()
×
3672
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3673
                        return
×
3674
                }
×
3675

3676
                // We have the stream assigned and a leader, so only the stream leader should answer.
3677
                if !acc.JetStreamIsStreamLeader(stream) {
183✔
3678
                        if js.isLeaderless() {
75✔
3679
                                resp.Error = NewJSClusterNotAvailError()
1✔
3680
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3681
                        }
1✔
3682
                        return
74✔
3683
                }
3684
        }
3685

3686
        if errorOnRequiredApiLevel(hdr) {
73✔
3687
                resp.Error = NewJSRequiredApiLevelError()
1✔
3688
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3689
                return
1✔
3690
        }
1✔
3691

3692
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
71✔
3693
                if doErr {
×
3694
                        resp.Error = NewJSNotEnabledForAccountError()
×
3695
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3696
                }
×
3697
                return
×
3698
        }
3699

3700
        var purgeRequest *JSApiStreamPurgeRequest
71✔
3701
        if isJSONObjectOrArray(msg) {
105✔
3702
                var req JSApiStreamPurgeRequest
34✔
3703
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3704
                        resp.Error = NewJSInvalidJSONError(err)
×
3705
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3706
                        return
×
3707
                }
×
3708
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3709
                        resp.Error = NewJSBadRequestError()
×
3710
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3711
                        return
×
3712
                }
×
3713
                purgeRequest = &req
34✔
3714
        }
3715

3716
        mset, err := acc.lookupStream(stream)
71✔
3717
        if err != nil {
71✔
3718
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3719
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3720
                return
×
3721
        }
×
3722
        if mset.cfg.Sealed {
73✔
3723
                resp.Error = NewJSStreamSealedError()
2✔
3724
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3725
                return
2✔
3726
        }
2✔
3727
        if mset.cfg.DenyPurge {
70✔
3728
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3729
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3730
                return
1✔
3731
        }
1✔
3732

3733
        if s.JetStreamIsClustered() {
101✔
3734
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
33✔
3735
                return
33✔
3736
        }
33✔
3737

3738
        purged, err := mset.purge(purgeRequest)
35✔
3739
        if err != nil {
35✔
3740
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3741
        } else {
35✔
3742
                resp.Purged = purged
35✔
3743
                resp.Success = true
35✔
3744
        }
35✔
3745
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
35✔
3746
}
3747

3748
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,535✔
3749
        var replicas int
1,535✔
3750
        if cfg != nil {
3,070✔
3751
                replicas = cfg.Replicas
1,535✔
3752
        }
1,535✔
3753
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,535✔
3754
        if apiErr != nil {
1,535✔
3755
                return apiErr
×
3756
        }
×
3757
        jsa.js.mu.RLock()
1,535✔
3758
        defer jsa.js.mu.RUnlock()
1,535✔
3759
        jsa.mu.RLock()
1,535✔
3760
        defer jsa.mu.RUnlock()
1,535✔
3761
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,538✔
3762
                return NewJSMaximumStreamsLimitError()
3✔
3763
        }
3✔
3764
        reserved := jsa.tieredReservation(tier, cfg)
1,532✔
3765
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,539✔
3766
                return NewJSStreamLimitsError(err, Unless(err))
7✔
3767
        }
7✔
3768
        return nil
1,525✔
3769
}
3770

3771
// Request to restore a stream.
3772
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
58✔
3773
        if c == nil || !s.JetStreamIsLeader() {
86✔
3774
                return
28✔
3775
        }
28✔
3776
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
30✔
3777
        if err != nil {
30✔
3778
                s.Warnf(badAPIRequestT, msg)
×
3779
                return
×
3780
        }
×
3781

3782
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
30✔
3783
        if errorOnRequiredApiLevel(hdr) {
31✔
3784
                resp.Error = NewJSRequiredApiLevelError()
1✔
3785
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3786
                return
1✔
3787
        }
1✔
3788
        if !acc.JetStreamEnabled() {
29✔
3789
                resp.Error = NewJSNotEnabledForAccountError()
×
3790
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3791
                return
×
3792
        }
×
3793
        if isEmptyRequest(msg) {
30✔
3794
                resp.Error = NewJSBadRequestError()
1✔
3795
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3796
                return
1✔
3797
        }
1✔
3798

3799
        var req JSApiStreamRestoreRequest
28✔
3800
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
28✔
3801
                resp.Error = NewJSInvalidJSONError(err)
×
3802
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3803
                return
×
3804
        }
×
3805

3806
        stream := streamNameFromSubject(subject)
28✔
3807

28✔
3808
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
30✔
3809
                req.Config.Name = stream
2✔
3810
        }
2✔
3811
        if stream != req.Config.Name {
30✔
3812
                resp.Error = NewJSStreamMismatchError()
2✔
3813
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3814
                return
2✔
3815
        }
2✔
3816

3817
        // Check for path like separators in the name.
3818
        if strings.ContainsAny(stream, `\/`) {
26✔
3819
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
×
3820
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3821
                return
×
3822
        }
×
3823

3824
        if s.JetStreamIsClustered() {
39✔
3825
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
13✔
3826
                return
13✔
3827
        }
13✔
3828

3829
        // check stream config at the start of the restore process, not at the end
3830
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
13✔
3831
        if apiErr != nil {
14✔
3832
                resp.Error = apiErr
1✔
3833
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3834
                return
1✔
3835
        }
1✔
3836

3837
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
14✔
3838
                resp.Error = err
2✔
3839
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3840
                return
2✔
3841
        }
2✔
3842

3843
        if _, err := acc.lookupStream(stream); err == nil {
12✔
3844
                resp.Error = NewJSStreamNameExistRestoreFailedError()
2✔
3845
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3846
                return
2✔
3847
        }
2✔
3848

3849
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3850
                if doErr {
×
3851
                        resp.Error = NewJSNotEnabledForAccountError()
×
3852
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3853
                }
×
3854
                return
×
3855
        }
3856

3857
        s.processStreamRestore(ci, acc, &cfg, subject, reply, string(msg))
8✔
3858
}
3859

3860
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3861
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3862

16✔
3863
        streamName := cfg.Name
16✔
3864
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3865

16✔
3866
        start := time.Now().UTC()
16✔
3867
        domain := s.getOpts().JetStreamDomain
16✔
3868
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3869
                TypedEvent: TypedEvent{
16✔
3870
                        Type: JSRestoreCreateAdvisoryType,
16✔
3871
                        ID:   nuid.Next(),
16✔
3872
                        Time: start,
16✔
3873
                },
16✔
3874
                Stream: streamName,
16✔
3875
                Client: ci.forAdvisory(),
16✔
3876
                Domain: domain,
16✔
3877
        })
16✔
3878

16✔
3879
        // Create our internal subscription to accept the snapshot.
16✔
3880
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3881

16✔
3882
        type result struct {
16✔
3883
                err   error
16✔
3884
                reply string
16✔
3885
        }
16✔
3886

16✔
3887
        // For signaling to upper layers.
16✔
3888
        var resultOnce sync.Once
16✔
3889
        var closeOnce sync.Once
16✔
3890
        resultCh := make(chan result, 1)
16✔
3891
        pr, pw := io.Pipe()
16✔
3892

16✔
3893
        setResult := func(err error, reply string) {
32✔
3894
                resultOnce.Do(func() {
32✔
3895
                        resultCh <- result{err, reply}
16✔
3896
                })
16✔
3897
        }
3898
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName))
16✔
3899
        restoreCh := make(chan struct {
16✔
3900
                mset *stream
16✔
3901
                err  error
16✔
3902
        }, 1)
16✔
3903
        closeWithError := func(err error) {
48✔
3904
                closeOnce.Do(func() {
48✔
3905
                        if err != nil {
17✔
3906
                                pw.CloseWithError(err)
1✔
3907
                        } else {
16✔
3908
                                pw.Close()
15✔
3909
                        }
15✔
3910
                })
3911
        }
3912

3913
        s.startGoRoutine(func() {
32✔
3914
                defer s.grWG.Done()
16✔
3915
                mset, err := acc.RestoreStream(cfg, pr)
16✔
3916
                if err != nil {
20✔
3917
                        pr.CloseWithError(err)
4✔
3918
                } else {
16✔
3919
                        pr.Close()
12✔
3920
                }
12✔
3921
                restoreCh <- struct {
16✔
3922
                        mset *stream
16✔
3923
                        err  error
16✔
3924
                }{
16✔
3925
                        mset: mset,
16✔
3926
                        err:  err,
16✔
3927
                }
16✔
3928
        })
3929

3930
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
125✔
3931
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
109✔
3932
                if reply == _EMPTY_ {
110✔
3933
                        sub.client.processUnsub(sub.sid)
1✔
3934
                        setResult(fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName), reply)
1✔
3935
                        return
1✔
3936
                }
1✔
3937
                // Account client messages have \r\n on end. This is an error.
3938
                if len(msg) < LEN_CR_LF {
108✔
3939
                        sub.client.processUnsub(sub.sid)
×
3940
                        setResult(fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName), reply)
×
3941
                        return
×
3942
                }
×
3943
                // Adjust.
3944
                msg = msg[:len(msg)-LEN_CR_LF]
108✔
3945

108✔
3946
                // This means we are complete with our transfer from the client.
108✔
3947
                if len(msg) == 0 {
123✔
3948
                        s.Debugf("Finished streaming restore for stream '%s > %s'", acc.Name, streamName)
15✔
3949
                        closeWithError(nil)
15✔
3950
                        setResult(nil, reply)
15✔
3951
                        return
15✔
3952
                }
15✔
3953

3954
                // Signal activity before and after the blocking write.
3955
                // The pre-write signal refreshes the stall watchdog when the
3956
                // chunk arrives; the post-write signal refreshes it again once
3957
                // RestoreStream has consumed the data. This keeps the idle
3958
                // window between chunks anchored to the end of the previous
3959
                // write instead of its start.
3960
                activeQ.push(0)
93✔
3961

93✔
3962
                if _, err := pw.Write(msg); err != nil {
93✔
3963
                        closeWithError(err)
×
3964
                        sub.client.processUnsub(sub.sid)
×
3965
                        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
×
3966
                        if IsNatsErr(err, JSStorageResourcesExceededErr, JSMemoryResourcesExceededErr) {
×
3967
                                s.resourcesExceededError(cfg.Storage)
×
3968
                        }
×
3969
                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
×
3970
                        if s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp)) == nil {
×
3971
                                reply = _EMPTY_
×
3972
                        }
×
3973
                        setResult(err, reply)
×
3974
                        return
×
3975
                }
3976

3977
                activeQ.push(len(msg))
93✔
3978

93✔
3979
                s.sendInternalAccountMsg(acc, reply, nil)
93✔
3980
        }
3981

3982
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3983
        if err != nil {
16✔
3984
                closeWithError(err)
×
3985
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3986
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3987
                return nil
×
3988
        }
×
3989

3990
        // Mark the subject so the end user knows where to send the snapshot chunks.
3991
        resp.DeliverSubject = restoreSubj
16✔
3992
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3993

16✔
3994
        // Returned to the caller to wait for completion.
16✔
3995
        doneCh := make(chan error, 1)
16✔
3996

16✔
3997
        // Monitor the progress from another Go routine.
16✔
3998
        s.startGoRoutine(func() {
32✔
3999
                defer s.grWG.Done()
16✔
4000
                defer func() {
32✔
4001
                        closeWithError(ErrConnectionClosed)
16✔
4002
                        sub.client.processUnsub(sub.sid)
16✔
4003
                        activeQ.unregister()
16✔
4004
                }()
16✔
4005

4006
                const activityInterval = 5 * time.Second
16✔
4007
                notActive := time.NewTimer(activityInterval)
16✔
4008
                defer notActive.Stop()
16✔
4009

16✔
4010
                total := 0
16✔
4011
                var inputDone bool
16✔
4012
                var replySubj string
16✔
4013
                var inputErr error
16✔
4014
                var restoreDone bool
16✔
4015
                var restoreResult struct {
16✔
4016
                        mset *stream
16✔
4017
                        err  error
16✔
4018
                }
16✔
4019

16✔
4020
                finish := func(reply string, err error, mset *stream) {
32✔
4021
                        end := time.Now().UTC()
16✔
4022

16✔
4023
                        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
4024
                                TypedEvent: TypedEvent{
16✔
4025
                                        Type: JSRestoreCompleteAdvisoryType,
16✔
4026
                                        ID:   nuid.Next(),
16✔
4027
                                        Time: end,
16✔
4028
                                },
16✔
4029
                                Stream: streamName,
16✔
4030
                                Start:  start,
16✔
4031
                                End:    end,
16✔
4032
                                Bytes:  int64(total),
16✔
4033
                                Client: ci.forAdvisory(),
16✔
4034
                                Domain: domain,
16✔
4035
                        })
16✔
4036

16✔
4037
                        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
4038
                        if err != nil {
20✔
4039
                                if IsNatsErr(err, JSStorageResourcesExceededErr, JSMemoryResourcesExceededErr) {
4✔
4040
                                        s.resourcesExceededError(cfg.Storage)
×
4041
                                }
×
4042
                                resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
4043
                                s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
4044
                                        friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
4045
                        } else {
12✔
4046
                                msetCfg := mset.config()
12✔
4047
                                resp.StreamInfo = &StreamInfo{
12✔
4048
                                        Created:   mset.createdTime(),
12✔
4049
                                        State:     mset.state(),
12✔
4050
                                        Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
4051
                                        TimeStamp: time.Now().UTC(),
12✔
4052
                                }
12✔
4053
                                s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
4054
                                        friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
4055
                        }
12✔
4056
                        if reply != _EMPTY_ {
31✔
4057
                                s.sendInternalAccountMsg(acc, reply, s.jsonResponse(&resp))
15✔
4058
                        }
15✔
4059
                        doneCh <- err
16✔
4060
                }
4061

4062
                for {
234✔
4063
                        select {
218✔
4064
                        case result := <-resultCh:
16✔
4065
                                replySubj = result.reply
16✔
4066
                                inputDone = true
16✔
4067
                                inputErr = result.err
16✔
4068
                                notActive.Stop()
16✔
4069
                                if result.err != nil {
17✔
4070
                                        closeWithError(result.err)
1✔
4071
                                        s.Warnf(result.err.Error())
1✔
4072
                                }
1✔
4073
                                if restoreDone {
16✔
4074
                                        err := inputErr
×
4075
                                        if err == nil {
×
4076
                                                err = restoreResult.err
×
4077
                                        }
×
4078
                                        finish(replySubj, err, restoreResult.mset)
×
4079
                                        return
×
4080
                                }
4081
                        case rr := <-restoreCh:
16✔
4082
                                restoreDone = true
16✔
4083
                                restoreResult = rr
16✔
4084
                                if inputDone {
32✔
4085
                                        err := inputErr
16✔
4086
                                        if err == nil {
31✔
4087
                                                err = rr.err
15✔
4088
                                        }
15✔
4089
                                        finish(replySubj, err, rr.mset)
16✔
4090
                                        return
16✔
4091
                                }
4092
                        case <-activeQ.ch:
186✔
4093
                                if n, ok := activeQ.popOne(); ok {
372✔
4094
                                        total += n
186✔
4095
                                        if !inputDone {
372✔
4096
                                                notActive.Reset(activityInterval)
186✔
4097
                                        }
186✔
4098
                                }
4099
                        case <-notActive.C:
×
4100
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc.Name, streamName)
×
4101
                                closeWithError(err)
×
4102
                                doneCh <- err
×
4103
                                return
×
4104
                        }
4105
                }
4106
        })
4107

4108
        return doneCh
16✔
4109
}
4110

4111
// Process a snapshot request.
4112
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
4113
        if c == nil || !s.JetStreamEnabled() {
29✔
4114
                return
×
4115
        }
×
4116
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
4117
        if err != nil {
29✔
4118
                s.Warnf(badAPIRequestT, msg)
×
4119
                return
×
4120
        }
×
4121

4122
        smsg := string(msg)
29✔
4123
        stream := streamNameFromSubject(subject)
29✔
4124

29✔
4125
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
4126
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
4127
                return
15✔
4128
        }
15✔
4129

4130
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
4131
        if errorOnRequiredApiLevel(hdr) {
15✔
4132
                resp.Error = NewJSRequiredApiLevelError()
1✔
4133
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4134
                return
1✔
4135
        }
1✔
4136
        if !acc.JetStreamEnabled() {
13✔
4137
                resp.Error = NewJSNotEnabledForAccountError()
×
4138
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4139
                return
×
4140
        }
×
4141
        if isEmptyRequest(msg) {
14✔
4142
                resp.Error = NewJSBadRequestError()
1✔
4143
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4144
                return
1✔
4145
        }
1✔
4146

4147
        mset, err := acc.lookupStream(stream)
12✔
4148
        if err != nil {
13✔
4149
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4150
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4151
                return
1✔
4152
        }
1✔
4153

4154
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4155
                if doErr {
×
4156
                        resp.Error = NewJSNotEnabledForAccountError()
×
4157
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4158
                }
×
4159
                return
×
4160
        }
4161

4162
        var req JSApiStreamSnapshotRequest
11✔
4163
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4164
                resp.Error = NewJSInvalidJSONError(err)
×
4165
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4166
                return
×
4167
        }
×
4168
        if !IsValidSubject(req.DeliverSubject) {
12✔
4169
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4170
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4171
                return
1✔
4172
        }
1✔
4173

4174
        // We will do the snapshot in a go routine as well since check msgs may
4175
        // stall this go routine.
4176
        go func() {
20✔
4177
                if req.CheckMsgs {
12✔
4178
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4179
                } else {
10✔
4180
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4181
                }
8✔
4182

4183
                start := time.Now().UTC()
10✔
4184

10✔
4185
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4186
                if err != nil {
10✔
4187
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4188
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4189
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4190
                        return
×
4191
                }
×
4192

4193
                config := mset.config()
10✔
4194
                resp.State = &sr.State
10✔
4195
                resp.Config = &config
10✔
4196

10✔
4197
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4198

10✔
4199
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4200
                        TypedEvent: TypedEvent{
10✔
4201
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4202
                                ID:   nuid.Next(),
10✔
4203
                                Time: time.Now().UTC(),
10✔
4204
                        },
10✔
4205
                        Stream: mset.name(),
10✔
4206
                        State:  sr.State,
10✔
4207
                        Client: ci.forAdvisory(),
10✔
4208
                        Domain: s.getOpts().JetStreamDomain,
10✔
4209
                })
10✔
4210

10✔
4211
                // Now do the real streaming.
10✔
4212
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4213

10✔
4214
                end := time.Now().UTC()
10✔
4215

10✔
4216
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4217
                        TypedEvent: TypedEvent{
10✔
4218
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4219
                                ID:   nuid.Next(),
10✔
4220
                                Time: end,
10✔
4221
                        },
10✔
4222
                        Stream: mset.name(),
10✔
4223
                        Start:  start,
10✔
4224
                        End:    end,
10✔
4225
                        Client: ci.forAdvisory(),
10✔
4226
                        Domain: s.getOpts().JetStreamDomain,
10✔
4227
                })
10✔
4228

10✔
4229
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4230
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4231
                        mset.jsa.account.Name,
10✔
4232
                        mset.name(),
10✔
4233
                        end.Sub(start))
10✔
4234
        }()
4235
}
4236

4237
// Defaults applied by streamSnapshot when the request leaves a value unset.
const (
	// defaultSnapshotChunkSize is the chunk size used when none is requested (128KiB).
	defaultSnapshotChunkSize = 128 << 10
	// defaultSnapshotWindowSize is the flow-control window used when none is requested (8MiB).
	defaultSnapshotWindowSize = 8 << 20
	// defaultSnapshotAckTimeout is the initial value of snapshotAckTimeout.
	defaultSnapshotAckTimeout = 5 * time.Second
)

// snapshotAckTimeout is how long streamSnapshot waits for a free slot before
// giving up on the receiver. It is a variable rather than a constant, so it
// can be reassigned.
var snapshotAckTimeout = defaultSnapshotAckTimeout
4243

4244
// streamSnapshot will stream out our snapshot to the reply subject.
4245
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4246
        chunkSize, wndSize := req.ChunkSize, req.WindowSize
10✔
4247
        if chunkSize == 0 {
12✔
4248
                chunkSize = defaultSnapshotChunkSize
2✔
4249
        }
2✔
4250
        if wndSize == 0 {
20✔
4251
                wndSize = defaultSnapshotWindowSize
10✔
4252
        }
10✔
4253
        chunkSize = min(max(1024, chunkSize), 1024*1024) // Clamp within 1KiB to 1MiB
10✔
4254
        wndSize = min(max(1024, wndSize), 32*1024*1024)  // Clamp within 1KiB to 32MiB
10✔
4255
        wndSize = max(wndSize, chunkSize)                // Guarantee at least one chunk
10✔
4256
        maxInflight := wndSize / chunkSize               // Between 1 and 32,768
10✔
4257

10✔
4258
        // Setup for the chunk stream.
10✔
4259
        reply := req.DeliverSubject
10✔
4260
        r := sr.Reader
10✔
4261
        defer r.Close()
10✔
4262

10✔
4263
        // In case we run into an error, this allows subscription callbacks
10✔
4264
        // to not sit and block endlessly.
10✔
4265
        done := make(chan struct{})
10✔
4266
        defer close(done)
10✔
4267

10✔
4268
        // Check interest for the snapshot deliver subject.
10✔
4269
        inch := make(chan bool, 1)
10✔
4270
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4271
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4272
        hasInterest := <-inch
10✔
4273
        if !hasInterest {
15✔
4274
                // Allow 2 seconds or so for interest to show up.
5✔
4275
                select {
5✔
4276
                case <-inch:
4✔
4277
                case <-time.After(2 * time.Second):
1✔
4278
                }
4279
        }
4280

4281
        // One slot per chunk. Each chunk read takes a slot, each ack will
4282
        // replace it. Smooths out in-flight number of chunks.
4283
        slots := make(chan struct{}, maxInflight)
10✔
4284
        for range maxInflight {
65,674✔
4285
                slots <- struct{}{}
65,664✔
4286
        }
65,664✔
4287

4288
        // We will place sequence number and size of chunk sent in the reply.
4289
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4290
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
12✔
4291
                select {
2✔
4292
                case slots <- struct{}{}:
2✔
4293
                case <-done:
×
4294
                }
4295
        })
4296
        defer mset.unsubscribe(ackSub)
10✔
4297

10✔
4298
        var hdr []byte
10✔
4299
        chunk := make([]byte, chunkSize)
10✔
4300
        for index := 1; ; index++ {
56✔
4301
                select {
46✔
4302
                case <-slots:
46✔
4303
                        // A slot has become available.
4304
                case <-inch:
×
4305
                        // The receiver appears to have gone away.
×
4306
                        hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4307
                        goto done
×
4308
                case err := <-sr.errCh:
×
4309
                        // The snapshotting goroutine has failed for some reason.
×
4310
                        hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4311
                        goto done
×
4312
                case <-time.After(snapshotAckTimeout):
×
4313
                        // It's taking a very long time for the receiver to send us acks,
×
4314
                        // they have probably stalled or there is high loss on the link.
×
4315
                        hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4316
                        goto done
×
4317
                }
4318
                n, err := io.ReadFull(r, chunk)
46✔
4319
                chunk := chunk[:n]
46✔
4320
                if err != nil {
56✔
4321
                        if n > 0 {
20✔
4322
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
10✔
4323
                        }
10✔
4324
                        break
10✔
4325
                }
4326
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
36✔
4327
                if hdr == nil {
41✔
4328
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
5✔
4329
                }
5✔
4330
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
36✔
4331
        }
4332

4333
done:
4334
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4335
}
4336

4337
// ccReqType identifies the kind of consumer create request being handled.
type ccReqType uint8

// Consumer create request kinds. These are untyped constants numbered from
// zero; they are not declared with type ccReqType.
const (
	ccNew             = iota // 0
	ccLegacyEphemeral        // 1
	ccLegacyDurable          // 2
)
4345

4346
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4347
// filtered subjects can be at the tail end.
4348
// Assumes stream and consumer names are single tokens.
4349
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
11,413✔
4350
        if c == nil || !s.JetStreamEnabled() {
11,413✔
4351
                return
×
4352
        }
×
4353

4354
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,413✔
4355
        if err != nil {
11,414✔
4356
                s.Warnf(badAPIRequestT, msg)
1✔
4357
                return
1✔
4358
        }
1✔
4359

4360
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
11,412✔
4361

11,412✔
4362
        var req CreateConsumerRequest
11,412✔
4363
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11,413✔
4364
                resp.Error = NewJSInvalidJSONError(err)
1✔
4365
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4366
                return
1✔
4367
        }
1✔
4368

4369
        var js *jetStream
11,411✔
4370
        isClustered := s.JetStreamIsClustered()
11,411✔
4371

11,411✔
4372
        // Determine if we should proceed here when we are in clustered mode.
11,411✔
4373
        direct := req.Config.Direct
11,411✔
4374
        if isClustered {
21,723✔
4375
                if direct {
10,802✔
4376
                        // If it's just a direct consumer, check for stream leader.
490✔
4377
                        if !req.Config.Sourcing {
490✔
4378
                                // Check to see if we have this stream and are the stream leader.
×
4379
                                if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
×
4380
                                        return
×
4381
                                }
×
4382
                        } else {
490✔
4383
                                // Otherwise, we either need this to be answered by the stream or meta leader.
490✔
4384
                                var cc *jetStreamCluster
490✔
4385
                                js, cc = s.getJetStreamCluster()
490✔
4386
                                if js == nil || cc == nil {
490✔
4387
                                        return
×
4388
                                }
×
4389
                                js.mu.RLock()
490✔
4390
                                sa := js.streamAssignmentOrInflight(acc.Name, streamNameFromSubject(subject))
490✔
4391
                                if sa == nil {
496✔
4392
                                        js.mu.RUnlock()
6✔
4393
                                        return
6✔
4394
                                }
6✔
4395
                                // If the stream is WQ or Interest, we need the meta leader to answer.
4396
                                if sa.Config.Retention != LimitsPolicy {
544✔
4397
                                        direct = false
60✔
4398
                                }
60✔
4399
                                js.mu.RUnlock()
484✔
4400
                                if direct {
908✔
4401
                                        // Check to see if we have this stream and are the stream leader.
424✔
4402
                                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
745✔
4403
                                                return
321✔
4404
                                        }
321✔
4405
                                } else {
60✔
4406
                                        if js.isLeaderless() {
60✔
4407
                                                resp.Error = NewJSClusterNotAvailError()
×
4408
                                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4409
                                                return
×
4410
                                        }
×
4411
                                        // Make sure we are meta leader.
4412
                                        if !s.JetStreamIsLeader() {
100✔
4413
                                                return
40✔
4414
                                        }
40✔
4415
                                }
4416
                        }
4417
                } else {
9,822✔
4418
                        var cc *jetStreamCluster
9,822✔
4419
                        js, cc = s.getJetStreamCluster()
9,822✔
4420
                        if js == nil || cc == nil {
9,822✔
4421
                                return
×
4422
                        }
×
4423
                        if js.isLeaderless() {
9,822✔
4424
                                resp.Error = NewJSClusterNotAvailError()
×
4425
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4426
                                return
×
4427
                        }
×
4428
                        // Make sure we are meta leader.
4429
                        if !s.JetStreamIsLeader() {
16,457✔
4430
                                return
6,635✔
4431
                        }
6,635✔
4432
                }
4433
        }
4434

4435
        if errorOnRequiredApiLevel(hdr) {
4,412✔
4436
                resp.Error = NewJSRequiredApiLevelError()
3✔
4437
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4438
                return
3✔
4439
        }
3✔
4440

4441
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,408✔
4442
                if doErr {
2✔
4443
                        resp.Error = NewJSNotEnabledForAccountError()
×
4444
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4445
                }
×
4446
                return
2✔
4447
        }
4448

4449
        var streamName, consumerName, filteredSubject string
4,404✔
4450
        var rt ccReqType
4,404✔
4451

4,404✔
4452
        if n := numTokens(subject); n < 5 {
4,404✔
4453
                s.Warnf(badAPIRequestT, msg)
×
4454
                return
×
4455
        } else if n == 5 {
5,265✔
4456
                // Legacy ephemeral.
861✔
4457
                rt = ccLegacyEphemeral
861✔
4458
                streamName = streamNameFromSubject(subject)
861✔
4459
                consumerName = req.Config.Name
861✔
4460
        } else {
4,404✔
4461
                // New style and durable legacy.
3,543✔
4462
                if tokenAt(subject, 4) == "DURABLE" {
3,842✔
4463
                        rt = ccLegacyDurable
299✔
4464
                        if n != 7 {
299✔
4465
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4466
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4467
                                return
×
4468
                        }
×
4469
                        streamName = tokenAt(subject, 6)
299✔
4470
                        consumerName = tokenAt(subject, 7)
299✔
4471
                } else {
3,244✔
4472
                        streamName = streamNameFromSubject(subject)
3,244✔
4473
                        consumerName = consumerNameFromSubject(subject)
3,244✔
4474
                        // New has optional filtered subject as part of main subject..
3,244✔
4475
                        if n > 6 {
5,912✔
4476
                                tokens := strings.Split(subject, tsep)
2,668✔
4477
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,668✔
4478
                        }
2,668✔
4479
                }
4480
        }
4481

4482
        if streamName != req.Stream {
4,405✔
4483
                resp.Error = NewJSStreamMismatchError()
1✔
4484
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4485
                return
1✔
4486
        }
1✔
4487

4488
        if consumerName != _EMPTY_ {
8,357✔
4489
                // Check for path like separators in the name.
3,954✔
4490
                if strings.ContainsAny(consumerName, `\/`) {
3,958✔
4491
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4492
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4493
                        return
4✔
4494
                }
4✔
4495
        }
4496

4497
        // Should we expect a durable name
4498
        if rt == ccLegacyDurable {
4,698✔
4499
                if numTokens(subject) < 7 {
299✔
4500
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4501
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4502
                        return
×
4503
                }
×
4504
                // Now check on requirements for durable request.
4505
                if req.Config.Durable == _EMPTY_ {
300✔
4506
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4507
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4508
                        return
1✔
4509
                }
1✔
4510
                if consumerName != req.Config.Durable {
298✔
4511
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4512
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4513
                        return
×
4514
                }
×
4515
                // Durable, so we need to honor the name.
4516
                req.Config.Name = consumerName
298✔
4517
        }
4518
        // If new style and durable set make sure they match.
4519
        if rt == ccNew {
7,638✔
4520
                if req.Config.Durable != _EMPTY_ {
5,963✔
4521
                        if consumerName != req.Config.Durable {
2,723✔
4522
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4523
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4524
                                return
×
4525
                        }
×
4526
                }
4527
                // New style ephemeral so we need to honor the name.
4528
                req.Config.Name = consumerName
3,240✔
4529
        }
4530
        // Check for legacy ephemeral mis-configuration.
4531
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,401✔
4532
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4533
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4534
                return
3✔
4535
        }
3✔
4536

4537
        // in case of multiple filters provided, error if new API is used.
4538
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,396✔
4539
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4540
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4541
                return
1✔
4542
        }
1✔
4543

4544
        // Check for a filter subject.
4545
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,396✔
4546
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4547
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4548
                return
2✔
4549
        }
2✔
4550

4551
        if isClustered && !direct {
7,597✔
4552
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,205✔
4553
                return
3,205✔
4554
        }
3,205✔
4555

4556
        // If we are here we are single server mode.
4557
        if req.Config.Replicas > 1 {
1,187✔
4558
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4559
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4560
                return
×
4561
        }
×
4562

4563
        stream, err := acc.lookupStream(req.Stream)
1,187✔
4564
        if err != nil {
1,191✔
4565
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4566
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4567
                return
4✔
4568
        }
4✔
4569
        if stream.offlineReason != _EMPTY_ {
1,183✔
4570
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4571
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4572
                return
×
4573
        }
×
4574

4575
        // If the consumer is a direct sourcing consumer, we need to "upgrade"
4576
        // it to be durable without AckNone if not a Limits-based stream.
4577
        if req.Config.Direct && req.Config.Sourcing && req.Config.Name != _EMPTY_ {
1,403✔
4578
                if !isClustered && stream.isInterestRetention() {
238✔
4579
                        req.Config.Direct = false
18✔
4580
                        req.Config.Durable = req.Config.Name
18✔
4581
                        req.Config.AckPolicy = AckFlowControl
18✔
4582
                        req.Config.AckWait = 0
18✔
4583
                        req.Config.MaxDeliver = 0
18✔
4584
                        req.Config.InactiveThreshold = 0
18✔
4585
                } else {
220✔
4586
                        // Otherwise, need to append a randomized suffix since the source uses a stable name.
202✔
4587
                        req.Config.Name = fmt.Sprintf("%s-%s", req.Config.Name, createConsumerName())
202✔
4588
                        consumerName = req.Config.Name
202✔
4589
                }
202✔
4590
        }
4591

4592
        if o := stream.lookupConsumer(consumerName); o != nil {
1,238✔
4593
                if o.offlineReason != _EMPTY_ {
55✔
4594
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4595
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4596
                        return
×
4597
                }
×
4598
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4599
                // it back to whatever the current configured value is.
4600
                o.mu.RLock()
55✔
4601
                req.Config.PauseUntil = o.cfg.PauseUntil
55✔
4602
                // If a durable sourcing consumer is used, we need to reset the deliver policy.
55✔
4603
                if req.Config.Sourcing && req.Config.Durable != _EMPTY_ {
55✔
4604
                        req.Config.DeliverPolicy = o.cfg.DeliverPolicy
×
4605
                        req.Config.OptStartSeq = o.cfg.OptStartSeq
×
4606
                        req.Config.OptStartTime = o.cfg.OptStartTime
×
4607
                }
×
4608
                o.mu.RUnlock()
55✔
4609
        }
4610

4611
        // Initialize/update asset version metadata.
4612
        setStaticConsumerMetadata(&req.Config)
1,183✔
4613

1,183✔
4614
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,183✔
4615

1,183✔
4616
        if err != nil {
1,240✔
4617
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
57✔
4618
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4619
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4620
                        err = errConsumerStoreFailed
×
4621
                }
×
4622
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
57✔
4623
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
57✔
4624
                return
57✔
4625
        }
4626
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
1,126✔
4627
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,126✔
4628

1,126✔
4629
        o.mu.RLock()
1,126✔
4630
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
1,130✔
4631
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4632
        }
4✔
4633
        o.mu.RUnlock()
1,126✔
4634
}
4635

4636
// Request for the list of all consumer names.
4637
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
41✔
4638
        if c == nil || !s.JetStreamEnabled() {
41✔
4639
                return
×
4640
        }
×
4641
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
41✔
4642
        if err != nil {
41✔
4643
                s.Warnf(badAPIRequestT, msg)
×
4644
                return
×
4645
        }
×
4646

4647
        var resp = JSApiConsumerNamesResponse{
41✔
4648
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
41✔
4649
                Consumers:   []string{},
41✔
4650
        }
41✔
4651

41✔
4652
        // Determine if we should proceed here when we are in clustered mode.
41✔
4653
        if s.JetStreamIsClustered() {
74✔
4654
                js, cc := s.getJetStreamCluster()
33✔
4655
                if js == nil || cc == nil {
33✔
4656
                        return
×
4657
                }
×
4658
                if js.isLeaderless() {
33✔
4659
                        resp.Error = NewJSClusterNotAvailError()
×
4660
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4661
                        return
×
4662
                }
×
4663
                // Make sure we are meta leader.
4664
                if !s.JetStreamIsLeader() {
55✔
4665
                        return
22✔
4666
                }
22✔
4667
        }
4668

4669
        if errorOnRequiredApiLevel(hdr) {
20✔
4670
                resp.Error = NewJSRequiredApiLevelError()
1✔
4671
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4672
                return
1✔
4673
        }
1✔
4674

4675
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
18✔
4676
                if doErr {
×
4677
                        resp.Error = NewJSNotEnabledForAccountError()
×
4678
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4679
                }
×
4680
                return
×
4681
        }
4682

4683
        var offset int
18✔
4684
        if isJSONObjectOrArray(msg) {
30✔
4685
                var req JSApiConsumersRequest
12✔
4686
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
4687
                        resp.Error = NewJSInvalidJSONError(err)
×
4688
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4689
                        return
×
4690
                }
×
4691
                offset = max(req.Offset, 0)
12✔
4692
        }
4693

4694
        streamName := streamNameFromSubject(subject)
18✔
4695
        var numConsumers int
18✔
4696

18✔
4697
        if s.JetStreamIsClustered() {
29✔
4698
                js, cc := s.getJetStreamCluster()
11✔
4699
                if js == nil || cc == nil {
11✔
4700
                        // TODO(dlc) - Debug or Warn?
×
4701
                        return
×
4702
                }
×
4703
                js.mu.RLock()
11✔
4704
                sas := cc.streams[acc.Name]
11✔
4705
                if sas == nil {
11✔
4706
                        js.mu.RUnlock()
×
4707
                        resp.Error = NewJSStreamNotFoundError()
×
4708
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4709
                        return
×
4710
                }
×
4711
                sa := sas[streamName]
11✔
4712
                if sa == nil || sa.err != nil {
11✔
4713
                        js.mu.RUnlock()
×
4714
                        resp.Error = NewJSStreamNotFoundError()
×
4715
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4716
                        return
×
4717
                }
×
4718
                for consumer := range sa.consumers {
25✔
4719
                        resp.Consumers = append(resp.Consumers, consumer)
14✔
4720
                }
14✔
4721
                if len(resp.Consumers) > 1 {
15✔
4722
                        slices.Sort(resp.Consumers)
4✔
4723
                }
4✔
4724
                numConsumers = len(resp.Consumers)
11✔
4725
                if offset > numConsumers {
11✔
4726
                        offset = numConsumers
×
4727
                }
×
4728
                resp.Consumers = resp.Consumers[offset:]
11✔
4729
                if len(resp.Consumers) > JSApiNamesLimit {
11✔
4730
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4731
                }
×
4732
                js.mu.RUnlock()
11✔
4733

4734
        } else {
7✔
4735
                mset, err := acc.lookupStream(streamName)
7✔
4736
                if err != nil {
7✔
4737
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4738
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4739
                        return
×
4740
                }
×
4741

4742
                obs := mset.getPublicConsumers()
7✔
4743
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
7✔
4744

4745
                numConsumers = len(obs)
7✔
4746
                if offset > numConsumers {
7✔
4747
                        offset = numConsumers
×
4748
                }
×
4749

4750
                for _, o := range obs[offset:] {
12✔
4751
                        resp.Consumers = append(resp.Consumers, o.String())
5✔
4752
                        if len(resp.Consumers) >= JSApiNamesLimit {
5✔
4753
                                break
×
4754
                        }
4755
                }
4756
        }
4757
        resp.Total = numConsumers
18✔
4758
        resp.Limit = JSApiNamesLimit
18✔
4759
        resp.Offset = offset
18✔
4760
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
18✔
4761
}
4762

4763
// Request for the list of all detailed consumer information.
4764
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
94✔
4765
        if c == nil || !s.JetStreamEnabled() {
98✔
4766
                return
4✔
4767
        }
4✔
4768

4769
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
90✔
4770
        if err != nil {
90✔
4771
                s.Warnf(badAPIRequestT, msg)
×
4772
                return
×
4773
        }
×
4774

4775
        var resp = JSApiConsumerListResponse{
90✔
4776
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
90✔
4777
                Consumers:   []*ConsumerInfo{},
90✔
4778
        }
90✔
4779

90✔
4780
        // Determine if we should proceed here when we are in clustered mode.
90✔
4781
        if s.JetStreamIsClustered() {
170✔
4782
                js, cc := s.getJetStreamCluster()
80✔
4783
                if js == nil || cc == nil {
80✔
4784
                        return
×
4785
                }
×
4786
                if js.isLeaderless() {
81✔
4787
                        resp.Error = NewJSClusterNotAvailError()
1✔
4788
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4789
                        return
1✔
4790
                }
1✔
4791
                // Make sure we are meta leader.
4792
                if !s.JetStreamIsLeader() {
138✔
4793
                        return
59✔
4794
                }
59✔
4795
        }
4796

4797
        if errorOnRequiredApiLevel(hdr) {
31✔
4798
                resp.Error = NewJSRequiredApiLevelError()
1✔
4799
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4800
                return
1✔
4801
        }
1✔
4802

4803
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
29✔
4804
                if doErr {
×
4805
                        resp.Error = NewJSNotEnabledForAccountError()
×
4806
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4807
                }
×
4808
                return
×
4809
        }
4810

4811
        var offset int
29✔
4812
        if isJSONObjectOrArray(msg) {
44✔
4813
                var req JSApiConsumersRequest
15✔
4814
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
16✔
4815
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4816
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4817
                        return
1✔
4818
                }
1✔
4819
                offset = max(req.Offset, 0)
14✔
4820
        }
4821

4822
        streamName := streamNameFromSubject(subject)
28✔
4823

28✔
4824
        // Clustered mode will invoke a scatter and gather.
28✔
4825
        if s.JetStreamIsClustered() {
48✔
4826
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
20✔
4827
                msg = copyBytes(msg)
20✔
4828
                s.startGoRoutine(func() {
40✔
4829
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
20✔
4830
                })
20✔
4831
                return
20✔
4832
        }
4833

4834
        mset, err := acc.lookupStream(streamName)
8✔
4835
        if err != nil {
8✔
4836
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4837
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4838
                return
×
4839
        }
×
4840

4841
        obs := mset.getPublicConsumers()
8✔
4842
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
8✔
4843

4844
        ocnt := len(obs)
8✔
4845
        if offset > ocnt {
8✔
4846
                offset = ocnt
×
4847
        }
×
4848

4849
        var missingNames []string
8✔
4850
        for _, o := range obs[offset:] {
15✔
4851
                if o.offlineReason != _EMPTY_ {
8✔
4852
                        if resp.Offline == nil {
2✔
4853
                                resp.Offline = make(map[string]string, 1)
1✔
4854
                        }
1✔
4855
                        resp.Offline[o.name] = o.offlineReason
1✔
4856
                        missingNames = append(missingNames, o.name)
1✔
4857
                        continue
1✔
4858
                }
4859
                if cinfo := o.info(); cinfo != nil {
12✔
4860
                        resp.Consumers = append(resp.Consumers, cinfo)
6✔
4861
                }
6✔
4862
                if len(resp.Consumers) >= JSApiListLimit {
6✔
4863
                        break
×
4864
                }
4865
        }
4866
        resp.Total = ocnt
8✔
4867
        resp.Limit = JSApiListLimit
8✔
4868
        resp.Offset = offset
8✔
4869
        resp.Missing = missingNames
8✔
4870
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
4871
}
4872

4873
// Request for information about a consumer.
4874
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
43,506✔
4875
        if c == nil || !s.JetStreamEnabled() {
43,506✔
4876
                return
×
4877
        }
×
4878
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
43,506✔
4879
        if err != nil {
43,506✔
4880
                s.Warnf(badAPIRequestT, msg)
×
4881
                return
×
4882
        }
×
4883

4884
        streamName := streamNameFromSubject(subject)
43,506✔
4885
        consumerName := consumerNameFromSubject(subject)
43,506✔
4886

43,506✔
4887
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
43,506✔
4888

43,506✔
4889
        if !isEmptyRequest(msg) {
43,507✔
4890
                resp.Error = NewJSNotEmptyRequestError()
1✔
4891
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4892
                return
1✔
4893
        }
1✔
4894

4895
        // If we are in clustered mode we need to be the consumer leader to proceed.
4896
        if s.JetStreamIsClustered() {
86,462✔
4897
                // Check to make sure the consumer is assigned.
42,957✔
4898
                js, cc := s.getJetStreamCluster()
42,957✔
4899
                if js == nil || cc == nil {
42,957✔
4900
                        return
×
4901
                }
×
4902

4903
                js.mu.RLock()
42,957✔
4904
                meta := cc.meta
42,957✔
4905
                js.mu.RUnlock()
42,957✔
4906

42,957✔
4907
                if meta == nil {
42,957✔
4908
                        return
×
4909
                }
×
4910

4911
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4912
                ourID := meta.ID()
42,957✔
4913
                groupLeaderless := meta.Leaderless()
42,957✔
4914
                groupCreated := meta.Created()
42,957✔
4915

42,957✔
4916
                js.mu.RLock()
42,957✔
4917
                isLeader, sa, ca := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, streamName), js.consumerAssignmentOrInflight(acc.Name, streamName, consumerName)
42,957✔
4918
                var rg *raftGroup
42,957✔
4919
                var offline, isMember bool
42,957✔
4920
                if ca != nil {
48,250✔
4921
                        if rg = ca.Group; rg != nil {
10,586✔
4922
                                offline = s.allPeersOffline(rg)
5,293✔
4923
                                isMember = rg.isMember(ourID)
5,293✔
4924
                        }
5,293✔
4925
                        if ca.unsupported != nil && isMember {
5,311✔
4926
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
4927
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
4928
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
4929
                                js.mu.RUnlock()
18✔
4930
                                return
18✔
4931
                        }
18✔
4932
                }
4933
                // Capture consumer leader here.
4934
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
42,939✔
4935
                // Also capture if we think there is no meta leader.
42,939✔
4936
                var isLeaderLess bool
42,939✔
4937
                if !isLeader {
71,791✔
4938
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
28,852✔
4939
                }
28,852✔
4940
                js.mu.RUnlock()
42,939✔
4941

42,939✔
4942
                if isLeader && ca == nil {
55,406✔
4943
                        // We can't find the consumer, so mimic what would be the errors below.
12,467✔
4944
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,467✔
4945
                                if doErr {
×
4946
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4947
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4948
                                }
×
4949
                                return
×
4950
                        }
4951
                        if sa == nil {
22,470✔
4952
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4953
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4954
                                return
10,003✔
4955
                        }
10,003✔
4956
                        // If we are here the consumer is not present.
4957
                        resp.Error = NewJSConsumerNotFoundError()
2,464✔
4958
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,464✔
4959
                        return
2,464✔
4960
                } else if ca == nil {
55,669✔
4961
                        if isLeaderLess {
25,199✔
4962
                                resp.Error = NewJSClusterNotAvailError()
2✔
4963
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4964
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4965
                        }
2✔
4966
                        return
25,197✔
4967
                } else if isLeader && offline {
5,278✔
4968
                        resp.Error = NewJSConsumerOfflineError()
3✔
4969
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
3✔
4970
                        return
3✔
4971
                }
3✔
4972

4973
                // Check to see if we are a member of the group and if the group has no leader.
4974
                if isMember && js.isGroupLeaderless(ca.Group) {
5,273✔
4975
                        resp.Error = NewJSClusterNotAvailError()
1✔
4976
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4977
                        return
1✔
4978
                }
1✔
4979

4980
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4981
                if !isConsumerLeader {
8,933✔
4982
                        if isLeaderLess {
3,662✔
4983
                                resp.Error = NewJSClusterNotAvailError()
×
4984
                                // Delaying an error response gives the leader a chance to respond before us
×
4985
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4986
                                return
×
4987
                        }
×
4988

4989
                        var node RaftNode
3,662✔
4990
                        var leaderNotPartOfGroup bool
3,662✔
4991

3,662✔
4992
                        // We have a consumer assignment.
3,662✔
4993
                        if isMember {
6,420✔
4994
                                js.mu.RLock()
2,758✔
4995
                                if rg != nil && rg.node != nil {
5,515✔
4996
                                        node = rg.node
2,757✔
4997
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,757✔
4998
                                                leaderNotPartOfGroup = true
×
4999
                                        }
×
5000
                                }
5001
                                js.mu.RUnlock()
2,758✔
5002
                        }
5003

5004
                        // Check if we should ignore all together.
5005
                        if node == nil {
4,567✔
5006
                                // We have been assigned but have not created a node yet. If we are a member return
905✔
5007
                                // our config and defaults for state and no cluster info.
905✔
5008
                                if isMember {
906✔
5009
                                        // Since we access consumerAssignment, need js lock.
1✔
5010
                                        js.mu.RLock()
1✔
5011
                                        resp.ConsumerInfo = &ConsumerInfo{
1✔
5012
                                                Stream:    ca.Stream,
1✔
5013
                                                Name:      ca.Name,
1✔
5014
                                                Created:   ca.Created,
1✔
5015
                                                Config:    setDynamicConsumerMetadata(ca.Config),
1✔
5016
                                                TimeStamp: time.Now().UTC(),
1✔
5017
                                        }
1✔
5018
                                        b := s.jsonResponse(resp)
1✔
5019
                                        js.mu.RUnlock()
1✔
5020
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
1✔
5021
                                }
1✔
5022
                                return
905✔
5023
                        }
5024
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
5025
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
5,510✔
5026
                                if leaderNotPartOfGroup {
2,753✔
5027
                                        resp.Error = NewJSConsumerOfflineError()
×
5028
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
5029
                                }
×
5030
                                return
2,753✔
5031
                        }
5032
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
5033
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
5034
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
5035
                }
5036
        }
5037

5038
        if errorOnRequiredApiLevel(hdr) {
2,162✔
5039
                resp.Error = NewJSRequiredApiLevelError()
1✔
5040
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5041
                return
1✔
5042
        }
1✔
5043

5044
        if !acc.JetStreamEnabled() {
2,160✔
5045
                resp.Error = NewJSNotEnabledForAccountError()
×
5046
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5047
                return
×
5048
        }
×
5049

5050
        mset, err := acc.lookupStream(streamName)
2,160✔
5051
        if err != nil {
2,160✔
5052
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5053
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5054
                return
×
5055
        }
×
5056

5057
        obs := mset.lookupConsumer(consumerName)
2,160✔
5058
        if obs == nil {
2,383✔
5059
                resp.Error = NewJSConsumerNotFoundError()
223✔
5060
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
223✔
5061
                return
223✔
5062
        }
223✔
5063

5064
        if obs.offlineReason != _EMPTY_ {
1,938✔
5065
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
5066
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
5067
                return
1✔
5068
        }
1✔
5069

5070
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,938✔
5071
                // This consumer returned nil which means it's closed. Respond with not found.
2✔
5072
                resp.Error = NewJSConsumerNotFoundError()
2✔
5073
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
5074
                return
2✔
5075
        }
2✔
5076
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,934✔
5077
}
5078

5079
// Request to delete an Consumer.
5080
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,525✔
5081
        if c == nil || !s.JetStreamEnabled() {
7,530✔
5082
                return
5✔
5083
        }
5✔
5084
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,520✔
5085
        if err != nil {
7,520✔
5086
                s.Warnf(badAPIRequestT, msg)
×
5087
                return
×
5088
        }
×
5089

5090
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,520✔
5091

7,520✔
5092
        // Determine if we should proceed here when we are in clustered mode.
7,520✔
5093
        if s.JetStreamIsClustered() {
14,567✔
5094
                js, cc := s.getJetStreamCluster()
7,047✔
5095
                if js == nil || cc == nil {
7,048✔
5096
                        return
1✔
5097
                }
1✔
5098
                if js.isLeaderless() {
7,047✔
5099
                        resp.Error = NewJSClusterNotAvailError()
1✔
5100
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5101
                        return
1✔
5102
                }
1✔
5103
                // Make sure we are meta leader.
5104
                if !s.JetStreamIsLeader() {
11,725✔
5105
                        return
4,680✔
5106
                }
4,680✔
5107
        }
5108

5109
        if errorOnRequiredApiLevel(hdr) {
2,839✔
5110
                resp.Error = NewJSRequiredApiLevelError()
1✔
5111
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5112
                return
1✔
5113
        }
1✔
5114

5115
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,905✔
5116
                if doErr {
134✔
5117
                        resp.Error = NewJSNotEnabledForAccountError()
66✔
5118
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
66✔
5119
                }
66✔
5120
                return
68✔
5121
        }
5122
        if !isEmptyRequest(msg) {
2,770✔
5123
                resp.Error = NewJSNotEmptyRequestError()
1✔
5124
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5125
                return
1✔
5126
        }
1✔
5127
        stream := streamNameFromSubject(subject)
2,768✔
5128
        consumer := consumerNameFromSubject(subject)
2,768✔
5129

2,768✔
5130
        if s.JetStreamIsClustered() {
5,133✔
5131
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,365✔
5132
                return
2,365✔
5133
        }
2,365✔
5134

5135
        mset, err := acc.lookupStream(stream)
403✔
5136
        if err != nil {
404✔
5137
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
5138
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5139
                return
1✔
5140
        }
1✔
5141

5142
        obs := mset.lookupConsumer(consumer)
402✔
5143
        if obs == nil {
576✔
5144
                resp.Error = NewJSConsumerNotFoundError()
174✔
5145
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
174✔
5146
                return
174✔
5147
        }
174✔
5148
        if err := obs.delete(); err != nil {
228✔
5149
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
5150
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5151
                return
×
5152
        }
×
5153
        resp.Success = true
228✔
5154
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
228✔
5155
}
5156

5157
// Request to pause or unpause a Consumer.
5158
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6,464✔
5159
        if c == nil || !s.JetStreamEnabled() {
6,464✔
5160
                return
×
5161
        }
×
5162
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6,464✔
5163
        if err != nil {
6,464✔
5164
                s.Warnf(badAPIRequestT, msg)
×
5165
                return
×
5166
        }
×
5167

5168
        var req JSApiConsumerPauseRequest
6,464✔
5169
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
6,464✔
5170

6,464✔
5171
        if isJSONObjectOrArray(msg) {
12,919✔
5172
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
6,455✔
5173
                        resp.Error = NewJSInvalidJSONError(err)
×
5174
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5175
                        return
×
5176
                }
×
5177
        }
5178

5179
        // Determine if we should proceed here when we are in clustered mode.
5180
        isClustered := s.JetStreamIsClustered()
6,464✔
5181
        js, cc := s.getJetStreamCluster()
6,464✔
5182
        if isClustered {
11,318✔
5183
                if js == nil || cc == nil {
4,854✔
5184
                        return
×
5185
                }
×
5186
                if js.isLeaderless() {
4,854✔
5187
                        resp.Error = NewJSClusterNotAvailError()
×
5188
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5189
                        return
×
5190
                }
×
5191
                // Make sure we are meta leader.
5192
                if !s.JetStreamIsLeader() {
8,094✔
5193
                        return
3,240✔
5194
                }
3,240✔
5195
        }
5196

5197
        if errorOnRequiredApiLevel(hdr) {
3,225✔
5198
                resp.Error = NewJSRequiredApiLevelError()
1✔
5199
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5200
                return
1✔
5201
        }
1✔
5202

5203
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
3,223✔
5204
                if doErr {
×
5205
                        resp.Error = NewJSNotEnabledForAccountError()
×
5206
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5207
                }
×
5208
                return
×
5209
        }
5210

5211
        stream := streamNameFromSubject(subject)
3,223✔
5212
        consumer := consumerNameFromSubject(subject)
3,223✔
5213

3,223✔
5214
        if isClustered {
4,837✔
5215
                js.mu.RLock()
1,614✔
5216
                sa := js.streamAssignment(acc.Name, stream)
1,614✔
5217
                if sa == nil {
1,614✔
5218
                        js.mu.RUnlock()
×
5219
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5220
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5221
                        return
×
5222
                }
×
5223
                if sa.unsupported != nil {
1,614✔
5224
                        js.mu.RUnlock()
×
5225
                        // Just let the request time out.
×
5226
                        return
×
5227
                }
×
5228

5229
                ca, ok := sa.consumers[consumer]
1,614✔
5230
                if !ok || ca == nil {
1,614✔
5231
                        js.mu.RUnlock()
×
5232
                        resp.Error = NewJSConsumerNotFoundError()
×
5233
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5234
                        return
×
5235
                }
×
5236
                if ca.unsupported != nil {
1,614✔
5237
                        js.mu.RUnlock()
×
5238
                        // Just let the request time out.
×
5239
                        return
×
5240
                }
×
5241

5242
                nca := ca.clone()
1,614✔
5243
                // We're only holding the read lock and release below,
1,614✔
5244
                // we need a copy to prevent concurrent reads/writes.
1,614✔
5245
                ncfg := *ca.Config
1,614✔
5246
                ncfg.Metadata = maps.Clone(ncfg.Metadata)
1,614✔
5247
                nca.Config = &ncfg
1,614✔
5248
                meta := cc.meta
1,614✔
5249
                js.mu.RUnlock()
1,614✔
5250
                pauseUTC := req.PauseUntil.UTC()
1,614✔
5251
                if !pauseUTC.IsZero() {
3,224✔
5252
                        nca.Config.PauseUntil = &pauseUTC
1,610✔
5253
                } else {
1,614✔
5254
                        nca.Config.PauseUntil = nil
4✔
5255
                }
4✔
5256

5257
                // Update asset version metadata due to updating pause/resume.
5258
                // Only PauseUntil is updated above, so reuse config for both.
5259
                setStaticConsumerMetadata(nca.Config)
1,614✔
5260

1,614✔
5261
                eca := encodeAddConsumerAssignment(nca)
1,614✔
5262
                meta.Propose(eca)
1,614✔
5263

1,614✔
5264
                resp.PauseUntil = pauseUTC
1,614✔
5265
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
3,224✔
5266
                        resp.PauseRemaining = time.Until(pauseUTC)
1,610✔
5267
                }
1,610✔
5268
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,614✔
5269
                return
1,614✔
5270
        }
5271

5272
        mset, err := acc.lookupStream(stream)
1,609✔
5273
        if err != nil {
1,609✔
5274
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5275
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5276
                return
×
5277
        }
×
5278
        if mset.offlineReason != _EMPTY_ {
1,609✔
5279
                // Just let the request time out.
×
5280
                return
×
5281
        }
×
5282

5283
        obs := mset.lookupConsumer(consumer)
1,609✔
5284
        if obs == nil {
1,609✔
5285
                resp.Error = NewJSConsumerNotFoundError()
×
5286
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5287
                return
×
5288
        }
×
5289
        if obs.offlineReason != _EMPTY_ {
1,609✔
5290
                // Just let the request time out.
×
5291
                return
×
5292
        }
×
5293

5294
        // We're only holding the read lock and release below,
5295
        // we need a copy to prevent concurrent reads/writes.
5296
        obs.mu.RLock()
1,609✔
5297
        ncfg := obs.cfg
1,609✔
5298
        ncfg.Metadata = maps.Clone(ncfg.Metadata)
1,609✔
5299
        obs.mu.RUnlock()
1,609✔
5300

1,609✔
5301
        pauseUTC := req.PauseUntil.UTC()
1,609✔
5302
        if !pauseUTC.IsZero() {
3,214✔
5303
                ncfg.PauseUntil = &pauseUTC
1,605✔
5304
        } else {
1,609✔
5305
                ncfg.PauseUntil = nil
4✔
5306
        }
4✔
5307

5308
        // Update asset version metadata due to updating pause/resume.
5309
        setStaticConsumerMetadata(&ncfg)
1,609✔
5310

1,609✔
5311
        if err := obs.updateConfig(&ncfg); err != nil {
1,609✔
5312
                // The only type of error that should be returned here is from o.store,
×
5313
                // so use a store failed error type.
×
5314
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5315
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5316
                return
×
5317
        }
×
5318

5319
        resp.PauseUntil = pauseUTC
1,609✔
5320
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
3,214✔
5321
                resp.PauseRemaining = time.Until(pauseUTC)
1,605✔
5322
        }
1,605✔
5323
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,609✔
5324
}
5325

5326
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5327
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
78,832✔
5328
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
78,832✔
5329
                TypedEvent: TypedEvent{
78,832✔
5330
                        Type: JSAPIAuditType,
78,832✔
5331
                        ID:   nuid.Next(),
78,832✔
5332
                        Time: time.Now().UTC(),
78,832✔
5333
                },
78,832✔
5334
                Server:   s.Name(),
78,832✔
5335
                Client:   ci.forAdvisory(),
78,832✔
5336
                Subject:  subject,
78,832✔
5337
                Request:  request,
78,832✔
5338
                Response: response,
78,832✔
5339
                Domain:   s.getOpts().JetStreamDomain,
78,832✔
5340
        })
78,832✔
5341
}
78,832✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc