• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 23037063390

12 Mar 2026 01:10PM UTC coverage: 83.085% (-0.04%) from 83.122%
23037063390

push

github

web-flow
[FIXED] Panic on malformed fixed32/fixed64 fields (#7941)

Add a length check when scanning for protocol fixed32 and fixed64
fields.

Signed-off-by: Daniele Sciascia <daniele@nats.io>

75339 of 90677 relevant lines covered (83.09%)

367545.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.0
/server/jetstream_api.go
1
// Copyright 2020-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strings"
28
        "sync/atomic"
29
        "time"
30
        "unicode"
31

32
        "github.com/nats-io/nuid"
33
)
34

35
// Request API subjects for JetStream.
36
const (
37
        // All API endpoints.
38
        jsAllAPI = "$JS.API.>"
39

40
        // For constructing JetStream domain prefixes.
41
        jsDomainAPI = "$JS.%s.API.>"
42

43
        JSApiPrefix = "$JS.API"
44

45
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
46
        // Will return JSON response.
47
        JSApiAccountInfo = "$JS.API.INFO"
48

49
        // JSApiStreamCreate is the endpoint to create new streams.
50
        // Will return JSON response.
51
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
52
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
53

54
        // JSApiStreamUpdate is the endpoint to update existing streams.
55
        // Will return JSON response.
56
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
57
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
58

59
        // JSApiStreams is the endpoint to list all stream names for this account.
60
        // Will return JSON response.
61
        JSApiStreams = "$JS.API.STREAM.NAMES"
62
        // JSApiStreamList is the endpoint that will return all detailed stream information
63
        JSApiStreamList = "$JS.API.STREAM.LIST"
64

65
        // JSApiStreamInfo is for obtaining general information about a named stream.
66
        // Will return JSON response.
67
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
68
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
69

70
        // JSApiStreamDelete is the endpoint to delete streams.
71
        // Will return JSON response.
72
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
73
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
74

75
        // JSApiStreamPurge is the endpoint to purge streams.
76
        // Will return JSON response.
77
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
78
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
79

80
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
81
        // Will return a stream of chunks with a nil chunk as EOF to
82
        // the deliver subject. Caller should respond to each chunk
83
        // with a nil body response for ack flow.
84
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
85
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
86

87
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
88
        // Caller should respond to each chunk with a nil body response.
89
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
90
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
91

92
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
93
        // Will return JSON response.
94
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
95
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
96

97
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
98
        // Will return JSON response.
99
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
100
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
101

102
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
103
        // Will return the message similar to how a consumer receives the message, no JSON processing.
104
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
105
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
106
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
107

108
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
109
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
110
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
111
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
112

113
        // jsDirectGetPre
114
        jsDirectGetPre = "$JS.API.DIRECT.GET"
115

116
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
117
        // This was also the legacy endpoint for ephemeral consumers.
118
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
119
        // Will return JSON response.
120
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
121
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
122
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
123
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
124

125
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
126
        // You need to include the stream and consumer name in the subject.
127
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
128
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
129

130
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
131
        // Will return JSON response.
132
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
133
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
134

135
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
136
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
137
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
138

139
        // JSApiConsumerInfo is for obtaining general information about a consumer.
140
        // Will return JSON response.
141
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
142
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
143

144
        // JSApiConsumerDelete is the endpoint to delete consumers.
145
        // Will return JSON response.
146
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
147
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
148

149
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
150
        // Will return JSON response.
151
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
152
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
153

154
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
155
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
156

157
        // JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
158
        JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
159

160
        // JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
161
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
162
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
163

164
        // jsRequestNextPre
165
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
166

167
        // For snapshots and restores. The ack will have additional tokens.
168
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
169
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
170

171
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
172
        // Will return JSON response.
173
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
174
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
175

176
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
177
        // Will return JSON response.
178
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
179
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
180

181
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
182
        // Will return JSON response.
183
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
184
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
185

186
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
187
        // Only works from system account.
188
        // Will return JSON response.
189
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
190

191
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
192
        // Only works from system account.
193
        // Will return JSON response.
194
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
195

196
        // JSApiAccountPurge is the endpoint to purge the js content of an account
197
        // Only works from system account.
198
        // Will return JSON response.
199
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
200
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
201

202
        // JSApiServerStreamMove is the endpoint to move streams off a server
203
        // Only works from system account.
204
        // Will return JSON response.
205
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
206
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
207

208
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
209
        // Only works from system account.
210
        // Will return JSON response.
211
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
212
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
213

214
        // The prefix for system level account API.
215
        jsAPIAccountPre = "$JS.API.ACCOUNT."
216

217
        // jsAckT is the template for the ack message stream coming back from a consumer
218
        // when they ACK/NAK, etc a message.
219
        jsAckT      = "$JS.ACK.%s.%s"
220
        jsAckPre    = "$JS.ACK."
221
        jsAckPreLen = len(jsAckPre)
222

223
        // jsFlowControl is for flow control subjects.
224
        jsFlowControlPre = "$JS.FC."
225
        // jsFlowControl is for FC responses.
226
        jsFlowControl = "$JS.FC.%s.%s.*"
227

228
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
229
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
230

231
        // JSMetricPrefix is a prefix for all JetStream metrics.
232
        JSMetricPrefix = "$JS.EVENT.METRIC"
233

234
        // JSMetricConsumerAckPre is a metric containing ack latency.
235
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
236

237
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
238
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
239

240
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
241
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
242

243
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
244
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
245

246
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
247
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
248

249
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
250
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
251

252
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
253
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
254

255
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
256
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
257

258
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
259
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
260

261
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
262
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
263

264
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
265
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
266

267
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
268
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
269

270
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
271
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
272

273
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
274
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
275

276
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was start.
277
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
278

279
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
280
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
281

282
        // JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
283
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
284

285
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
286
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
287

288
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
289
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
290

291
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
292
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
293

294
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
295
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
296

297
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
298
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
299

300
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
301
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
302

303
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
304
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
305

306
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
307
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
308

309
        // JSAuditAdvisory is a notification about JetStream API access.
310
        // FIXME - Add in details about who..
311
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
312
)
313

314
// Headers used in $JS.API.> requests.
315
const (
316
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
317
        JSRequiredApiLevel = "Nats-Required-Api-Level"
318
)
319

320
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
321
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
322

323
func generateJSMappingTable(domain string) map[string]string {
792✔
324
        mappings := map[string]string{}
792✔
325
        // This set of mappings is very very very ugly.
792✔
326
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
792✔
327
        // For optics $KV and $OBJ where made to be independent subject spaces.
792✔
328
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
792✔
329
        // This is very unfortunate!!!
792✔
330
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
792✔
331
        // Especially since the actual API for say KV, does use stream create from JS.
792✔
332
        // To avoid overlaps KV and OBJ views append the prefix to their API.
792✔
333
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
792✔
334
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
792✔
335
        for srcMappingSuffix, to := range map[string]string{
792✔
336
                "INFO":       JSApiAccountInfo,
792✔
337
                "STREAM.>":   "$JS.API.STREAM.>",
792✔
338
                "CONSUMER.>": "$JS.API.CONSUMER.>",
792✔
339
                "DIRECT.>":   "$JS.API.DIRECT.>",
792✔
340
                "META.>":     "$JS.API.META.>",
792✔
341
                "SERVER.>":   "$JS.API.SERVER.>",
792✔
342
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
792✔
343
                "$KV.>":      "$KV.>",
792✔
344
                "$OBJ.>":     "$OBJ.>",
792✔
345
        } {
7,920✔
346
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
7,128✔
347
        }
7,128✔
348
        return mappings
792✔
349
}
350

351
// JSMaxDescription is the maximum description length for streams and consumers.
352
const JSMaxDescriptionLen = 4 * 1024
353

354
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
355
// It's calculated by summing length of all keys and values.
356
const JSMaxMetadataLen = 128 * 1024
357

358
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
359
// Picked 255 as it seems to be a widely used file name limit
360
const JSMaxNameLen = 255
361

362
// JSDefaultRequestQueueLimit is the default number of entries that we will
363
// put on the global request queue before we react.
364
const JSDefaultRequestQueueLimit = 10_000
365

366
// Responses for API calls.
367

368
// ApiResponse is a standard response from the JetStream JSON API
369
type ApiResponse struct {
370
        Type  string    `json:"type"`
371
        Error *ApiError `json:"error,omitempty"`
372
}
373

374
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
375

376
// When passing back to the clients generalize store failures.
377
var (
378
        errStreamStoreFailed   = errors.New("error creating store for stream")
379
        errConsumerStoreFailed = errors.New("error creating store for consumer")
380
)
381

382
// ToError checks if the response has a error and if it does converts it to an error avoiding
383
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
384
func (r *ApiResponse) ToError() error {
3,522✔
385
        if r.Error == nil {
5,455✔
386
                return nil
1,933✔
387
        }
1,933✔
388

389
        return r.Error
1,589✔
390
}
391

392
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
393

394
// ApiPaged includes variables used to create paged responses from the JSON API
395
type ApiPaged struct {
396
        Total  int `json:"total"`
397
        Offset int `json:"offset"`
398
        Limit  int `json:"limit"`
399
}
400

401
// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
402
type ApiPagedRequest struct {
403
        Offset int `json:"offset"`
404
}
405

406
// JSApiAccountInfoResponse reports back information on jetstream for this account.
407
type JSApiAccountInfoResponse struct {
408
        ApiResponse
409
        *JetStreamAccountStats
410
}
411

412
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
413

414
// JSApiStreamCreateResponse stream creation.
415
type JSApiStreamCreateResponse struct {
416
        ApiResponse
417
        *StreamInfo
418
        DidCreate bool `json:"did_create,omitempty"`
419
}
420

421
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
422

423
// JSApiStreamDeleteResponse stream removal.
424
type JSApiStreamDeleteResponse struct {
425
        ApiResponse
426
        Success bool `json:"success,omitempty"`
427
}
428

429
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
430

431
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
432
const JSMaxSubjectDetails = 100_000
433

434
type JSApiStreamInfoRequest struct {
435
        ApiPagedRequest
436
        DeletedDetails bool   `json:"deleted_details,omitempty"`
437
        SubjectsFilter string `json:"subjects_filter,omitempty"`
438
}
439

440
type JSApiStreamInfoResponse struct {
441
        ApiResponse
442
        ApiPaged
443
        *StreamInfo
444
}
445

446
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
447

448
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
449
// TODO(dlc) - with header or request support could request chunked response.
450
const JSApiNamesLimit = 1024
451
const JSApiListLimit = 256
452

453
type JSApiStreamNamesRequest struct {
454
        ApiPagedRequest
455
        // These are filters that can be applied to the list.
456
        Subject string `json:"subject,omitempty"`
457
}
458

459
// JSApiStreamNamesResponse list of streams.
460
// A nil request is valid and means all streams.
461
type JSApiStreamNamesResponse struct {
462
        ApiResponse
463
        ApiPaged
464
        Streams []string `json:"streams"`
465
}
466

467
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
468

469
type JSApiStreamListRequest struct {
470
        ApiPagedRequest
471
        // These are filters that can be applied to the list.
472
        Subject string `json:"subject,omitempty"`
473
}
474

475
// JSApiStreamListResponse list of detailed stream information.
476
// A nil request is valid and means all streams.
477
type JSApiStreamListResponse struct {
478
        ApiResponse
479
        ApiPaged
480
        Streams []*StreamInfo     `json:"streams"`
481
        Missing []string          `json:"missing,omitempty"`
482
        Offline map[string]string `json:"offline,omitempty"`
483
}
484

485
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
486

487
// JSApiStreamPurgeRequest is optional request information to the purge API.
488
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
489
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
490
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
491
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
492
type JSApiStreamPurgeRequest struct {
493
        // Purge up to but not including sequence.
494
        Sequence uint64 `json:"seq,omitempty"`
495
        // Subject to match against messages for the purge command.
496
        Subject string `json:"filter,omitempty"`
497
        // Number of messages to keep.
498
        Keep uint64 `json:"keep,omitempty"`
499
}
500

501
type JSApiStreamPurgeResponse struct {
502
        ApiResponse
503
        Success bool   `json:"success,omitempty"`
504
        Purged  uint64 `json:"purged"`
505
}
506

507
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
508

509
type JSApiConsumerUnpinRequest struct {
510
        Group string `json:"group"`
511
}
512

513
type JSApiConsumerUnpinResponse struct {
514
        ApiResponse
515
}
516

517
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
518

519
// JSApiStreamUpdateResponse for updating a stream.
520
type JSApiStreamUpdateResponse struct {
521
        ApiResponse
522
        *StreamInfo
523
}
524

525
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
526

527
// JSApiMsgDeleteRequest delete message request.
528
type JSApiMsgDeleteRequest struct {
529
        Seq     uint64 `json:"seq"`
530
        NoErase bool   `json:"no_erase,omitempty"`
531
}
532

533
type JSApiMsgDeleteResponse struct {
534
        ApiResponse
535
        Success bool `json:"success,omitempty"`
536
}
537

538
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
539

540
type JSApiStreamSnapshotRequest struct {
541
        // Subject to deliver the chunks to for the snapshot.
542
        DeliverSubject string `json:"deliver_subject"`
543
        // Do not include consumers in the snapshot.
544
        NoConsumers bool `json:"no_consumers,omitempty"`
545
        // Optional chunk size preference. Defaults to 128KB,
546
        // automatically clamped to within the range 1KB to 1MB.
547
        // A smaller chunk size means more in-flight messages
548
        // and more acks needed. Links with good throughput
549
        // but high latency may need to increase this.
550
        ChunkSize int `json:"chunk_size,omitempty"`
551
        // Optional window size preference. Defaults to 8MB,
552
        // automatically clamped to within the range 1KB to 32MB.
553
        // very slow connections may need to reduce this to
554
        // avoid slow consumer issues.
555
        WindowSize int `json:"window_size,omitempty"`
556
        // Check all message's checksums prior to snapshot.
557
        CheckMsgs bool `json:"jsck,omitempty"`
558
}
559

560
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
561
type JSApiStreamSnapshotResponse struct {
562
        ApiResponse
563
        // Configuration of the given stream.
564
        Config *StreamConfig `json:"config,omitempty"`
565
        // Current State for the given stream.
566
        State *StreamState `json:"state,omitempty"`
567
}
568

569
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
570

571
// JSApiStreamRestoreRequest is the required restore request.
572
type JSApiStreamRestoreRequest struct {
573
        // Configuration of the given stream.
574
        Config StreamConfig `json:"config"`
575
        // Current State for the given stream.
576
        State StreamState `json:"state"`
577
}
578

579
// JSApiStreamRestoreResponse is the direct response to the restore request.
580
type JSApiStreamRestoreResponse struct {
581
        ApiResponse
582
        // Subject to deliver the chunks to for the snapshot restore.
583
        DeliverSubject string `json:"deliver_subject"`
584
}
585

586
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
587

588
// JSApiStreamRemovePeerRequest is the required remove peer request.
589
type JSApiStreamRemovePeerRequest struct {
590
        // Server name of the peer to be removed.
591
        Peer string `json:"peer"`
592
}
593

594
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
595
type JSApiStreamRemovePeerResponse struct {
596
        ApiResponse
597
        Success bool `json:"success,omitempty"`
598
}
599

600
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
601

602
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
603
type JSApiStreamLeaderStepDownResponse struct {
604
        ApiResponse
605
        Success bool `json:"success,omitempty"`
606
}
607

608
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
609

610
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
611
type JSApiConsumerLeaderStepDownResponse struct {
612
        ApiResponse
613
        Success bool `json:"success,omitempty"`
614
}
615

616
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
617

618
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
619
type JSApiLeaderStepdownRequest struct {
620
        Placement *Placement `json:"placement,omitempty"`
621
}
622

623
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
624
type JSApiLeaderStepDownResponse struct {
625
        ApiResponse
626
        Success bool `json:"success,omitempty"`
627
}
628

629
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
630

631
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
632
type JSApiMetaServerRemoveRequest struct {
633
        // Server name of the peer to be removed.
634
        Server string `json:"peer"`
635
        // Peer ID of the peer to be removed. If specified this is used
636
        // instead of the server name.
637
        Peer string `json:"peer_id,omitempty"`
638
}
639

640
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
641
type JSApiMetaServerRemoveResponse struct {
642
        ApiResponse
643
        Success bool `json:"success,omitempty"`
644
}
645

646
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
647

648
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
649
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
650
type JSApiMetaServerStreamMoveRequest struct {
651
        // Server name of the peer to be evacuated.
652
        Server string `json:"server,omitempty"`
653
        // Cluster the server is in
654
        Cluster string `json:"cluster,omitempty"`
655
        // Domain the sever is in
656
        Domain string `json:"domain,omitempty"`
657
        // Ephemeral placement tags for the move
658
        Tags []string `json:"tags,omitempty"`
659
}
660

661
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
662

663
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
664
type JSApiAccountPurgeResponse struct {
665
        ApiResponse
666
        Initiated bool `json:"initiated,omitempty"`
667
}
668

669
// JSApiMsgGetRequest get a message request.
670
type JSApiMsgGetRequest struct {
671
        Seq     uint64 `json:"seq,omitempty"`
672
        LastFor string `json:"last_by_subj,omitempty"`
673
        NextFor string `json:"next_by_subj,omitempty"`
674

675
        // Batch support. Used to request more than one msg at a time.
676
        // Can be used with simple starting seq, but also NextFor with wildcards.
677
        Batch int `json:"batch,omitempty"`
678
        // This will make sure we limit how much data we blast out. If not set we will
679
        // inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
680
        MaxBytes int `json:"max_bytes,omitempty"`
681
        // Return messages as of this start time.
682
        StartTime *time.Time `json:"start_time,omitempty"`
683

684
        // Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
685
        MultiLastFor []string `json:"multi_last,omitempty"`
686
        // Only return messages up to this sequence. If not set, will be last sequence for the stream.
687
        UpToSeq uint64 `json:"up_to_seq,omitempty"`
688
        // Only return messages up to this time.
689
        UpToTime *time.Time `json:"up_to_time,omitempty"`
690
        // Only return the message payload, excluding headers if present.
691
        NoHeaders bool `json:"no_hdr,omitempty"`
692
}
693

694
type JSApiMsgGetResponse struct {
695
        ApiResponse
696
        Message *StoredMsg `json:"message,omitempty"`
697
}
698

699
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
700

701
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
702
const JSWaitQueueDefaultMax = 512
703

704
type JSApiConsumerCreateResponse struct {
705
        ApiResponse
706
        *ConsumerInfo
707
}
708

709
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
710

711
type JSApiConsumerDeleteResponse struct {
712
        ApiResponse
713
        Success bool `json:"success,omitempty"`
714
}
715

716
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
717

718
type JSApiConsumerPauseRequest struct {
719
        PauseUntil time.Time `json:"pause_until,omitempty"`
720
}
721

722
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
723

724
type JSApiConsumerPauseResponse struct {
725
        ApiResponse
726
        Paused         bool          `json:"paused"`
727
        PauseUntil     time.Time     `json:"pause_until"`
728
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
729
}
730

731
type JSApiConsumerInfoResponse struct {
732
        ApiResponse
733
        *ConsumerInfo
734
}
735

736
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
737

738
type JSApiConsumersRequest struct {
739
        ApiPagedRequest
740
}
741

742
type JSApiConsumerNamesResponse struct {
743
        ApiResponse
744
        ApiPaged
745
        Consumers []string `json:"consumers"`
746
}
747

748
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
749

750
type JSApiConsumerListResponse struct {
751
        ApiResponse
752
        ApiPaged
753
        Consumers []*ConsumerInfo   `json:"consumers"`
754
        Missing   []string          `json:"missing,omitempty"`
755
        Offline   map[string]string `json:"offline,omitempty"`
756
}
757

758
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
759

760
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
761
type JSApiConsumerGetNextRequest struct {
762
        Expires   time.Duration `json:"expires,omitempty"`
763
        Batch     int           `json:"batch,omitempty"`
764
        MaxBytes  int           `json:"max_bytes,omitempty"`
765
        NoWait    bool          `json:"no_wait,omitempty"`
766
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
767
        PriorityGroup
768
}
769

770
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
771
type JSApiConsumerResetRequest struct {
772
        Seq uint64 `json:"seq,omitempty"`
773
}
774

775
type JSApiConsumerResetResponse struct {
776
        ApiResponse
777
        *ConsumerInfo
778
        ResetSeq uint64 `json:"reset_seq"`
779
}
780

781
const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"
782

783
// Structure that holds state for a JetStream API request that is processed
784
// in a separate long-lived go routine. This is to avoid blocking connections.
785
type jsAPIRoutedReq struct {
786
        jsub    *subscription
787
        sub     *subscription
788
        acc     *Account
789
        subject string
790
        reply   string
791
        msg     []byte
792
        pa      pubArg
793
}
794

795
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
134,484✔
796
        // Ignore system level directives meta stepdown and peer remove requests here.
134,484✔
797
        if subject == JSApiLeaderStepDown ||
134,484✔
798
                subject == JSApiRemoveServer ||
134,484✔
799
                strings.HasPrefix(subject, jsAPIAccountPre) {
134,957✔
800
                return
473✔
801
        }
473✔
802
        // No lock needed, those are immutable.
803
        s, rr := js.srv, js.apiSubs.Match(subject)
134,011✔
804

134,011✔
805
        hdr, msg := c.msgParts(rmsg)
134,011✔
806
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
134,018✔
807
                // Check if this is the system account. We will let these through for the account info only.
7✔
808
                sacc := s.SystemAccount()
7✔
809
                if sacc != acc {
7✔
810
                        return
×
811
                }
×
812
                if subject != JSApiAccountInfo {
11✔
813
                        // Only respond from the initial server entry to the NATS system.
4✔
814
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
815
                                var resp = ApiResponse{
2✔
816
                                        Type:  JSApiSystemResponseType,
2✔
817
                                        Error: NewJSNotEnabledForAccountError(),
2✔
818
                                }
2✔
819
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
820
                        }
2✔
821
                        return
4✔
822
                }
823
        }
824

825
        // Short circuit for no interest.
826
        if len(rr.psubs)+len(rr.qsubs) == 0 {
153,195✔
827
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
19,188✔
828
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
829
                        var resp = ApiResponse{
×
830
                                Type:  JSApiSystemResponseType,
×
831
                                Error: NewJSBadRequestError(),
×
832
                        }
×
833
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
834
                }
×
835
                return
19,188✔
836
        }
837

838
        // We should only have psubs and only 1 per result.
839
        if len(rr.psubs) != 1 {
114,819✔
840
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
841
                if c.kind == CLIENT || c.kind == LEAF {
×
842
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
843
                        var resp = ApiResponse{
×
844
                                Type:  JSApiSystemResponseType,
×
845
                                Error: NewJSBadRequestError(),
×
846
                        }
×
847
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
848
                }
×
849
                return
×
850
        }
851
        jsub := rr.psubs[0]
114,819✔
852

114,819✔
853
        // We need to make sure not to block. We will send the request to a long-lived
114,819✔
854
        // pool of go routines.
114,819✔
855

114,819✔
856
        // Increment inflight. Do this before queueing.
114,819✔
857
        atomic.AddInt64(&js.apiInflight, 1)
114,819✔
858

114,819✔
859
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
114,819✔
860
        // header from the msg body. No other references are needed.
114,819✔
861
        // Check pending and warn if getting backed up.
114,819✔
862
        var queue *ipQueue[*jsAPIRoutedReq]
114,819✔
863
        var limit int64
114,819✔
864
        if js.infoSubs.HasInterest(subject) {
184,138✔
865
                queue = s.jsAPIRoutedInfoReqs
69,319✔
866
                limit = atomic.LoadInt64(&js.infoQueueLimit)
69,319✔
867
        } else {
114,819✔
868
                queue = s.jsAPIRoutedReqs
45,500✔
869
                limit = atomic.LoadInt64(&js.queueLimit)
45,500✔
870
        }
45,500✔
871
        pending, _ := queue.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
114,819✔
872
        if pending >= int(limit) {
114,946✔
873
                s.rateLimitFormatWarnf("%s limit reached, dropping %d requests", queue.name, pending)
127✔
874
                drained := int64(queue.drain())
127✔
875
                atomic.AddInt64(&js.apiInflight, -drained)
127✔
876

127✔
877
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
127✔
878
                        TypedEvent: TypedEvent{
127✔
879
                                Type: JSAPILimitReachedAdvisoryType,
127✔
880
                                ID:   nuid.Next(),
127✔
881
                                Time: time.Now().UTC(),
127✔
882
                        },
127✔
883
                        Server:  s.Name(),
127✔
884
                        Domain:  js.config.Domain,
127✔
885
                        Dropped: drained,
127✔
886
                })
127✔
887
        }
127✔
888
}
889

890
func (s *Server) processJSAPIRoutedRequests() {
17,724✔
891
        defer s.grWG.Done()
17,724✔
892

17,724✔
893
        s.mu.RLock()
17,724✔
894
        queue, infoqueue := s.jsAPIRoutedReqs, s.jsAPIRoutedInfoReqs
17,724✔
895
        client := &client{srv: s, kind: JETSTREAM}
17,724✔
896
        s.mu.RUnlock()
17,724✔
897

17,724✔
898
        js := s.getJetStream()
17,724✔
899

17,724✔
900
        processFromQueue := func(ipq *ipQueue[*jsAPIRoutedReq]) {
123,329✔
901
                // Only pop one item at a time here, otherwise if the system is recovering
105,605✔
902
                // from queue buildup, then one worker will pull off all the tasks and the
105,605✔
903
                // others will be starved of work.
105,605✔
904
                if r, ok := ipq.popOne(); ok && r != nil {
211,185✔
905
                        client.pa = r.pa
105,580✔
906
                        start := time.Now()
105,580✔
907
                        r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
105,580✔
908
                        if dur := time.Since(start); dur >= readLoopReportThreshold {
105,581✔
909
                                s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
1✔
910
                        }
1✔
911
                        atomic.AddInt64(&js.apiInflight, -1)
105,580✔
912
                }
913
        }
914

915
        for {
141,053✔
916
                // First select case is prioritizing queue, we will only fall through
123,329✔
917
                // to the second select case that considers infoqueue if queue is empty.
123,329✔
918
                // This effectively means infos are deprioritized.
123,329✔
919
                select {
123,329✔
920
                case <-queue.ch:
2,376✔
921
                        processFromQueue(queue)
2,376✔
922
                case <-s.quitCh:
8✔
923
                        return
8✔
924
                default:
120,945✔
925
                        select {
120,945✔
926
                        case <-infoqueue.ch:
69,218✔
927
                                processFromQueue(infoqueue)
69,218✔
928
                        case <-queue.ch:
34,011✔
929
                                processFromQueue(queue)
34,011✔
930
                        case <-s.quitCh:
17,684✔
931
                                return
17,684✔
932
                        }
933
                }
934
        }
935
}
936

937
func (s *Server) setJetStreamExportSubs() error {
4,431✔
938
        js := s.getJetStream()
4,431✔
939
        if js == nil {
4,431✔
940
                return NewJSNotEnabledError()
×
941
        }
×
942

943
        // Start the go routine that will process API requests received by the
944
        // subscription below when they are coming from routes, etc..
945
        const maxProcs = 16
4,431✔
946
        mp := runtime.GOMAXPROCS(0)
4,431✔
947
        // Cap at 16 max for now on larger core setups.
4,431✔
948
        if mp > maxProcs {
4,431✔
949
                mp = maxProcs
×
950
        }
×
951
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API queue")
4,431✔
952
        s.jsAPIRoutedInfoReqs = newIPQueue[*jsAPIRoutedReq](s, "JetStream API info queue")
4,431✔
953
        for i := 0; i < mp; i++ {
22,155✔
954
                s.startGoRoutine(s.processJSAPIRoutedRequests)
17,724✔
955
        }
17,724✔
956

957
        // This is the catch all now for all JetStream API calls.
958
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,431✔
959
                return err
×
960
        }
×
961

962
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,431✔
963
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
964
                return err
×
965
        }
×
966

967
        // API handles themselves.
968
        // infopairs are deprioritized compared to pairs in processJSAPIRoutedRequests.
969
        pairs := []struct {
4,431✔
970
                subject string
4,431✔
971
                handler msgHandler
4,431✔
972
        }{
4,431✔
973
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,431✔
974
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,431✔
975
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,431✔
976
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,431✔
977
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,431✔
978
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,431✔
979
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,431✔
980
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,431✔
981
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,431✔
982
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,431✔
983
                {JSApiMsgGet, s.jsMsgGetRequest},
4,431✔
984
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,431✔
985
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,431✔
986
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,431✔
987
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,431✔
988
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,431✔
989
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,431✔
990
        }
4,431✔
991
        infopairs := []struct {
4,431✔
992
                subject string
4,431✔
993
                handler msgHandler
4,431✔
994
        }{
4,431✔
995
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,431✔
996
                {JSApiStreams, s.jsStreamNamesRequest},
4,431✔
997
                {JSApiStreamList, s.jsStreamListRequest},
4,431✔
998
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,431✔
999
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,431✔
1000
                {JSApiConsumerList, s.jsConsumerListRequest},
4,431✔
1001
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,431✔
1002
        }
4,431✔
1003

4,431✔
1004
        js.mu.Lock()
4,431✔
1005
        defer js.mu.Unlock()
4,431✔
1006

4,431✔
1007
        // As well as populating js.apiSubs for the dispatch function to use, we
4,431✔
1008
        // will also populate js.infoSubs, so that the dispatch function can
4,431✔
1009
        // decide quickly whether or not the request is an info request or not.
4,431✔
1010
        for _, p := range append(infopairs, pairs...) {
110,775✔
1011
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
106,344✔
1012
                if err := js.apiSubs.Insert(sub); err != nil {
106,344✔
1013
                        return err
×
1014
                }
×
1015
        }
1016
        for _, p := range infopairs {
35,448✔
1017
                if err := js.infoSubs.Insert(p.subject, struct{}{}); err != nil {
31,017✔
1018
                        return err
×
1019
                }
×
1020
        }
1021

1022
        return nil
4,431✔
1023
}
1024

1025
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
52,480✔
1026
        acc.trackAPI()
52,480✔
1027
        if reply != _EMPTY_ {
83,158✔
1028
                s.sendInternalAccountMsg(nil, reply, response)
30,678✔
1029
        }
30,678✔
1030
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
52,480✔
1031
}
1032

1033
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
18,443✔
1034
        acc.trackAPIErr()
18,443✔
1035
        if reply != _EMPTY_ {
33,534✔
1036
                s.sendInternalAccountMsg(nil, reply, response)
15,091✔
1037
        }
15,091✔
1038
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
18,443✔
1039
}
1040

1041
const errRespDelay = 500 * time.Millisecond
1042

1043
type delayedAPIResponse struct {
1044
        ci       *ClientInfo
1045
        acc      *Account
1046
        subject  string
1047
        reply    string
1048
        request  string
1049
        hdr      []byte
1050
        response string
1051
        rg       *raftGroup
1052
        deadline time.Time
1053
        noJs     bool
1054
        next     *delayedAPIResponse
1055
}
1056

1057
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1058
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
112✔
1059
        // Check if list empty.
112✔
1060
        if *head == nil {
210✔
1061
                *head, *tail = r, r
98✔
1062
                return
98✔
1063
        }
98✔
1064
        // Check if it should be added at the end, which is if after or equal to the tail.
1065
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1066
                (*tail).next, *tail = r, r
12✔
1067
                return
12✔
1068
        }
12✔
1069
        // Find its spot in the list.
1070
        var prev *delayedAPIResponse
2✔
1071
        for c := *head; c != nil; c = c.next {
6✔
1072
                // We insert only if we are stricly before the current `c`.
4✔
1073
                if r.deadline.Before(c.deadline) {
6✔
1074
                        r.next = c
2✔
1075
                        if prev != nil {
3✔
1076
                                prev.next = r
1✔
1077
                        } else {
2✔
1078
                                *head = r
1✔
1079
                        }
1✔
1080
                        return
2✔
1081
                }
1082
                prev = c
2✔
1083
        }
1084
}
1085

1086
func (s *Server) delayedAPIResponder() {
6,794✔
1087
        defer s.grWG.Done()
6,794✔
1088
        var (
6,794✔
1089
                head, tail *delayedAPIResponse // Linked list.
6,794✔
1090
                r          *delayedAPIResponse // Updated by calling next().
6,794✔
1091
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
6,794✔
1092
                tm         = time.NewTimer(time.Hour)
6,794✔
1093
        )
6,794✔
1094
        next := func() {
6,995✔
1095
                r, rqch = nil, nil
201✔
1096
                // Check that JetStream is still on. Do not exit the go routine
201✔
1097
                // since JS can be enabled/disabled. The go routine will exit
201✔
1098
                // only if server is shutdown.
201✔
1099
                js := s.getJetStream()
201✔
1100
                if js == nil {
202✔
1101
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1102
                        head, tail = nil, nil
1✔
1103
                        s.delayedAPIResponses.drain()
1✔
1104
                        // Fall back into next "if" that resets timer.
1✔
1105
                }
1✔
1106
                // If there are no delayed messages then delay the timer for
1107
                // a while.
1108
                if head == nil {
299✔
1109
                        tm.Reset(time.Hour)
98✔
1110
                        return
98✔
1111
                }
98✔
1112
                // Get the first expected message and then reset the timer.
1113
                r = head
103✔
1114
                js.mu.RLock()
103✔
1115
                if r.rg != nil && r.rg.node != nil {
104✔
1116
                        // If there's an attached Raft group to the delayed response
1✔
1117
                        // then pull out the quit channel, so that we don't bother
1✔
1118
                        // sending responses for entities which are now no longer
1✔
1119
                        // running.
1✔
1120
                        rqch = r.rg.node.QuitC()
1✔
1121
                }
1✔
1122
                js.mu.RUnlock()
103✔
1123
                tm.Reset(time.Until(r.deadline))
103✔
1124
        }
1125
        pop := func() {
6,896✔
1126
                if head == nil {
102✔
1127
                        return
×
1128
                }
×
1129
                head = head.next
102✔
1130
                if head == nil {
199✔
1131
                        tail = nil
97✔
1132
                }
97✔
1133
        }
1134
        for {
13,802✔
1135
                select {
7,008✔
1136
                case <-s.delayedAPIResponses.ch:
112✔
1137
                        v, ok := s.delayedAPIResponses.popOne()
112✔
1138
                        if !ok {
112✔
1139
                                continue
×
1140
                        }
1141
                        // Add it to the list, and if ends up being the head, set things up.
1142
                        addDelayedResponse(&head, &tail, v)
112✔
1143
                        if v == head {
211✔
1144
                                next()
99✔
1145
                        }
99✔
1146
                case <-s.quitCh:
6,788✔
1147
                        return
6,788✔
1148
                case <-rqch:
1✔
1149
                        // If we were the head, drop and setup things for next.
1✔
1150
                        if r != nil && r == head {
2✔
1151
                                pop()
1✔
1152
                        }
1✔
1153
                        next()
1✔
1154
                case <-tm.C:
101✔
1155
                        if r != nil {
202✔
1156
                                // If it's not a JS API error, send it as a raw response without additional API/audit tracking.
101✔
1157
                                if r.noJs {
137✔
1158
                                        s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
36✔
1159
                                } else {
101✔
1160
                                        s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
65✔
1161
                                }
65✔
1162
                                pop()
101✔
1163
                        }
1164
                        next()
101✔
1165
                }
1166
        }
1167
}
1168

1169
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
76✔
1170
        s.delayedAPIResponses.push(&delayedAPIResponse{
76✔
1171
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
76✔
1172
        })
76✔
1173
}
76✔
1174

1175
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1176
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1177
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1178
        })
36✔
1179
}
36✔
1180

1181
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
106,361✔
1182
        hdr, msg = c.msgParts(raw)
106,361✔
1183
        var ci ClientInfo
106,361✔
1184

106,361✔
1185
        if len(hdr) > 0 {
212,641✔
1186
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
106,280✔
1187
                        return nil, nil, nil, nil, err
×
1188
                }
×
1189
        }
1190

1191
        if ci.Service != _EMPTY_ {
106,425✔
1192
                acc, _ = s.LookupAccount(ci.Service)
64✔
1193
        } else if ci.Account != _EMPTY_ {
212,577✔
1194
                acc, _ = s.LookupAccount(ci.Account)
106,216✔
1195
        } else {
106,297✔
1196
                // Direct $SYS access.
81✔
1197
                acc = c.acc
81✔
1198
                if acc == nil {
90✔
1199
                        acc = s.SystemAccount()
9✔
1200
                }
9✔
1201
        }
1202
        if acc == nil {
106,371✔
1203
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1204
        }
10✔
1205
        return &ci, acc, hdr, msg, nil
106,351✔
1206
}
1207

1208
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
19,038✔
1209
        decoder := json.NewDecoder(bytes.NewReader(msg))
19,038✔
1210
        decoder.DisallowUnknownFields()
19,038✔
1211

19,038✔
1212
        for {
57,105✔
1213
                if err := decoder.Decode(v); err != nil {
57,105✔
1214
                        if err == io.EOF {
38,070✔
1215
                                return nil
19,032✔
1216
                        }
19,032✔
1217

1218
                        var syntaxErr *json.SyntaxError
6✔
1219
                        if errors.As(err, &syntaxErr) {
6✔
1220
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1221
                        }
×
1222

1223
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1224

6✔
1225
                        if s.JetStreamConfig().Strict {
12✔
1226
                                return err
6✔
1227
                        }
6✔
1228

1229
                        return json.Unmarshal(msg, v)
×
1230
                }
1231
        }
1232
}
1233

1234
func (a *Account) trackAPI() {
52,480✔
1235
        a.mu.RLock()
52,480✔
1236
        jsa := a.js
52,480✔
1237
        a.mu.RUnlock()
52,480✔
1238
        if jsa != nil {
104,868✔
1239
                jsa.usageMu.Lock()
52,388✔
1240
                jsa.usageApi++
52,388✔
1241
                jsa.apiTotal++
52,388✔
1242
                jsa.sendClusterUsageUpdate()
52,388✔
1243
                atomic.AddInt64(&jsa.js.apiTotal, 1)
52,388✔
1244
                jsa.usageMu.Unlock()
52,388✔
1245
        }
52,388✔
1246
}
1247

1248
func (a *Account) trackAPIErr() {
18,443✔
1249
        a.mu.RLock()
18,443✔
1250
        jsa := a.js
18,443✔
1251
        a.mu.RUnlock()
18,443✔
1252
        if jsa != nil {
36,666✔
1253
                jsa.usageMu.Lock()
18,223✔
1254
                jsa.usageApi++
18,223✔
1255
                jsa.apiTotal++
18,223✔
1256
                jsa.usageErr++
18,223✔
1257
                jsa.apiErrors++
18,223✔
1258
                jsa.sendClusterUsageUpdate()
18,223✔
1259
                atomic.AddInt64(&jsa.js.apiTotal, 1)
18,223✔
1260
                atomic.AddInt64(&jsa.js.apiErrors, 1)
18,223✔
1261
                jsa.usageMu.Unlock()
18,223✔
1262
        }
18,223✔
1263
}
1264

1265
const badAPIRequestT = "Malformed JetStream API Request: %q"
1266

1267
// Helper function to check on JetStream being enabled but also on status of leafnodes
1268
// If the local account is not enabled but does have leafnode connectivity we will not
1269
// want to error immediately and let the other side decide.
1270
func (a *Account) checkJetStream() (enabled, shouldError bool) {
45,176✔
1271
        a.mu.RLock()
45,176✔
1272
        defer a.mu.RUnlock()
45,176✔
1273
        return a.js != nil, a.nleafs+a.nrleafs == 0
45,176✔
1274
}
45,176✔
1275

1276
// Request for current usage and limits for this account.
1277
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
463✔
1278
        if c == nil || !s.JetStreamEnabled() {
463✔
1279
                return
×
1280
        }
×
1281

1282
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
463✔
1283
        if err != nil {
464✔
1284
                s.Warnf(badAPIRequestT, msg)
1✔
1285
                return
1✔
1286
        }
1✔
1287

1288
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
462✔
1289

462✔
1290
        // Determine if we should proceed here when we are in clustered mode.
462✔
1291
        if s.JetStreamIsClustered() {
848✔
1292
                js, cc := s.getJetStreamCluster()
386✔
1293
                if js == nil || cc == nil {
386✔
1294
                        return
×
1295
                }
×
1296
                if js.isLeaderless() {
387✔
1297
                        resp.Error = NewJSClusterNotAvailError()
1✔
1298
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1299
                        return
1✔
1300
                }
1✔
1301
                // Make sure we are meta leader.
1302
                if !s.JetStreamIsLeader() {
652✔
1303
                        return
267✔
1304
                }
267✔
1305
        }
1306

1307
        if errorOnRequiredApiLevel(hdr) {
195✔
1308
                resp.Error = NewJSRequiredApiLevelError()
1✔
1309
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1310
                return
1✔
1311
        }
1✔
1312

1313
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
199✔
1314
                if !doErr {
7✔
1315
                        return
1✔
1316
                }
1✔
1317
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1318
        } else {
187✔
1319
                stats := acc.JetStreamUsage()
187✔
1320
                resp.JetStreamAccountStats = &stats
187✔
1321
        }
187✔
1322
        b, err := json.Marshal(resp)
192✔
1323
        if err != nil {
192✔
1324
                return
×
1325
        }
×
1326

1327
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
192✔
1328
}
1329

1330
// Helpers for token extraction.
1331
func streamNameFromSubject(subject string) string {
79,655✔
1332
        return tokenAt(subject, 5)
79,655✔
1333
}
79,655✔
1334

1335
func consumerNameFromSubject(subject string) string {
48,403✔
1336
        return tokenAt(subject, 6)
48,403✔
1337
}
48,403✔
1338

1339
func (s *Server) jsonResponse(v any) string {
71,576✔
1340
        b, err := json.Marshal(v)
71,576✔
1341
        if err != nil {
71,576✔
1342
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1343
                return ""
×
1344
        }
×
1345
        return string(b)
71,576✔
1346
}
1347

1348
// Read lock must be held
1349
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,105✔
1350
        var reservation int64
3,105✔
1351
        for _, sa := range jsa.streams {
6,834✔
1352
                // Don't count the stream toward the limit if it already exists.
3,729✔
1353
                if sa.cfg.Name == cfg.Name {
3,842✔
1354
                        continue
113✔
1355
                }
1356
                if (tier == _EMPTY_ || isSameTier(&sa.cfg, cfg)) && sa.cfg.MaxBytes > 0 && sa.cfg.Storage == cfg.Storage {
3,631✔
1357
                        // If tier is empty, all storage is flat and we should adjust for replicas.
15✔
1358
                        // Otherwise if tiered, storage replication already taken into consideration.
15✔
1359
                        if tier == _EMPTY_ && sa.cfg.Replicas > 1 {
16✔
1360
                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
1✔
1361
                        } else {
15✔
1362
                                reservation += sa.cfg.MaxBytes
14✔
1363
                        }
14✔
1364
                }
1365
        }
1366
        return reservation
3,105✔
1367
}
1368

1369
// Request to create a stream.
1370
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
11,551✔
1371
        if c == nil || !s.JetStreamEnabled() {
11,823✔
1372
                return
272✔
1373
        }
272✔
1374
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,279✔
1375
        if err != nil {
11,282✔
1376
                s.Warnf(badAPIRequestT, msg)
3✔
1377
                return
3✔
1378
        }
3✔
1379

1380
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
11,276✔
1381

11,276✔
1382
        // Determine if we should proceed here when we are in clustered mode.
11,276✔
1383
        if s.JetStreamIsClustered() {
21,136✔
1384
                js, cc := s.getJetStreamCluster()
9,860✔
1385
                if js == nil || cc == nil {
9,860✔
1386
                        return
×
1387
                }
×
1388
                if js.isLeaderless() {
9,861✔
1389
                        resp.Error = NewJSClusterNotAvailError()
1✔
1390
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1391
                        return
1✔
1392
                }
1✔
1393
                // Make sure we are meta leader.
1394
                if !s.JetStreamIsLeader() {
16,619✔
1395
                        return
6,760✔
1396
                }
6,760✔
1397
        }
1398

1399
        if errorOnRequiredApiLevel(hdr) {
4,516✔
1400
                resp.Error = NewJSRequiredApiLevelError()
1✔
1401
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1402
                return
1✔
1403
        }
1✔
1404

1405
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,522✔
1406
                if doErr {
8✔
1407
                        resp.Error = NewJSNotEnabledForAccountError()
×
1408
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1409
                }
×
1410
                return
8✔
1411
        }
1412

1413
        var cfg StreamConfigRequest
4,506✔
1414
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
4,507✔
1415
                resp.Error = NewJSInvalidJSONError(err)
1✔
1416
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1417
                return
1✔
1418
        }
1✔
1419

1420
        // Initialize asset version metadata.
1421
        setStaticStreamMetadata(&cfg.StreamConfig)
4,505✔
1422

4,505✔
1423
        streamName := streamNameFromSubject(subject)
4,505✔
1424
        if streamName != cfg.Name {
4,506✔
1425
                resp.Error = NewJSStreamMismatchError()
1✔
1426
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1427
                return
1✔
1428
        }
1✔
1429

1430
        // Check for path like separators in the name.
1431
        if strings.ContainsAny(streamName, `\/`) {
4,506✔
1432
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1433
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1434
                return
2✔
1435
        }
2✔
1436

1437
        // Can't create a stream with a sealed state.
1438
        if cfg.Sealed {
4,504✔
1439
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1440
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1441
                return
2✔
1442
        }
2✔
1443

1444
        // If we are told to do mirror direct but are not mirroring, error.
1445
        if cfg.MirrorDirect && cfg.Mirror == nil {
4,500✔
1446
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1447
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1448
                return
×
1449
        }
×
1450

1451
        // Hand off to cluster for processing.
1452
        if s.JetStreamIsClustered() {
7,597✔
1453
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
3,097✔
1454
                return
3,097✔
1455
        }
3,097✔
1456

1457
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,411✔
1458
                resp.Error = err
8✔
1459
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
8✔
1460
                return
8✔
1461
        }
8✔
1462

1463
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,395✔
1464
        if err != nil {
1,457✔
1465
                if IsNatsErr(err, JSStreamStoreFailedF) {
62✔
1466
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1467
                        err = errStreamStoreFailed
×
1468
                }
×
1469
                resp.Error = NewJSStreamCreateError(err, Unless(err))
62✔
1470
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
62✔
1471
                return
62✔
1472
        }
1473
        msetCfg := mset.config()
1,333✔
1474
        resp.StreamInfo = &StreamInfo{
1,333✔
1475
                Created:   mset.createdTime(),
1,333✔
1476
                State:     mset.state(),
1,333✔
1477
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,333✔
1478
                TimeStamp: time.Now().UTC(),
1,333✔
1479
                Mirror:    mset.mirrorInfo(),
1,333✔
1480
                Sources:   mset.sourcesInfo(),
1,333✔
1481
        }
1,333✔
1482
        resp.DidCreate = true
1,333✔
1483
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,333✔
1484
}
1485

1486
// Request to update a stream.
1487
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
934✔
1488
        if c == nil || !s.JetStreamEnabled() {
934✔
1489
                return
×
1490
        }
×
1491

1492
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
934✔
1493
        if err != nil {
934✔
1494
                s.Warnf(badAPIRequestT, msg)
×
1495
                return
×
1496
        }
×
1497

1498
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
934✔
1499

934✔
1500
        // Determine if we should proceed here when we are in clustered mode.
934✔
1501
        if s.JetStreamIsClustered() {
1,767✔
1502
                js, cc := s.getJetStreamCluster()
833✔
1503
                if js == nil || cc == nil {
833✔
1504
                        return
×
1505
                }
×
1506
                if js.isLeaderless() {
835✔
1507
                        resp.Error = NewJSClusterNotAvailError()
2✔
1508
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1509
                        return
2✔
1510
                }
2✔
1511
                // Make sure we are meta leader.
1512
                if !s.JetStreamIsLeader() {
1,455✔
1513
                        return
624✔
1514
                }
624✔
1515
        }
1516

1517
        if errorOnRequiredApiLevel(hdr) {
309✔
1518
                resp.Error = NewJSRequiredApiLevelError()
1✔
1519
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1520
                return
1✔
1521
        }
1✔
1522

1523
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
307✔
1524
                if doErr {
×
1525
                        resp.Error = NewJSNotEnabledForAccountError()
×
1526
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1527
                }
×
1528
                return
×
1529
        }
1530
        var ncfg StreamConfigRequest
307✔
1531
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
308✔
1532
                resp.Error = NewJSInvalidJSONError(err)
1✔
1533
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1534
                return
1✔
1535
        }
1✔
1536

1537
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
306✔
1538
        if apiErr != nil {
325✔
1539
                resp.Error = apiErr
19✔
1540
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
19✔
1541
                return
19✔
1542
        }
19✔
1543

1544
        streamName := streamNameFromSubject(subject)
287✔
1545
        if streamName != cfg.Name {
288✔
1546
                resp.Error = NewJSStreamMismatchError()
1✔
1547
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1548
                return
1✔
1549
        }
1✔
1550

1551
        // Handle clustered version here.
1552
        if s.JetStreamIsClustered() {
488✔
1553
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
202✔
1554
                return
202✔
1555
        }
202✔
1556

1557
        mset, err := acc.lookupStream(streamName)
84✔
1558
        if err != nil {
88✔
1559
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1560
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1561
                return
4✔
1562
        }
4✔
1563
        if mset.offlineReason != _EMPTY_ {
80✔
1564
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1565
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1566
                return
×
1567
        }
×
1568

1569
        // Update asset version metadata.
1570
        setStaticStreamMetadata(&cfg)
80✔
1571

80✔
1572
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
92✔
1573
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
12✔
1574
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
12✔
1575
                return
12✔
1576
        }
12✔
1577

1578
        msetCfg := mset.config()
68✔
1579
        resp.StreamInfo = &StreamInfo{
68✔
1580
                Created:   mset.createdTime(),
68✔
1581
                State:     mset.state(),
68✔
1582
                Config:    *setDynamicStreamMetadata(&msetCfg),
68✔
1583
                Domain:    s.getOpts().JetStreamDomain,
68✔
1584
                Mirror:    mset.mirrorInfo(),
68✔
1585
                Sources:   mset.sourcesInfo(),
68✔
1586
                TimeStamp: time.Now().UTC(),
68✔
1587
        }
68✔
1588
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
68✔
1589
}
1590

1591
// Request for the list of all stream names.
1592
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,319✔
1593
        if c == nil || !s.JetStreamEnabled() {
1,319✔
1594
                return
×
1595
        }
×
1596
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,319✔
1597
        if err != nil {
1,319✔
1598
                s.Warnf(badAPIRequestT, msg)
×
1599
                return
×
1600
        }
×
1601

1602
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,319✔
1603

1,319✔
1604
        // Determine if we should proceed here when we are in clustered mode.
1,319✔
1605
        if s.JetStreamIsClustered() {
2,362✔
1606
                js, cc := s.getJetStreamCluster()
1,043✔
1607
                if js == nil || cc == nil {
1,043✔
1608
                        return
×
1609
                }
×
1610
                if js.isLeaderless() {
1,043✔
1611
                        resp.Error = NewJSClusterNotAvailError()
×
1612
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1613
                        return
×
1614
                }
×
1615
                // Make sure we are meta leader.
1616
                if !s.JetStreamIsLeader() {
1,805✔
1617
                        return
762✔
1618
                }
762✔
1619
        }
1620

1621
        if errorOnRequiredApiLevel(hdr) {
558✔
1622
                resp.Error = NewJSRequiredApiLevelError()
1✔
1623
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1624
                return
1✔
1625
        }
1✔
1626

1627
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
557✔
1628
                if doErr {
1✔
1629
                        resp.Error = NewJSNotEnabledForAccountError()
×
1630
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1631
                }
×
1632
                return
1✔
1633
        }
1634

1635
        var offset int
555✔
1636
        var filter string
555✔
1637

555✔
1638
        if isJSONObjectOrArray(msg) {
929✔
1639
                var req JSApiStreamNamesRequest
374✔
1640
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
374✔
1641
                        resp.Error = NewJSInvalidJSONError(err)
×
1642
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1643
                        return
×
1644
                }
×
1645
                offset = req.Offset
374✔
1646
                if req.Subject != _EMPTY_ {
724✔
1647
                        filter = req.Subject
350✔
1648
                }
350✔
1649
        }
1650

1651
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1652
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1653
        var numStreams int
555✔
1654
        if s.JetStreamIsClustered() {
836✔
1655
                js, cc := s.getJetStreamCluster()
281✔
1656
                if js == nil || cc == nil {
281✔
1657
                        // TODO(dlc) - Debug or Warn?
×
1658
                        return
×
1659
                }
×
1660
                js.mu.RLock()
281✔
1661
                for stream, sa := range cc.streams[acc.Name] {
610✔
1662
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
329✔
1663
                                continue
×
1664
                        }
1665
                        if filter != _EMPTY_ {
615✔
1666
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
286✔
1667
                                if len(sa.Config.Subjects) == 0 {
288✔
1668
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1669
                                                resp.Streams = append(resp.Streams, stream)
×
1670
                                        }
×
1671
                                } else {
284✔
1672
                                        for _, subj := range sa.Config.Subjects {
568✔
1673
                                                if SubjectsCollide(filter, subj) {
513✔
1674
                                                        resp.Streams = append(resp.Streams, stream)
229✔
1675
                                                        break
229✔
1676
                                                }
1677
                                        }
1678
                                }
1679
                        } else {
43✔
1680
                                resp.Streams = append(resp.Streams, stream)
43✔
1681
                        }
43✔
1682
                }
1683
                js.mu.RUnlock()
281✔
1684
                if len(resp.Streams) > 1 {
283✔
1685
                        slices.Sort(resp.Streams)
2✔
1686
                }
2✔
1687
                numStreams = len(resp.Streams)
281✔
1688
                if offset > numStreams {
281✔
1689
                        offset = numStreams
×
1690
                }
×
1691
                if offset > 0 {
281✔
1692
                        resp.Streams = resp.Streams[offset:]
×
1693
                }
×
1694
                if len(resp.Streams) > JSApiNamesLimit {
281✔
1695
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1696
                }
×
1697
        } else {
274✔
1698
                msets := acc.filteredStreams(filter)
274✔
1699
                // Since we page results order matters.
274✔
1700
                if len(msets) > 1 {
280✔
1701
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
19✔
1702
                }
1703

1704
                numStreams = len(msets)
274✔
1705
                if offset > numStreams {
274✔
1706
                        offset = numStreams
×
1707
                }
×
1708

1709
                for _, mset := range msets[offset:] {
558✔
1710
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
284✔
1711
                        if len(resp.Streams) >= JSApiNamesLimit {
284✔
1712
                                break
×
1713
                        }
1714
                }
1715
        }
1716
        resp.Total = numStreams
555✔
1717
        resp.Limit = JSApiNamesLimit
555✔
1718
        resp.Offset = offset
555✔
1719

555✔
1720
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
555✔
1721
}
1722

1723
// Request for the list of all detailed stream info.
1724
// TODO(dlc) - combine with above long term
1725
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
1726
        if c == nil || !s.JetStreamEnabled() {
67✔
1727
                return
×
1728
        }
×
1729
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
1730
        if err != nil {
67✔
1731
                s.Warnf(badAPIRequestT, msg)
×
1732
                return
×
1733
        }
×
1734

1735
        var resp = JSApiStreamListResponse{
67✔
1736
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
67✔
1737
                Streams:     []*StreamInfo{},
67✔
1738
        }
67✔
1739

67✔
1740
        // Determine if we should proceed here when we are in clustered mode.
67✔
1741
        if s.JetStreamIsClustered() {
128✔
1742
                js, cc := s.getJetStreamCluster()
61✔
1743
                if js == nil || cc == nil {
61✔
1744
                        return
×
1745
                }
×
1746
                if js.isLeaderless() {
62✔
1747
                        resp.Error = NewJSClusterNotAvailError()
1✔
1748
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1749
                        return
1✔
1750
                }
1✔
1751
                // Make sure we are meta leader.
1752
                if !s.JetStreamIsLeader() {
103✔
1753
                        return
43✔
1754
                }
43✔
1755
        }
1756

1757
        if errorOnRequiredApiLevel(hdr) {
24✔
1758
                resp.Error = NewJSRequiredApiLevelError()
1✔
1759
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1760
                return
1✔
1761
        }
1✔
1762

1763
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
22✔
1764
                if doErr {
×
1765
                        resp.Error = NewJSNotEnabledForAccountError()
×
1766
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1767
                }
×
1768
                return
×
1769
        }
1770

1771
        var offset int
22✔
1772
        var filter string
22✔
1773

22✔
1774
        if isJSONObjectOrArray(msg) {
33✔
1775
                var req JSApiStreamListRequest
11✔
1776
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
1777
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1778
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1779
                        return
1✔
1780
                }
1✔
1781
                offset = req.Offset
10✔
1782
                if req.Subject != _EMPTY_ {
12✔
1783
                        filter = req.Subject
2✔
1784
                }
2✔
1785
        }
1786

1787
        // Clustered mode will invoke a scatter and gather.
1788
        if s.JetStreamIsClustered() {
38✔
1789
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
17✔
1790
                msg = copyBytes(msg)
17✔
1791
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
34✔
1792
                return
17✔
1793
        }
1794

1795
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1796
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1797
        var msets []*stream
4✔
1798
        if filter == _EMPTY_ {
7✔
1799
                msets = acc.streams()
3✔
1800
        } else {
4✔
1801
                msets = acc.filteredStreams(filter)
1✔
1802
        }
1✔
1803

1804
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
5✔
1805

1806
        scnt := len(msets)
4✔
1807
        if offset > scnt {
4✔
1808
                offset = scnt
×
1809
        }
×
1810

1811
        var missingNames []string
4✔
1812
        for _, mset := range msets[offset:] {
9✔
1813
                if mset.offlineReason != _EMPTY_ {
6✔
1814
                        if resp.Offline == nil {
2✔
1815
                                resp.Offline = make(map[string]string, 1)
1✔
1816
                        }
1✔
1817
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
1818
                        missingNames = append(missingNames, mset.getCfgName())
1✔
1819
                        continue
1✔
1820
                }
1821

1822
                config := mset.config()
4✔
1823
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
1824
                        Created:   mset.createdTime(),
4✔
1825
                        State:     mset.state(),
4✔
1826
                        Config:    config,
4✔
1827
                        Domain:    s.getOpts().JetStreamDomain,
4✔
1828
                        Mirror:    mset.mirrorInfo(),
4✔
1829
                        Sources:   mset.sourcesInfo(),
4✔
1830
                        TimeStamp: time.Now().UTC(),
4✔
1831
                })
4✔
1832
                if len(resp.Streams) >= JSApiListLimit {
4✔
1833
                        break
×
1834
                }
1835
        }
1836
        resp.Total = scnt
4✔
1837
        resp.Limit = JSApiListLimit
4✔
1838
        resp.Offset = offset
4✔
1839
        resp.Missing = missingNames
4✔
1840
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
1841
}
1842

1843
// Request for information about a stream.
1844
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
24,853✔
1845
        if c == nil || !s.JetStreamEnabled() {
24,860✔
1846
                return
7✔
1847
        }
7✔
1848
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
24,846✔
1849
        if err != nil {
24,851✔
1850
                s.Warnf(badAPIRequestT, msg)
5✔
1851
                return
5✔
1852
        }
5✔
1853

1854
        streamName := streamNameFromSubject(subject)
24,841✔
1855

24,841✔
1856
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
24,841✔
1857

24,841✔
1858
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
24,841✔
1859
        // Make sure the response type is for a create call.
24,841✔
1860
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
24,841✔
1861
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
1862
        }
×
1863

1864
        var clusterWideConsCount int
24,841✔
1865

24,841✔
1866
        js, cc := s.getJetStreamCluster()
24,841✔
1867
        if js == nil {
24,841✔
1868
                return
×
1869
        }
×
1870
        // If we are in clustered mode we need to be the stream leader to proceed.
1871
        if cc != nil {
34,831✔
1872
                // Check to make sure the stream is assigned.
9,990✔
1873
                js.mu.RLock()
9,990✔
1874
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
9,990✔
1875
                var offline bool
9,990✔
1876
                if sa != nil {
18,763✔
1877
                        clusterWideConsCount = len(sa.consumers)
8,773✔
1878
                        offline = s.allPeersOffline(sa.Group)
8,773✔
1879
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
8,803✔
1880
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
1881
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
1882
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
1883
                                js.mu.RUnlock()
30✔
1884
                                return
30✔
1885
                        }
30✔
1886
                }
1887
                js.mu.RUnlock()
9,960✔
1888

9,960✔
1889
                if isLeader && sa == nil {
10,222✔
1890
                        // We can't find the stream, so mimic what would be the errors below.
262✔
1891
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
262✔
1892
                                if doErr {
×
1893
                                        resp.Error = NewJSNotEnabledForAccountError()
×
1894
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1895
                                }
×
1896
                                return
×
1897
                        }
1898
                        // No stream present.
1899
                        resp.Error = NewJSStreamNotFoundError()
262✔
1900
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
262✔
1901
                        return
262✔
1902
                } else if sa == nil {
10,653✔
1903
                        if js.isLeaderless() {
955✔
1904
                                resp.Error = NewJSClusterNotAvailError()
×
1905
                                // Delaying an error response gives the leader a chance to respond before us
×
1906
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1907
                        }
×
1908
                        return
955✔
1909
                } else if isLeader && offline {
8,745✔
1910
                        resp.Error = NewJSStreamOfflineError()
2✔
1911
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
1912
                        return
2✔
1913
                }
2✔
1914

1915
                // Check to see if we are a member of the group and if the group has no leader.
1916
                isLeaderless := js.isGroupLeaderless(sa.Group)
8,741✔
1917

8,741✔
1918
                // We have the stream assigned and a leader, so only the stream leader should answer.
8,741✔
1919
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
15,755✔
1920
                        if js.isLeaderless() {
7,014✔
1921
                                resp.Error = NewJSClusterNotAvailError()
×
1922
                                // Delaying an error response gives the leader a chance to respond before us
×
1923
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
1924
                                return
×
1925
                        }
×
1926

1927
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
1928
                        // while the new members work through the election and catchup process.
1929
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
1930
                        js.mu.RLock()
7,014✔
1931
                        rg := sa.Group
7,014✔
1932
                        var ourID string
7,014✔
1933
                        if cc.meta != nil {
14,028✔
1934
                                ourID = cc.meta.ID()
7,014✔
1935
                        }
7,014✔
1936
                        // We have seen cases where rg is nil at this point,
1937
                        // so check explicitly and bail if that is the case.
1938
                        bail := rg == nil || !rg.isMember(ourID)
7,014✔
1939
                        if !bail {
9,498✔
1940
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
2,484✔
1941
                                // Also, we have seen cases where rg.node is nil at this point,
2,484✔
1942
                                // so check explicitly and bail if that is the case.
2,484✔
1943
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
2,484✔
1944
                        }
2,484✔
1945
                        js.mu.RUnlock()
7,014✔
1946
                        if bail {
14,016✔
1947
                                return
7,002✔
1948
                        }
7,002✔
1949
                }
1950
        }
1951

1952
        if errorOnRequiredApiLevel(hdr) {
16,591✔
1953
                resp.Error = NewJSRequiredApiLevelError()
1✔
1954
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1955
                return
1✔
1956
        }
1✔
1957

1958
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
16,595✔
1959
                if doErr {
7✔
1960
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
1961
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1962
                }
1✔
1963
                return
6✔
1964
        }
1965

1966
        var details bool
16,583✔
1967
        var subjects string
16,583✔
1968
        var offset int
16,583✔
1969
        if isJSONObjectOrArray(msg) {
16,618✔
1970
                var req JSApiStreamInfoRequest
35✔
1971
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
36✔
1972
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1973
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1974
                        return
1✔
1975
                }
1✔
1976
                details, subjects = req.DeletedDetails, req.SubjectsFilter
34✔
1977
                offset = req.Offset
34✔
1978
        }
1979

1980
        mset, err := acc.lookupStream(streamName)
16,582✔
1981
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
16,582✔
1982
        if err != nil {
17,329✔
1983
                if cc != nil {
747✔
1984
                        // This could be inflight, pause for a short bit and try again.
×
1985
                        // This will not be inline, so ok.
×
1986
                        time.Sleep(10 * time.Millisecond)
×
1987
                        mset, err = acc.lookupStream(streamName)
×
1988
                }
×
1989
                // Check again.
1990
                if err != nil {
1,494✔
1991
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
747✔
1992
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
747✔
1993
                        return
747✔
1994
                }
747✔
1995
        }
1996

1997
        if mset.offlineReason != _EMPTY_ {
15,836✔
1998
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
1999
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2000
                return
1✔
2001
        }
1✔
2002

2003
        config := mset.config()
15,834✔
2004
        resp.StreamInfo = &StreamInfo{
15,834✔
2005
                Created:    mset.createdTime(),
15,834✔
2006
                State:      mset.stateWithDetail(details),
15,834✔
2007
                Config:     *setDynamicStreamMetadata(&config),
15,834✔
2008
                Domain:     s.getOpts().JetStreamDomain,
15,834✔
2009
                Cluster:    js.clusterInfo(mset.raftGroup()),
15,834✔
2010
                Mirror:     mset.mirrorInfo(),
15,834✔
2011
                Sources:    mset.sourcesInfo(),
15,834✔
2012
                Alternates: js.streamAlternates(ci, config.Name),
15,834✔
2013
                TimeStamp:  time.Now().UTC(),
15,834✔
2014
        }
15,834✔
2015
        if clusterWideConsCount > 0 {
16,306✔
2016
                resp.StreamInfo.State.Consumers = clusterWideConsCount
472✔
2017
        }
472✔
2018

2019
        // Check if they have asked for subject details.
2020
        if subjects != _EMPTY_ {
15,866✔
2021
                st := mset.store.SubjectsTotals(subjects)
32✔
2022
                if lst := len(st); lst > 0 {
60✔
2023
                        // Common for both cases.
28✔
2024
                        resp.Offset = offset
28✔
2025
                        resp.Limit = JSMaxSubjectDetails
28✔
2026
                        resp.Total = lst
28✔
2027

28✔
2028
                        if offset == 0 && lst <= JSMaxSubjectDetails {
56✔
2029
                                resp.StreamInfo.State.Subjects = st
28✔
2030
                        } else {
28✔
2031
                                // Here we have to filter list due to offset or maximum constraints.
×
2032
                                subjs := make([]string, 0, len(st))
×
2033
                                for subj := range st {
×
2034
                                        subjs = append(subjs, subj)
×
2035
                                }
×
2036
                                // Sort it
2037
                                slices.Sort(subjs)
×
2038

×
2039
                                if offset > len(subjs) {
×
2040
                                        offset = len(subjs)
×
2041
                                }
×
2042

2043
                                end := offset + JSMaxSubjectDetails
×
2044
                                if end > len(subjs) {
×
2045
                                        end = len(subjs)
×
2046
                                }
×
2047
                                actualSize := end - offset
×
2048
                                var sd map[string]uint64
×
2049

×
2050
                                if actualSize > 0 {
×
2051
                                        sd = make(map[string]uint64, actualSize)
×
2052
                                        for _, ss := range subjs[offset:end] {
×
2053
                                                sd[ss] = st[ss]
×
2054
                                        }
×
2055
                                }
2056
                                resp.StreamInfo.State.Subjects = sd
×
2057
                        }
2058
                }
2059
        }
2060
        // Check for out of band catchups.
2061
        if mset.hasCatchupPeers() {
15,834✔
2062
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2063
        }
×
2064

2065
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
15,834✔
2066
}
2067

2068
// Request to have a stream leader stepdown.
2069
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
265✔
2070
        if c == nil || !s.JetStreamEnabled() {
265✔
2071
                return
×
2072
        }
×
2073
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
265✔
2074
        if err != nil {
265✔
2075
                s.Warnf(badAPIRequestT, msg)
×
2076
                return
×
2077
        }
×
2078

2079
        // Have extra token for this one.
2080
        name := tokenAt(subject, 6)
265✔
2081

265✔
2082
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
265✔
2083

265✔
2084
        // If we are not in clustered mode this is a failed request.
265✔
2085
        if !s.JetStreamIsClustered() {
266✔
2086
                resp.Error = NewJSClusterRequiredError()
1✔
2087
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2088
                return
1✔
2089
        }
1✔
2090

2091
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2092
        js, cc := s.getJetStreamCluster()
264✔
2093
        if js == nil || cc == nil {
264✔
2094
                return
×
2095
        }
×
2096
        if js.isLeaderless() {
264✔
2097
                resp.Error = NewJSClusterNotAvailError()
×
2098
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2099
                return
×
2100
        }
×
2101

2102
        js.mu.RLock()
264✔
2103
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
264✔
2104
        js.mu.RUnlock()
264✔
2105

264✔
2106
        if isLeader && sa == nil {
264✔
2107
                resp.Error = NewJSStreamNotFoundError()
×
2108
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2109
                return
×
2110
        } else if sa == nil {
265✔
2111
                return
1✔
2112
        }
1✔
2113

2114
        if errorOnRequiredApiLevel(hdr) {
263✔
2115
                resp.Error = NewJSRequiredApiLevelError()
×
2116
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2117
                return
×
2118
        }
×
2119

2120
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
263✔
2121
                if doErr {
×
2122
                        resp.Error = NewJSNotEnabledForAccountError()
×
2123
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2124
                }
×
2125
                return
×
2126
        }
2127

2128
        // Check to see if we are a member of the group and if the group has no leader.
2129
        if js.isGroupLeaderless(sa.Group) {
263✔
2130
                resp.Error = NewJSClusterNotAvailError()
×
2131
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2132
                return
×
2133
        }
×
2134

2135
        // We have the stream assigned and a leader, so only the stream leader should answer.
2136
        if !acc.JetStreamIsStreamLeader(name) {
467✔
2137
                return
204✔
2138
        }
204✔
2139

2140
        mset, err := acc.lookupStream(name)
59✔
2141
        if err != nil {
59✔
2142
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2143
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2144
                return
×
2145
        }
×
2146

2147
        if mset == nil {
59✔
2148
                resp.Success = true
×
2149
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2150
                return
×
2151
        }
×
2152

2153
        node := mset.raftNode()
59✔
2154
        if node == nil {
59✔
2155
                resp.Success = true
×
2156
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2157
                return
×
2158
        }
×
2159

2160
        var preferredLeader string
59✔
2161
        if isJSONObjectOrArray(msg) {
72✔
2162
                var req JSApiLeaderStepdownRequest
13✔
2163
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2164
                        resp.Error = NewJSInvalidJSONError(err)
×
2165
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2166
                        return
×
2167
                }
×
2168
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
18✔
2169
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2170
                        return
5✔
2171
                }
5✔
2172
        }
2173

2174
        // Call actual stepdown.
2175
        err = node.StepDown(preferredLeader)
54✔
2176
        if err != nil {
54✔
2177
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2178
        } else {
54✔
2179
                resp.Success = true
54✔
2180
        }
54✔
2181
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
54✔
2182
}
2183

2184
// Request to have a consumer leader stepdown.
2185
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
224✔
2186
        if c == nil || !s.JetStreamEnabled() {
224✔
2187
                return
×
2188
        }
×
2189
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
224✔
2190
        if err != nil {
224✔
2191
                s.Warnf(badAPIRequestT, msg)
×
2192
                return
×
2193
        }
×
2194

2195
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
224✔
2196

224✔
2197
        // If we are not in clustered mode this is a failed request.
224✔
2198
        if !s.JetStreamIsClustered() {
225✔
2199
                resp.Error = NewJSClusterRequiredError()
1✔
2200
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2201
                return
1✔
2202
        }
1✔
2203

2204
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2205
        js, cc := s.getJetStreamCluster()
223✔
2206
        if js == nil || cc == nil {
223✔
2207
                return
×
2208
        }
×
2209
        if js.isLeaderless() {
223✔
2210
                resp.Error = NewJSClusterNotAvailError()
×
2211
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2212
                return
×
2213
        }
×
2214

2215
        // Have extra token for this one.
2216
        stream := tokenAt(subject, 6)
223✔
2217
        consumer := tokenAt(subject, 7)
223✔
2218

223✔
2219
        js.mu.RLock()
223✔
2220
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
223✔
2221
        js.mu.RUnlock()
223✔
2222

223✔
2223
        if isLeader && sa == nil {
223✔
2224
                resp.Error = NewJSStreamNotFoundError()
×
2225
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2226
                return
×
2227
        } else if sa == nil {
223✔
2228
                return
×
2229
        }
×
2230

2231
        if errorOnRequiredApiLevel(hdr) {
223✔
2232
                resp.Error = NewJSRequiredApiLevelError()
×
2233
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2234
                return
×
2235
        }
×
2236

2237
        var ca *consumerAssignment
223✔
2238
        if sa.consumers != nil {
446✔
2239
                ca = sa.consumers[consumer]
223✔
2240
        }
223✔
2241
        if ca == nil {
223✔
2242
                resp.Error = NewJSConsumerNotFoundError()
×
2243
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2244
                return
×
2245
        }
×
2246
        // Check to see if we are a member of the group and if the group has no leader.
2247
        if js.isGroupLeaderless(ca.Group) {
223✔
2248
                resp.Error = NewJSClusterNotAvailError()
×
2249
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2250
                return
×
2251
        }
×
2252

2253
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
384✔
2254
                return
161✔
2255
        }
161✔
2256

2257
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
62✔
2258
                if doErr {
×
2259
                        resp.Error = NewJSNotEnabledForAccountError()
×
2260
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2261
                }
×
2262
                return
×
2263
        }
2264

2265
        mset, err := acc.lookupStream(stream)
62✔
2266
        if err != nil {
62✔
2267
                resp.Error = NewJSStreamNotFoundError()
×
2268
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2269
                return
×
2270
        }
×
2271
        o := mset.lookupConsumer(consumer)
62✔
2272
        if o == nil {
62✔
2273
                resp.Error = NewJSConsumerNotFoundError()
×
2274
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2275
                return
×
2276
        }
×
2277

2278
        n := o.raftNode()
62✔
2279
        if n == nil {
62✔
2280
                resp.Success = true
×
2281
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2282
                return
×
2283
        }
×
2284

2285
        var preferredLeader string
62✔
2286
        if isJSONObjectOrArray(msg) {
74✔
2287
                var req JSApiLeaderStepdownRequest
12✔
2288
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2289
                        resp.Error = NewJSInvalidJSONError(err)
×
2290
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2291
                        return
×
2292
                }
×
2293
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
17✔
2294
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2295
                        return
5✔
2296
                }
5✔
2297
        }
2298

2299
        // Call actual stepdown.
2300
        err = n.StepDown(preferredLeader)
57✔
2301
        if err != nil {
57✔
2302
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2303
        } else {
57✔
2304
                resp.Success = true
57✔
2305
        }
57✔
2306
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
57✔
2307
}
2308

2309
// Request to remove a peer from a clustered stream.
2310
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
70✔
2311
        if c == nil || !s.JetStreamEnabled() {
70✔
2312
                return
×
2313
        }
×
2314
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
70✔
2315
        if err != nil {
70✔
2316
                s.Warnf(badAPIRequestT, msg)
×
2317
                return
×
2318
        }
×
2319

2320
        // Have extra token for this one.
2321
        name := tokenAt(subject, 6)
70✔
2322

70✔
2323
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
70✔
2324

70✔
2325
        // If we are not in clustered mode this is a failed request.
70✔
2326
        if !s.JetStreamIsClustered() {
71✔
2327
                resp.Error = NewJSClusterRequiredError()
1✔
2328
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2329
                return
1✔
2330
        }
1✔
2331

2332
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2333
        js, cc := s.getJetStreamCluster()
69✔
2334
        if js == nil || cc == nil {
69✔
2335
                return
×
2336
        }
×
2337
        if js.isLeaderless() {
69✔
2338
                resp.Error = NewJSClusterNotAvailError()
×
2339
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2340
                return
×
2341
        }
×
2342

2343
        js.mu.RLock()
69✔
2344
        isLeader, sa := cc.isLeader(), js.streamAssignmentOrInflight(acc.Name, name)
69✔
2345
        js.mu.RUnlock()
69✔
2346

69✔
2347
        // Make sure we are meta leader.
69✔
2348
        if !isLeader {
127✔
2349
                return
58✔
2350
        }
58✔
2351

2352
        if errorOnRequiredApiLevel(hdr) {
11✔
2353
                resp.Error = NewJSRequiredApiLevelError()
×
2354
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2355
                return
×
2356
        }
×
2357

2358
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2359
                if doErr {
×
2360
                        resp.Error = NewJSNotEnabledForAccountError()
×
2361
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2362
                }
×
2363
                return
×
2364
        }
2365
        if isEmptyRequest(msg) {
11✔
2366
                resp.Error = NewJSBadRequestError()
×
2367
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2368
                return
×
2369
        }
×
2370

2371
        var req JSApiStreamRemovePeerRequest
11✔
2372
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2373
                resp.Error = NewJSInvalidJSONError(err)
×
2374
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2375
                return
×
2376
        }
×
2377
        if req.Peer == _EMPTY_ {
11✔
2378
                resp.Error = NewJSBadRequestError()
×
2379
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2380
                return
×
2381
        }
×
2382

2383
        if sa == nil {
11✔
2384
                // No stream present.
×
2385
                resp.Error = NewJSStreamNotFoundError()
×
2386
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2387
                return
×
2388
        }
×
2389

2390
        // Check to see if we are a member of the group and if the group has no leader.
2391
        // Peers here is a server name, convert to node name.
2392
        nodeName := getHash(req.Peer)
11✔
2393

11✔
2394
        js.mu.RLock()
11✔
2395
        rg := sa.Group
11✔
2396
        isMember := rg.isMember(nodeName)
11✔
2397
        js.mu.RUnlock()
11✔
2398

11✔
2399
        // Make sure we are a member.
11✔
2400
        if !isMember {
12✔
2401
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2402
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2403
                return
1✔
2404
        }
1✔
2405

2406
        // If we are here we have a valid peer member set for removal.
2407
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2408
                resp.Error = NewJSPeerRemapError()
2✔
2409
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2410
                return
2✔
2411
        }
2✔
2412

2413
        resp.Success = true
8✔
2414
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2415
}
2416

2417
// Request to have the metaleader remove a peer from the system.
2418
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
13✔
2419
        if c == nil || !s.JetStreamEnabled() {
13✔
2420
                return
×
2421
        }
×
2422

2423
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
13✔
2424
        if err != nil {
13✔
2425
                s.Warnf(badAPIRequestT, msg)
×
2426
                return
×
2427
        }
×
2428
        if acc != s.SystemAccount() {
13✔
2429
                return
×
2430
        }
×
2431

2432
        js, cc := s.getJetStreamCluster()
13✔
2433
        if js == nil || cc == nil {
13✔
2434
                return
×
2435
        }
×
2436

2437
        js.mu.RLock()
13✔
2438
        isLeader := cc.isLeader()
13✔
2439
        meta := cc.meta
13✔
2440
        js.mu.RUnlock()
13✔
2441

13✔
2442
        // Extra checks here but only leader is listening.
13✔
2443
        if !isLeader {
13✔
2444
                return
×
2445
        }
×
2446

2447
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
13✔
2448
        if errorOnRequiredApiLevel(hdr) {
13✔
2449
                resp.Error = NewJSRequiredApiLevelError()
×
2450
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2451
                return
×
2452
        }
×
2453

2454
        if isEmptyRequest(msg) {
13✔
2455
                resp.Error = NewJSBadRequestError()
×
2456
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2457
                return
×
2458
        }
×
2459

2460
        var req JSApiMetaServerRemoveRequest
13✔
2461
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2462
                resp.Error = NewJSInvalidJSONError(err)
×
2463
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2464
                return
×
2465
        }
×
2466

2467
        js.mu.Lock()
13✔
2468
        defer js.mu.Unlock()
13✔
2469

13✔
2470
        // Another peer-remove is already in progress, don't allow multiple concurrent changes.
13✔
2471
        if cc.peerRemoveReply != nil {
15✔
2472
                resp.Error = NewJSClusterServerMemberChangeInflightError()
2✔
2473
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2474
                return
2✔
2475
        }
2✔
2476

2477
        var found string
11✔
2478
        for _, p := range meta.Peers() {
43✔
2479
                // If Peer is specified, it takes precedence
32✔
2480
                if req.Peer != _EMPTY_ {
36✔
2481
                        if p.ID == req.Peer {
5✔
2482
                                found = req.Peer
1✔
2483
                                break
1✔
2484
                        }
2485
                        continue
3✔
2486
                }
2487
                si, ok := s.nodeToInfo.Load(p.ID)
28✔
2488
                if ok && si.(nodeInfo).name == req.Server {
34✔
2489
                        found = p.ID
6✔
2490
                        break
6✔
2491
                }
2492
        }
2493

2494
        if found == _EMPTY_ {
15✔
2495
                resp.Error = NewJSClusterServerNotMemberError()
4✔
2496
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2497
                return
4✔
2498
        }
4✔
2499

2500
        if err := meta.ProposeRemovePeer(found); err != nil {
7✔
2501
                if err == errMembershipChange {
×
2502
                        resp.Error = NewJSClusterServerMemberChangeInflightError()
×
2503
                } else {
×
2504
                        resp.Error = NewJSRaftGeneralError(err)
×
2505
                }
×
2506
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2507
                return
×
2508
        }
2509

2510
        if cc.peerRemoveReply == nil {
14✔
2511
                cc.peerRemoveReply = make(map[string]peerRemoveInfo, 1)
7✔
2512
        }
7✔
2513
        // Only copy the request, the subject and reply are already copied.
2514
        cc.peerRemoveReply[found] = peerRemoveInfo{ci: ci, subject: subject, reply: reply, request: string(msg)}
7✔
2515
}
2516

2517
func (s *Server) peerSetToNames(ps []string) []string {
173✔
2518
        names := make([]string, len(ps))
173✔
2519
        for i := 0; i < len(ps); i++ {
642✔
2520
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
469✔
2521
                        names[i] = ps[i]
×
2522
                } else {
469✔
2523
                        names[i] = si.(nodeInfo).name
469✔
2524
                }
469✔
2525
        }
2526
        return names
173✔
2527
}
2528

2529
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2530
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2531
        js.mu.RLock()
29✔
2532
        defer js.mu.RUnlock()
29✔
2533
        if cc := js.cluster; cc != nil {
58✔
2534
                for _, p := range cc.meta.Peers() {
145✔
2535
                        si, ok := s.nodeToInfo.Load(p.ID)
116✔
2536
                        if ok && si.(nodeInfo).name == serverName {
145✔
2537
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2538
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2539
                                                return p.ID
29✔
2540
                                        }
29✔
2541
                                }
2542
                        }
2543
                }
2544
        }
2545
        return _EMPTY_
×
2546
}
2547

2548
// Request to have the metaleader move a stream on a peer to another
2549
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2550
        if c == nil || !s.JetStreamEnabled() {
33✔
2551
                return
×
2552
        }
×
2553

2554
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2555
        if err != nil {
33✔
2556
                s.Warnf(badAPIRequestT, msg)
×
2557
                return
×
2558
        }
×
2559

2560
        js, cc := s.getJetStreamCluster()
33✔
2561
        if js == nil || cc == nil {
33✔
2562
                return
×
2563
        }
×
2564

2565
        // Extra checks here but only leader is listening.
2566
        js.mu.RLock()
33✔
2567
        isLeader := cc.isLeader()
33✔
2568
        js.mu.RUnlock()
33✔
2569

33✔
2570
        if !isLeader {
33✔
2571
                return
×
2572
        }
×
2573

2574
        accName := tokenAt(subject, 6)
33✔
2575
        streamName := tokenAt(subject, 7)
33✔
2576

33✔
2577
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2578
                return
×
2579
        }
×
2580

2581
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2582
        if errorOnRequiredApiLevel(hdr) {
33✔
2583
                resp.Error = NewJSRequiredApiLevelError()
×
2584
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2585
                return
×
2586
        }
×
2587

2588
        var req JSApiMetaServerStreamMoveRequest
33✔
2589
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2590
                resp.Error = NewJSInvalidJSONError(err)
×
2591
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2592
                return
×
2593
        }
×
2594

2595
        srcPeer := _EMPTY_
33✔
2596
        if req.Server != _EMPTY_ {
62✔
2597
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2598
        }
29✔
2599

2600
        targetAcc, ok := s.accounts.Load(accName)
33✔
2601
        if !ok {
33✔
2602
                resp.Error = NewJSNoAccountError()
×
2603
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2604
                return
×
2605
        }
×
2606

2607
        var streamFound bool
33✔
2608
        cfg := StreamConfig{}
33✔
2609
        currPeers := []string{}
33✔
2610
        currCluster := _EMPTY_
33✔
2611
        js.mu.Lock()
33✔
2612
        streams, ok := cc.streams[accName]
33✔
2613
        if ok {
66✔
2614
                sa, ok := streams[streamName]
33✔
2615
                if ok {
66✔
2616
                        cfg = *sa.Config.clone()
33✔
2617
                        streamFound = true
33✔
2618
                        currPeers = sa.Group.Peers
33✔
2619
                        currCluster = sa.Group.Cluster
33✔
2620
                }
33✔
2621
        }
2622
        js.mu.Unlock()
33✔
2623

33✔
2624
        if !streamFound {
33✔
2625
                resp.Error = NewJSStreamNotFoundError()
×
2626
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2627
                return
×
2628
        }
×
2629

2630
        // if server was picked, make sure src peer exists and move it to first position.
2631
        // removal will drop peers from the left
2632
        if req.Server != _EMPTY_ {
62✔
2633
                if srcPeer == _EMPTY_ {
29✔
2634
                        resp.Error = NewJSClusterServerNotMemberError()
×
2635
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2636
                        return
×
2637
                }
×
2638
                var peerFound bool
29✔
2639
                for i := 0; i < len(currPeers); i++ {
87✔
2640
                        if currPeers[i] == srcPeer {
87✔
2641
                                copy(currPeers[1:], currPeers[:i])
29✔
2642
                                currPeers[0] = srcPeer
29✔
2643
                                peerFound = true
29✔
2644
                                break
29✔
2645
                        }
2646
                }
2647
                if !peerFound {
29✔
2648
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2649
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2650
                        return
×
2651
                }
×
2652
        }
2653

2654
        // make sure client is scoped to requested account
2655
        ciNew := *(ci)
33✔
2656
        ciNew.Account = accName
33✔
2657

33✔
2658
        // backup placement such that peers can be looked up with modified tag list
33✔
2659
        var origPlacement *Placement
33✔
2660
        if cfg.Placement != nil {
33✔
2661
                tmp := *cfg.Placement
×
2662
                origPlacement = &tmp
×
2663
        }
×
2664

2665
        if len(req.Tags) > 0 {
60✔
2666
                if cfg.Placement == nil {
54✔
2667
                        cfg.Placement = &Placement{}
27✔
2668
                }
27✔
2669
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2670
        }
2671

2672
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2673
        if len(peers) <= cfg.Replicas {
35✔
2674
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2675
                peers = nil
2✔
2676

2✔
2677
                clusters := map[string]struct{}{}
2✔
2678
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2679
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2680
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2681
                        }
8✔
2682
                        return true
16✔
2683
                })
2684
                errs := &selectPeerError{}
2✔
2685
                errs.accumulate(e)
2✔
2686
                for cluster := range clusters {
4✔
2687
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2688
                        if len(newPeers) >= cfg.Replicas {
4✔
2689
                                peers = append([]string{}, currPeers...)
2✔
2690
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2691
                                break
2✔
2692
                        }
2693
                        errs.accumulate(e)
×
2694
                }
2695
                if peers == nil {
2✔
2696
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2697
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2698
                        return
×
2699
                }
×
2700
        }
2701

2702
        cfg.Placement = origPlacement
33✔
2703

33✔
2704
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2705
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2706

33✔
2707
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2708
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2709
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2710
}
2711

2712
// Request to have the metaleader move a stream on a peer to another
2713
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2714
        if c == nil || !s.JetStreamEnabled() {
4✔
2715
                return
×
2716
        }
×
2717

2718
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
4✔
2719
        if err != nil {
4✔
2720
                s.Warnf(badAPIRequestT, msg)
×
2721
                return
×
2722
        }
×
2723

2724
        js, cc := s.getJetStreamCluster()
4✔
2725
        if js == nil || cc == nil {
4✔
2726
                return
×
2727
        }
×
2728

2729
        // Extra checks here but only leader is listening.
2730
        js.mu.RLock()
4✔
2731
        isLeader := cc.isLeader()
4✔
2732
        js.mu.RUnlock()
4✔
2733

4✔
2734
        if !isLeader {
4✔
2735
                return
×
2736
        }
×
2737

2738
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2739
        if errorOnRequiredApiLevel(hdr) {
4✔
2740
                resp.Error = NewJSRequiredApiLevelError()
×
2741
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2742
                return
×
2743
        }
×
2744

2745
        accName := tokenAt(subject, 6)
4✔
2746
        streamName := tokenAt(subject, 7)
4✔
2747

4✔
2748
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2749
                return
×
2750
        }
×
2751

2752
        targetAcc, ok := s.accounts.Load(accName)
4✔
2753
        if !ok {
4✔
2754
                resp.Error = NewJSNoAccountError()
×
2755
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2756
                return
×
2757
        }
×
2758

2759
        streamFound := false
4✔
2760
        cfg := StreamConfig{}
4✔
2761
        currPeers := []string{}
4✔
2762
        js.mu.Lock()
4✔
2763
        streams, ok := cc.streams[accName]
4✔
2764
        if ok {
8✔
2765
                sa, ok := streams[streamName]
4✔
2766
                if ok {
8✔
2767
                        cfg = *sa.Config.clone()
4✔
2768
                        streamFound = true
4✔
2769
                        currPeers = sa.Group.Peers
4✔
2770
                }
4✔
2771
        }
2772
        js.mu.Unlock()
4✔
2773

4✔
2774
        if !streamFound {
4✔
2775
                resp.Error = NewJSStreamNotFoundError()
×
2776
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2777
                return
×
2778
        }
×
2779

2780
        if len(currPeers) <= cfg.Replicas {
4✔
2781
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2782
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2783
                return
×
2784
        }
×
2785

2786
        // make sure client is scoped to requested account
2787
        ciNew := *(ci)
4✔
2788
        ciNew.Account = accName
4✔
2789

4✔
2790
        peers := currPeers[:cfg.Replicas]
4✔
2791

4✔
2792
        // Remove placement in case tags don't match
4✔
2793
        // This can happen if the move was initiated by modifying the tags.
4✔
2794
        // This is an account operation.
4✔
2795
        // This can NOT happen when the move was initiated by the system account.
4✔
2796
        // There move honors the original tag list.
4✔
2797
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2798
        FOR_TAGCHECK:
1✔
2799
                for _, peer := range peers {
2✔
2800
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2801
                        if !ok {
1✔
2802
                                // can't verify tags, do the safe thing and error
×
2803
                                resp.Error = NewJSStreamGeneralError(
×
2804
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2805
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2806
                                return
×
2807
                        }
×
2808
                        nodeTags := si.(nodeInfo).tags
1✔
2809
                        for _, tag := range cfg.Placement.Tags {
2✔
2810
                                if !nodeTags.Contains(tag) {
2✔
2811
                                        // clear placement as tags don't match
1✔
2812
                                        cfg.Placement = nil
1✔
2813
                                        break FOR_TAGCHECK
1✔
2814
                                }
2815
                        }
2816

2817
                }
2818
        }
2819

2820
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2821
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2822

4✔
2823
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2824
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2825
}
2826

2827
// Request to have an account purged
2828
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
2829
        if c == nil || !s.JetStreamEnabled() {
6✔
2830
                return
×
2831
        }
×
2832

2833
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6✔
2834
        if err != nil {
6✔
2835
                s.Warnf(badAPIRequestT, msg)
×
2836
                return
×
2837
        }
×
2838
        if acc != s.SystemAccount() {
6✔
2839
                return
×
2840
        }
×
2841

2842
        js := s.getJetStream()
6✔
2843
        if js == nil {
6✔
2844
                return
×
2845
        }
×
2846

2847
        accName := tokenAt(subject, 5)
6✔
2848

6✔
2849
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
2850

6✔
2851
        if !s.JetStreamIsClustered() {
8✔
2852
                var streams []*stream
2✔
2853
                var ac *Account
2✔
2854
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2855
                        streams = ac.streams()
1✔
2856
                }
1✔
2857

2858
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2859
                        accName, len(streams), ac != nil)
2✔
2860

2✔
2861
                for _, mset := range streams {
3✔
2862
                        err := mset.delete()
1✔
2863
                        if err != nil {
1✔
2864
                                resp.Error = NewJSStreamDeleteError(err)
×
2865
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2866
                                return
×
2867
                        }
×
2868
                }
2869
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2870
                        resp.Error = NewJSStreamGeneralError(err)
×
2871
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2872
                        return
×
2873
                }
×
2874
                resp.Initiated = true
2✔
2875
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2876
                return
2✔
2877
        }
2878

2879
        _, cc := s.getJetStreamCluster()
4✔
2880

4✔
2881
        js.mu.RLock()
4✔
2882
        isLeader := cc.isLeader()
4✔
2883
        meta := cc.meta
4✔
2884
        js.mu.RUnlock()
4✔
2885

4✔
2886
        if !isLeader {
4✔
2887
                return
×
2888
        }
×
2889

2890
        if errorOnRequiredApiLevel(hdr) {
4✔
2891
                resp.Error = NewJSRequiredApiLevelError()
×
2892
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2893
                return
×
2894
        }
×
2895

2896
        if js.isMetaRecovering() {
4✔
2897
                // While in recovery mode, the data structures are not fully initialized
×
2898
                resp.Error = NewJSClusterNotAvailError()
×
2899
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2900
                return
×
2901
        }
×
2902

2903
        js.mu.Lock()
4✔
2904
        ns, nc := 0, 0
4✔
2905
        for osa := range js.streamAssignmentsOrInflightSeq(accName) {
12✔
2906
                for oca := range js.consumerAssignmentsOrInflightSeq(accName, osa.Config.Name) {
20✔
2907
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
12✔
2908
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2909
                        cc.trackInflightConsumerProposal(accName, osa.Config.Name, ca, true)
12✔
2910
                        nc++
12✔
2911
                }
12✔
2912
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
8✔
2913
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2914
                cc.trackInflightStreamProposal(accName, sa, true)
8✔
2915
                ns++
8✔
2916
        }
2917
        js.mu.Unlock()
4✔
2918

4✔
2919
        hasAccount := ns > 0
4✔
2920
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2921

4✔
2922
        resp.Initiated = true
4✔
2923
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2924
}
2925

2926
// Request to have the meta leader stepdown.
2927
// These will only be received by the meta leader, so less checking needed.
2928
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
23✔
2929
        if c == nil || !s.JetStreamEnabled() {
23✔
2930
                return
×
2931
        }
×
2932

2933
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
23✔
2934
        if err != nil {
23✔
2935
                s.Warnf(badAPIRequestT, msg)
×
2936
                return
×
2937
        }
×
2938

2939
        // This should only be coming from the System Account.
2940
        if acc != s.SystemAccount() {
24✔
2941
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2942
                return
1✔
2943
        }
1✔
2944

2945
        js, cc := s.getJetStreamCluster()
22✔
2946
        if js == nil || cc == nil {
22✔
2947
                return
×
2948
        }
×
2949

2950
        // Extra checks here but only leader is listening.
2951
        js.mu.RLock()
22✔
2952
        isLeader := cc.isLeader()
22✔
2953
        meta := cc.meta
22✔
2954
        js.mu.RUnlock()
22✔
2955

22✔
2956
        if !isLeader {
22✔
2957
                return
×
2958
        }
×
2959

2960
        var preferredLeader string
22✔
2961
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
22✔
2962
        if errorOnRequiredApiLevel(hdr) {
22✔
2963
                resp.Error = NewJSRequiredApiLevelError()
×
2964
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2965
                return
×
2966
        }
×
2967

2968
        if isJSONObjectOrArray(msg) {
37✔
2969
                var req JSApiLeaderStepdownRequest
15✔
2970
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
15✔
2971
                        resp.Error = NewJSInvalidJSONError(err)
×
2972
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2973
                        return
×
2974
                }
×
2975
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
21✔
2976
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2977
                        return
6✔
2978
                }
6✔
2979
        }
2980

2981
        // Call actual stepdown.
2982
        err = meta.StepDown(preferredLeader)
16✔
2983
        if err != nil {
16✔
2984
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2985
        } else {
16✔
2986
                resp.Success = true
16✔
2987
        }
16✔
2988
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16✔
2989
}
2990

2991
// Check if given []bytes is a JSON Object or Array.
2992
// Technically, valid JSON can also be a plain string or number, but for our use case,
2993
// we care only for JSON objects or arrays which starts with `[` or `{`.
2994
// This function does not have to ensure valid JSON in its entirety. It is used merely
2995
// to hint the codepath if it should attempt to parse the request as JSON or not.
2996
func isJSONObjectOrArray(req []byte) bool {
17,485✔
2997
        // Skip leading JSON whitespace (space, tab, newline, carriage return)
17,485✔
2998
        i := 0
17,485✔
2999
        for i < len(req) && (req[i] == ' ' || req[i] == '\t' || req[i] == '\n' || req[i] == '\r') {
17,497✔
3000
                i++
12✔
3001
        }
12✔
3002
        // Check for empty input after trimming
3003
        if i >= len(req) {
34,385✔
3004
                return false
16,900✔
3005
        }
16,900✔
3006
        // Check if the first non-whitespace character is '{' or '['
3007
        return req[i] == '{' || req[i] == '['
585✔
3008
}
3009

3010
func isEmptyRequest(req []byte) bool {
47,733✔
3011
        if len(req) == 0 {
93,005✔
3012
                return true
45,272✔
3013
        }
45,272✔
3014
        if bytes.Equal(req, []byte("{}")) {
2,462✔
3015
                return true
1✔
3016
        }
1✔
3017
        // If we are here we didn't get our simple match, but still could be valid.
3018
        var v any
2,460✔
3019
        if err := json.Unmarshal(req, &v); err != nil {
2,460✔
3020
                return false
×
3021
        }
×
3022
        vm, ok := v.(map[string]any)
2,460✔
3023
        if !ok {
2,460✔
3024
                return false
×
3025
        }
×
3026
        return len(vm) == 0
2,460✔
3027
}
3028

3029
// getStepDownPreferredPlacement attempts to work out what the best placement is
3030
// for a stepdown request. The preferred server name always takes precedence, but
3031
// if not specified, the placement will be used to filter by cluster. The caller
3032
// should check for return API errors and return those to the requestor if needed.
3033
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
40✔
3034
        if placement == nil {
42✔
3035
                return _EMPTY_, nil
2✔
3036
        }
2✔
3037
        var preferredLeader string
38✔
3038
        if placement.Preferred != _EMPTY_ {
56✔
3039
                for _, p := range group.Peers() {
75✔
3040
                        si, ok := s.nodeToInfo.Load(p.ID)
57✔
3041
                        if !ok || si == nil {
57✔
3042
                                continue
×
3043
                        }
3044
                        if si.(nodeInfo).name == placement.Preferred {
71✔
3045
                                preferredLeader = p.ID
14✔
3046
                                break
14✔
3047
                        }
3048
                }
3049
                if preferredLeader == group.ID() {
22✔
3050
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3051
                }
4✔
3052
                if preferredLeader == _EMPTY_ {
18✔
3053
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3054
                }
4✔
3055
        } else {
20✔
3056
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3057
                ourID := group.ID()
20✔
3058
                for _, p := range group.Peers() {
116✔
3059
                        if p == nil {
96✔
3060
                                continue // ... shouldn't happen.
×
3061
                        }
3062
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3063
                        if !ok || si == nil {
96✔
3064
                                continue
×
3065
                        }
3066
                        ni := si.(nodeInfo)
96✔
3067
                        if ni.offline || p.ID == ourID {
116✔
3068
                                continue
20✔
3069
                        }
3070
                        possiblePeers[p] = ni
76✔
3071
                }
3072
                // If cluster is specified, filter out anything not matching the cluster name.
3073
                if placement.Cluster != _EMPTY_ {
31✔
3074
                        for p, si := range possiblePeers {
51✔
3075
                                if si.cluster != placement.Cluster {
66✔
3076
                                        delete(possiblePeers, p)
26✔
3077
                                }
26✔
3078
                        }
3079
                }
3080
                // If tags are specified, filter out anything not matching all supplied tags.
3081
                if len(placement.Tags) > 0 {
32✔
3082
                        for p, si := range possiblePeers {
55✔
3083
                                matchesAll := true
43✔
3084
                                for _, tag := range placement.Tags {
93✔
3085
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3086
                                                break
32✔
3087
                                        }
3088
                                }
3089
                                if !matchesAll {
75✔
3090
                                        delete(possiblePeers, p)
32✔
3091
                                }
32✔
3092
                        }
3093
                }
3094
                // If there are no possible peers, return an error.
3095
                if len(possiblePeers) == 0 {
28✔
3096
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3097
                }
8✔
3098
                // Take advantage of random map iteration order to select the preferred.
3099
                for p := range possiblePeers {
24✔
3100
                        preferredLeader = p.ID
12✔
3101
                        break
12✔
3102
                }
3103
        }
3104
        return preferredLeader, nil
22✔
3105
}
3106

3107
// Request to delete a stream.
3108
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
558✔
3109
        if c == nil || !s.JetStreamEnabled() {
558✔
3110
                return
×
3111
        }
×
3112
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
558✔
3113
        if err != nil {
558✔
3114
                s.Warnf(badAPIRequestT, msg)
×
3115
                return
×
3116
        }
×
3117

3118
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
558✔
3119

558✔
3120
        // Determine if we should proceed here when we are in clustered mode.
558✔
3121
        if s.JetStreamIsClustered() {
1,071✔
3122
                js, cc := s.getJetStreamCluster()
513✔
3123
                if js == nil || cc == nil {
513✔
3124
                        return
×
3125
                }
×
3126
                if js.isLeaderless() {
514✔
3127
                        resp.Error = NewJSClusterNotAvailError()
1✔
3128
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3129
                        return
1✔
3130
                }
1✔
3131
                // Make sure we are meta leader.
3132
                if !s.JetStreamIsLeader() {
910✔
3133
                        return
398✔
3134
                }
398✔
3135
        }
3136

3137
        if errorOnRequiredApiLevel(hdr) {
160✔
3138
                resp.Error = NewJSRequiredApiLevelError()
1✔
3139
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3140
                return
1✔
3141
        }
1✔
3142

3143
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
158✔
3144
                if doErr {
×
3145
                        resp.Error = NewJSNotEnabledForAccountError()
×
3146
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3147
                }
×
3148
                return
×
3149
        }
3150

3151
        if !isEmptyRequest(msg) {
159✔
3152
                resp.Error = NewJSNotEmptyRequestError()
1✔
3153
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3154
                return
1✔
3155
        }
1✔
3156
        stream := streamNameFromSubject(subject)
157✔
3157

157✔
3158
        // Clustered.
157✔
3159
        if s.JetStreamIsClustered() {
271✔
3160
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
114✔
3161
                return
114✔
3162
        }
114✔
3163

3164
        mset, err := acc.lookupStream(stream)
43✔
3165
        if err != nil {
48✔
3166
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3167
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3168
                return
5✔
3169
        }
5✔
3170

3171
        if err := mset.delete(); err != nil {
38✔
3172
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3173
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3174
                return
×
3175
        }
×
3176
        resp.Success = true
38✔
3177
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
38✔
3178
}
3179

3180
// Request to delete a message.
3181
// This expects a stream sequence number as the msg body.
3182
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,167✔
3183
        if c == nil || !s.JetStreamEnabled() {
1,176✔
3184
                return
9✔
3185
        }
9✔
3186
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,158✔
3187
        if err != nil {
1,158✔
3188
                s.Warnf(badAPIRequestT, msg)
×
3189
                return
×
3190
        }
×
3191

3192
        stream := tokenAt(subject, 6)
1,158✔
3193

1,158✔
3194
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
1,158✔
3195

1,158✔
3196
        // If we are in clustered mode we need to be the stream leader to proceed.
1,158✔
3197
        if s.JetStreamIsClustered() {
1,559✔
3198
                // Check to make sure the stream is assigned.
401✔
3199
                js, cc := s.getJetStreamCluster()
401✔
3200
                if js == nil || cc == nil {
407✔
3201
                        return
6✔
3202
                }
6✔
3203
                if js.isLeaderless() {
396✔
3204
                        resp.Error = NewJSClusterNotAvailError()
1✔
3205
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3206
                        return
1✔
3207
                }
1✔
3208

3209
                js.mu.RLock()
394✔
3210
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
394✔
3211
                js.mu.RUnlock()
394✔
3212

394✔
3213
                if isLeader && sa == nil {
394✔
3214
                        // We can't find the stream, so mimic what would be the errors below.
×
3215
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3216
                                if doErr {
×
3217
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3218
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3219
                                }
×
3220
                                return
×
3221
                        }
3222
                        // No stream present.
3223
                        resp.Error = NewJSStreamNotFoundError()
×
3224
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3225
                        return
×
3226
                } else if sa == nil {
394✔
3227
                        return
×
3228
                }
×
3229

3230
                // Check to see if we are a member of the group and if the group has no leader.
3231
                if js.isGroupLeaderless(sa.Group) {
394✔
3232
                        resp.Error = NewJSClusterNotAvailError()
×
3233
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3234
                        return
×
3235
                }
×
3236

3237
                // We have the stream assigned and a leader, so only the stream leader should answer.
3238
                if !acc.JetStreamIsStreamLeader(stream) {
654✔
3239
                        return
260✔
3240
                }
260✔
3241
        }
3242

3243
        if errorOnRequiredApiLevel(hdr) {
892✔
3244
                resp.Error = NewJSRequiredApiLevelError()
1✔
3245
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3246
                return
1✔
3247
        }
1✔
3248

3249
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,032✔
3250
                if doErr {
282✔
3251
                        resp.Error = NewJSNotEnabledForAccountError()
140✔
3252
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
140✔
3253
                }
140✔
3254
                return
142✔
3255
        }
3256
        if isEmptyRequest(msg) {
748✔
3257
                resp.Error = NewJSBadRequestError()
×
3258
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3259
                return
×
3260
        }
×
3261
        var req JSApiMsgDeleteRequest
748✔
3262
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
748✔
3263
                resp.Error = NewJSInvalidJSONError(err)
×
3264
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3265
                return
×
3266
        }
×
3267

3268
        mset, err := acc.lookupStream(stream)
748✔
3269
        if err != nil {
750✔
3270
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3271
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3272
                return
2✔
3273
        }
2✔
3274
        if mset.cfg.Sealed {
748✔
3275
                resp.Error = NewJSStreamSealedError()
2✔
3276
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3277
                return
2✔
3278
        }
2✔
3279
        if mset.cfg.DenyDelete {
745✔
3280
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3281
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3282
                return
1✔
3283
        }
1✔
3284

3285
        if s.JetStreamIsClustered() {
875✔
3286
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
132✔
3287
                return
132✔
3288
        }
132✔
3289

3290
        var removed bool
611✔
3291
        if req.NoErase {
1,220✔
3292
                removed, err = mset.removeMsg(req.Seq)
609✔
3293
        } else {
611✔
3294
                removed, err = mset.eraseMsg(req.Seq)
2✔
3295
        }
2✔
3296
        if err != nil {
611✔
3297
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3298
        } else if !removed {
611✔
3299
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3300
        } else {
611✔
3301
                resp.Success = true
611✔
3302
        }
611✔
3303
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
611✔
3304
}
3305

3306
// Request to get a raw stream message.
3307
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2,720✔
3308
        if c == nil || !s.JetStreamEnabled() {
2,720✔
3309
                return
×
3310
        }
×
3311
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2,720✔
3312
        if err != nil {
2,720✔
3313
                s.Warnf(badAPIRequestT, msg)
×
3314
                return
×
3315
        }
×
3316

3317
        stream := tokenAt(subject, 6)
2,720✔
3318

2,720✔
3319
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
2,720✔
3320

2,720✔
3321
        // If we are in clustered mode we need to be the stream leader to proceed.
2,720✔
3322
        if s.JetStreamIsClustered() {
4,312✔
3323
                // Check to make sure the stream is assigned.
1,592✔
3324
                js, cc := s.getJetStreamCluster()
1,592✔
3325
                if js == nil || cc == nil {
1,592✔
3326
                        return
×
3327
                }
×
3328
                if js.isLeaderless() {
1,592✔
3329
                        resp.Error = NewJSClusterNotAvailError()
×
3330
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3331
                        return
×
3332
                }
×
3333

3334
                js.mu.RLock()
1,592✔
3335
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
1,592✔
3336
                js.mu.RUnlock()
1,592✔
3337

1,592✔
3338
                if isLeader && sa == nil {
1,592✔
3339
                        // We can't find the stream, so mimic what would be the errors below.
×
3340
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3341
                                if doErr {
×
3342
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3343
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3344
                                }
×
3345
                                return
×
3346
                        }
3347
                        // No stream present.
3348
                        resp.Error = NewJSStreamNotFoundError()
×
3349
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3350
                        return
×
3351
                } else if sa == nil {
1,592✔
3352
                        return
×
3353
                }
×
3354

3355
                // Check to see if we are a member of the group and if the group has no leader.
3356
                if js.isGroupLeaderless(sa.Group) {
1,592✔
3357
                        resp.Error = NewJSClusterNotAvailError()
×
3358
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3359
                        return
×
3360
                }
×
3361

3362
                // We have the stream assigned and a leader, so only the stream leader should answer.
3363
                if !acc.JetStreamIsStreamLeader(stream) {
2,661✔
3364
                        return
1,069✔
3365
                }
1,069✔
3366
        }
3367

3368
        if errorOnRequiredApiLevel(hdr) {
1,652✔
3369
                resp.Error = NewJSRequiredApiLevelError()
1✔
3370
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3371
                return
1✔
3372
        }
1✔
3373

3374
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,653✔
3375
                if doErr {
3✔
3376
                        resp.Error = NewJSNotEnabledForAccountError()
×
3377
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3378
                }
×
3379
                return
3✔
3380
        }
3381
        if isEmptyRequest(msg) {
1,647✔
3382
                resp.Error = NewJSBadRequestError()
×
3383
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3384
                return
×
3385
        }
×
3386
        var req JSApiMsgGetRequest
1,647✔
3387
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,647✔
3388
                resp.Error = NewJSInvalidJSONError(err)
×
3389
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3390
                return
×
3391
        }
×
3392

3393
        // This version does not support batch.
3394
        if req.Batch > 0 || req.MaxBytes > 0 {
1,648✔
3395
                resp.Error = NewJSBadRequestError()
1✔
3396
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3397
                return
1✔
3398
        }
1✔
3399

3400
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3401
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3402
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,646✔
3403
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,646✔
3404
                (req.Seq > 0 && req.StartTime != nil) ||
1,646✔
3405
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,646✔
3406
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,650✔
3407
                resp.Error = NewJSBadRequestError()
4✔
3408
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3409
                return
4✔
3410
        }
4✔
3411

3412
        mset, err := acc.lookupStream(stream)
1,642✔
3413
        if err != nil {
1,642✔
3414
                resp.Error = NewJSStreamNotFoundError()
×
3415
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3416
                return
×
3417
        }
×
3418
        if mset.offlineReason != _EMPTY_ {
1,642✔
3419
                // Just let the request time out.
×
3420
                return
×
3421
        }
×
3422

3423
        var svp StoreMsg
1,642✔
3424
        var sm *StoreMsg
1,642✔
3425

1,642✔
3426
        // Ensure this read request is isolated and doesn't interleave with writes.
1,642✔
3427
        mset.mu.RLock()
1,642✔
3428
        defer mset.mu.RUnlock()
1,642✔
3429

1,642✔
3430
        // If AsOfTime is set, perform this first to get the sequence.
1,642✔
3431
        var seq uint64
1,642✔
3432
        if req.StartTime != nil {
1,648✔
3433
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3434
        } else {
1,642✔
3435
                seq = req.Seq
1,636✔
3436
        }
1,636✔
3437

3438
        if seq > 0 && req.NextFor == _EMPTY_ {
2,145✔
3439
                sm, err = mset.store.LoadMsg(seq, &svp)
503✔
3440
        } else if req.NextFor != _EMPTY_ {
1,744✔
3441
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3442
        } else {
1,139✔
3443
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
1,037✔
3444
        }
1,037✔
3445
        if err != nil {
2,494✔
3446
                resp.Error = NewJSNoMessageFoundError()
852✔
3447
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
852✔
3448
                return
852✔
3449
        }
852✔
3450
        resp.Message = &StoredMsg{
790✔
3451
                Subject:  sm.subj,
790✔
3452
                Sequence: sm.seq,
790✔
3453
                Data:     sm.msg,
790✔
3454
                Time:     time.Unix(0, sm.ts).UTC(),
790✔
3455
        }
790✔
3456
        if !req.NoHeaders {
1,579✔
3457
                resp.Message.Header = sm.hdr
789✔
3458
        }
789✔
3459

3460
        // Don't send response through API layer for this call.
3461
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
790✔
3462
}
3463

3464
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3465
        if c == nil || !s.JetStreamEnabled() {
28✔
3466
                return
×
3467
        }
×
3468

3469
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3470
        if err != nil {
28✔
3471
                s.Warnf(badAPIRequestT, msg)
×
3472
                return
×
3473
        }
×
3474

3475
        stream := streamNameFromSubject(subject)
28✔
3476
        consumer := consumerNameFromSubject(subject)
28✔
3477

28✔
3478
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3479

28✔
3480
        if s.JetStreamIsClustered() {
46✔
3481
                // Check to make sure the stream is assigned.
18✔
3482
                js, cc := s.getJetStreamCluster()
18✔
3483
                if js == nil || cc == nil {
18✔
3484
                        return
×
3485
                }
×
3486

3487
                // First check if the stream and consumer is there.
3488
                js.mu.RLock()
18✔
3489
                sa := js.streamAssignment(acc.Name, stream)
18✔
3490
                if sa == nil {
21✔
3491
                        js.mu.RUnlock()
3✔
3492
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3493
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3494
                        return
3✔
3495
                }
3✔
3496
                if sa.unsupported != nil {
15✔
3497
                        js.mu.RUnlock()
×
3498
                        // Just let the request time out.
×
3499
                        return
×
3500
                }
×
3501

3502
                ca, ok := sa.consumers[consumer]
15✔
3503
                if !ok || ca == nil {
18✔
3504
                        js.mu.RUnlock()
3✔
3505
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3506
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3507
                        return
3✔
3508
                }
3✔
3509
                if ca.unsupported != nil {
12✔
3510
                        js.mu.RUnlock()
×
3511
                        // Just let the request time out.
×
3512
                        return
×
3513
                }
×
3514
                js.mu.RUnlock()
12✔
3515

12✔
3516
                // Then check if we are the leader.
12✔
3517
                mset, err := acc.lookupStream(stream)
12✔
3518
                if err != nil {
12✔
3519
                        return
×
3520
                }
×
3521

3522
                o := mset.lookupConsumer(consumer)
12✔
3523
                if o == nil {
12✔
3524
                        return
×
3525
                }
×
3526
                if !o.isLeader() {
20✔
3527
                        return
8✔
3528
                }
8✔
3529
        }
3530

3531
        if errorOnRequiredApiLevel(hdr) {
15✔
3532
                resp.Error = NewJSRequiredApiLevelError()
1✔
3533
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3534
                return
1✔
3535
        }
1✔
3536

3537
        var req JSApiConsumerUnpinRequest
13✔
3538
        if err := json.Unmarshal(msg, &req); err != nil {
13✔
3539
                resp.Error = NewJSInvalidJSONError(err)
×
3540
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3541
                return
×
3542
        }
×
3543

3544
        if req.Group == _EMPTY_ {
15✔
3545
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
2✔
3546
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3547
                return
2✔
3548
        }
2✔
3549

3550
        if !validGroupName.MatchString(req.Group) {
13✔
3551
                resp.Error = NewJSConsumerInvalidGroupNameError()
2✔
3552
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3553
                return
2✔
3554
        }
2✔
3555

3556
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3557
                if doErr {
×
3558
                        resp.Error = NewJSNotEnabledForAccountError()
×
3559
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3560
                }
×
3561
                return
×
3562
        }
3563

3564
        mset, err := acc.lookupStream(stream)
9✔
3565
        if err != nil {
10✔
3566
                resp.Error = NewJSStreamNotFoundError()
1✔
3567
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3568
                return
1✔
3569
        }
1✔
3570
        if mset.offlineReason != _EMPTY_ {
8✔
3571
                // Just let the request time out.
×
3572
                return
×
3573
        }
×
3574
        o := mset.lookupConsumer(consumer)
8✔
3575
        if o == nil {
9✔
3576
                resp.Error = NewJSConsumerNotFoundError()
1✔
3577
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3578
                return
1✔
3579
        }
1✔
3580
        if o.offlineReason != _EMPTY_ {
7✔
3581
                // Just let the request time out.
×
3582
                return
×
3583
        }
×
3584

3585
        var foundPriority bool
7✔
3586
        for _, group := range o.config().PriorityGroups {
14✔
3587
                if group == req.Group {
12✔
3588
                        foundPriority = true
5✔
3589
                        break
5✔
3590
                }
3591
        }
3592
        if !foundPriority {
9✔
3593
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3594
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3595
                return
2✔
3596
        }
2✔
3597

3598
        o.mu.Lock()
5✔
3599
        o.unassignPinId()
5✔
3600
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3601
        o.mu.Unlock()
5✔
3602
        o.signalNewMessages()
5✔
3603
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3604
}
3605

3606
// Request to purge a stream.
3607
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
136✔
3608
        if c == nil || !s.JetStreamEnabled() {
136✔
3609
                return
×
3610
        }
×
3611
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
136✔
3612
        if err != nil {
136✔
3613
                s.Warnf(badAPIRequestT, msg)
×
3614
                return
×
3615
        }
×
3616

3617
        stream := streamNameFromSubject(subject)
136✔
3618

136✔
3619
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
136✔
3620

136✔
3621
        // If we are in clustered mode we need to be the stream leader to proceed.
136✔
3622
        if s.JetStreamIsClustered() {
237✔
3623
                // Check to make sure the stream is assigned.
101✔
3624
                js, cc := s.getJetStreamCluster()
101✔
3625
                if js == nil || cc == nil {
101✔
3626
                        return
×
3627
                }
×
3628

3629
                js.mu.RLock()
101✔
3630
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
101✔
3631
                js.mu.RUnlock()
101✔
3632

101✔
3633
                if isLeader && sa == nil {
101✔
3634
                        // We can't find the stream, so mimic what would be the errors below.
×
3635
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3636
                                if doErr {
×
3637
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3638
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3639
                                }
×
3640
                                return
×
3641
                        }
3642
                        // No stream present.
3643
                        resp.Error = NewJSStreamNotFoundError()
×
3644
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3645
                        return
×
3646
                } else if sa == nil {
102✔
3647
                        if js.isLeaderless() {
1✔
3648
                                resp.Error = NewJSClusterNotAvailError()
×
3649
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3650
                        }
×
3651
                        return
1✔
3652
                }
3653

3654
                // Check to see if we are a member of the group and if the group has no leader.
3655
                if js.isGroupLeaderless(sa.Group) {
101✔
3656
                        resp.Error = NewJSClusterNotAvailError()
1✔
3657
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3658
                        return
1✔
3659
                }
1✔
3660

3661
                // We have the stream assigned and a leader, so only the stream leader should answer.
3662
                if !acc.JetStreamIsStreamLeader(stream) {
166✔
3663
                        if js.isLeaderless() {
67✔
3664
                                resp.Error = NewJSClusterNotAvailError()
×
3665
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3666
                        }
×
3667
                        return
67✔
3668
                }
3669
        }
3670

3671
        if errorOnRequiredApiLevel(hdr) {
68✔
3672
                resp.Error = NewJSRequiredApiLevelError()
1✔
3673
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3674
                return
1✔
3675
        }
1✔
3676

3677
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
66✔
3678
                if doErr {
×
3679
                        resp.Error = NewJSNotEnabledForAccountError()
×
3680
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3681
                }
×
3682
                return
×
3683
        }
3684

3685
        var purgeRequest *JSApiStreamPurgeRequest
66✔
3686
        if isJSONObjectOrArray(msg) {
100✔
3687
                var req JSApiStreamPurgeRequest
34✔
3688
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3689
                        resp.Error = NewJSInvalidJSONError(err)
×
3690
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3691
                        return
×
3692
                }
×
3693
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3694
                        resp.Error = NewJSBadRequestError()
×
3695
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3696
                        return
×
3697
                }
×
3698
                purgeRequest = &req
34✔
3699
        }
3700

3701
        mset, err := acc.lookupStream(stream)
66✔
3702
        if err != nil {
66✔
3703
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3704
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3705
                return
×
3706
        }
×
3707
        if mset.cfg.Sealed {
68✔
3708
                resp.Error = NewJSStreamSealedError()
2✔
3709
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3710
                return
2✔
3711
        }
2✔
3712
        if mset.cfg.DenyPurge {
65✔
3713
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3714
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3715
                return
1✔
3716
        }
1✔
3717

3718
        if s.JetStreamIsClustered() {
93✔
3719
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
30✔
3720
                return
30✔
3721
        }
30✔
3722

3723
        purged, err := mset.purge(purgeRequest)
33✔
3724
        if err != nil {
33✔
3725
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3726
        } else {
33✔
3727
                resp.Purged = purged
33✔
3728
                resp.Success = true
33✔
3729
        }
33✔
3730
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
33✔
3731
}
3732

3733
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,414✔
3734
        var replicas int
1,414✔
3735
        if cfg != nil {
2,828✔
3736
                replicas = cfg.Replicas
1,414✔
3737
        }
1,414✔
3738
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,414✔
3739
        if apiErr != nil {
1,414✔
3740
                return apiErr
×
3741
        }
×
3742
        jsa.js.mu.RLock()
1,414✔
3743
        defer jsa.js.mu.RUnlock()
1,414✔
3744
        jsa.mu.RLock()
1,414✔
3745
        defer jsa.mu.RUnlock()
1,414✔
3746
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,417✔
3747
                return NewJSMaximumStreamsLimitError()
3✔
3748
        }
3✔
3749
        reserved := jsa.tieredReservation(tier, cfg)
1,411✔
3750
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,418✔
3751
                return NewJSStreamLimitsError(err, Unless(err))
7✔
3752
        }
7✔
3753
        return nil
1,404✔
3754
}
3755

3756
// Request to restore a stream.
3757
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
50✔
3758
        if c == nil || !s.JetStreamIsLeader() {
74✔
3759
                return
24✔
3760
        }
24✔
3761
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
26✔
3762
        if err != nil {
26✔
3763
                s.Warnf(badAPIRequestT, msg)
×
3764
                return
×
3765
        }
×
3766

3767
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
26✔
3768
        if errorOnRequiredApiLevel(hdr) {
27✔
3769
                resp.Error = NewJSRequiredApiLevelError()
1✔
3770
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3771
                return
1✔
3772
        }
1✔
3773
        if !acc.JetStreamEnabled() {
25✔
3774
                resp.Error = NewJSNotEnabledForAccountError()
×
3775
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3776
                return
×
3777
        }
×
3778
        if isEmptyRequest(msg) {
26✔
3779
                resp.Error = NewJSBadRequestError()
1✔
3780
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3781
                return
1✔
3782
        }
1✔
3783

3784
        var req JSApiStreamRestoreRequest
24✔
3785
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3786
                resp.Error = NewJSInvalidJSONError(err)
×
3787
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3788
                return
×
3789
        }
×
3790

3791
        stream := streamNameFromSubject(subject)
24✔
3792

24✔
3793
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3794
                req.Config.Name = stream
×
3795
        }
×
3796

3797
        // check stream config at the start of the restore process, not at the end
3798
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3799
        if apiErr != nil {
26✔
3800
                resp.Error = apiErr
2✔
3801
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3802
                return
2✔
3803
        }
2✔
3804

3805
        if s.JetStreamIsClustered() {
33✔
3806
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
3807
                return
11✔
3808
        }
11✔
3809

3810
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
3811
                resp.Error = err
2✔
3812
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3813
                return
2✔
3814
        }
2✔
3815

3816
        if _, err := acc.lookupStream(stream); err == nil {
10✔
3817
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
3818
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3819
                return
1✔
3820
        }
1✔
3821

3822
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3823
                if doErr {
×
3824
                        resp.Error = NewJSNotEnabledForAccountError()
×
3825
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3826
                }
×
3827
                return
×
3828
        }
3829

3830
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
3831
}
3832

3833
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3834
        js := s.getJetStream()
16✔
3835

16✔
3836
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3837

16✔
3838
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
3839
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
27✔
3840
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
11✔
3841
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
3842
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3843
                        return nil
×
3844
                }
×
3845
        }
3846

3847
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
3848
        if err != nil {
16✔
3849
                resp.Error = NewJSTempStorageFailedError()
×
3850
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3851
                return nil
×
3852
        }
×
3853

3854
        streamName := cfg.Name
16✔
3855
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3856

16✔
3857
        start := time.Now().UTC()
16✔
3858
        domain := s.getOpts().JetStreamDomain
16✔
3859
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3860
                TypedEvent: TypedEvent{
16✔
3861
                        Type: JSRestoreCreateAdvisoryType,
16✔
3862
                        ID:   nuid.Next(),
16✔
3863
                        Time: start,
16✔
3864
                },
16✔
3865
                Stream: streamName,
16✔
3866
                Client: ci.forAdvisory(),
16✔
3867
                Domain: domain,
16✔
3868
        })
16✔
3869

16✔
3870
        // Create our internal subscription to accept the snapshot.
16✔
3871
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3872

16✔
3873
        type result struct {
16✔
3874
                err   error
16✔
3875
                reply string
16✔
3876
        }
16✔
3877

16✔
3878
        // For signaling to upper layers.
16✔
3879
        resultCh := make(chan result, 1)
16✔
3880
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
3881

16✔
3882
        var total int
16✔
3883

16✔
3884
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
3885
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
128✔
3886
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
112✔
3887
                if reply == _EMPTY_ {
113✔
3888
                        sub.client.processUnsub(sub.sid)
1✔
3889
                        resultCh <- result{
1✔
3890
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
3891
                                reply,
1✔
3892
                        }
1✔
3893
                        return
1✔
3894
                }
1✔
3895
                // Account client messages have \r\n on end. This is an error.
3896
                if len(msg) < LEN_CR_LF {
111✔
3897
                        sub.client.processUnsub(sub.sid)
×
3898
                        resultCh <- result{
×
3899
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
3900
                                reply,
×
3901
                        }
×
3902
                        return
×
3903
                }
×
3904
                // Adjust.
3905
                msg = msg[:len(msg)-LEN_CR_LF]
111✔
3906

111✔
3907
                // This means we are complete with our transfer from the client.
111✔
3908
                if len(msg) == 0 {
126✔
3909
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
3910
                        resultCh <- result{err, reply}
15✔
3911
                        return
15✔
3912
                }
15✔
3913

3914
                // We track total and check on server limits.
3915
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
3916
                total += len(msg)
96✔
3917
                if js.wouldExceedLimits(FileStorage, total) {
96✔
3918
                        s.resourcesExceededError(FileStorage)
×
3919
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
3920
                        return
×
3921
                }
×
3922

3923
                // Append chunk to temp file. Mark as issue if we encounter an error.
3924
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
96✔
3925
                        resultCh <- result{err, reply}
×
3926
                        if reply != _EMPTY_ {
×
3927
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
3928
                        }
×
3929
                        return
×
3930
                }
3931

3932
                activeQ.push(len(msg))
96✔
3933

96✔
3934
                s.sendInternalAccountMsg(acc, reply, nil)
96✔
3935
        }
3936

3937
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3938
        if err != nil {
16✔
3939
                tfile.Close()
×
3940
                os.Remove(tfile.Name())
×
3941
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3942
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3943
                return nil
×
3944
        }
×
3945

3946
        // Mark the subject so the end user knows where to send the snapshot chunks.
3947
        resp.DeliverSubject = restoreSubj
16✔
3948
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3949

16✔
3950
        doneCh := make(chan error, 1)
16✔
3951

16✔
3952
        // Monitor the progress from another Go routine.
16✔
3953
        s.startGoRoutine(func() {
32✔
3954
                defer s.grWG.Done()
16✔
3955
                defer func() {
32✔
3956
                        tfile.Close()
16✔
3957
                        os.Remove(tfile.Name())
16✔
3958
                        sub.client.processUnsub(sub.sid)
16✔
3959
                        activeQ.unregister()
16✔
3960
                }()
16✔
3961

3962
                const activityInterval = 5 * time.Second
16✔
3963
                notActive := time.NewTimer(activityInterval)
16✔
3964
                defer notActive.Stop()
16✔
3965

16✔
3966
                total := 0
16✔
3967
                for {
128✔
3968
                        select {
112✔
3969
                        case result := <-resultCh:
16✔
3970
                                err := result.err
16✔
3971
                                var mset *stream
16✔
3972

16✔
3973
                                // If we staged properly go ahead and do restore now.
16✔
3974
                                if err == nil {
31✔
3975
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
3976
                                        tfile.Seek(0, 0)
15✔
3977
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
3978
                                } else {
16✔
3979
                                        errStr := err.Error()
1✔
3980
                                        tmp := []rune(errStr)
1✔
3981
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
3982
                                        s.Warnf(errStr)
1✔
3983
                                }
1✔
3984

3985
                                end := time.Now().UTC()
16✔
3986

16✔
3987
                                // TODO(rip) - Should this have the error code in it??
16✔
3988
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
3989
                                        TypedEvent: TypedEvent{
16✔
3990
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
3991
                                                ID:   nuid.Next(),
16✔
3992
                                                Time: end,
16✔
3993
                                        },
16✔
3994
                                        Stream: streamName,
16✔
3995
                                        Start:  start,
16✔
3996
                                        End:    end,
16✔
3997
                                        Bytes:  int64(total),
16✔
3998
                                        Client: ci.forAdvisory(),
16✔
3999
                                        Domain: domain,
16✔
4000
                                })
16✔
4001

16✔
4002
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
4003

16✔
4004
                                if err != nil {
20✔
4005
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
4006
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
4007
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
4008
                                } else {
16✔
4009
                                        msetCfg := mset.config()
12✔
4010
                                        resp.StreamInfo = &StreamInfo{
12✔
4011
                                                Created:   mset.createdTime(),
12✔
4012
                                                State:     mset.state(),
12✔
4013
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
4014
                                                TimeStamp: time.Now().UTC(),
12✔
4015
                                        }
12✔
4016
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
4017
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
4018
                                }
12✔
4019

4020
                                // On the last EOF, send back the stream info or error status.
4021
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
4022
                                // Signal to the upper layers.
16✔
4023
                                doneCh <- err
16✔
4024
                                return
16✔
4025
                        case <-activeQ.ch:
96✔
4026
                                if n, ok := activeQ.popOne(); ok {
192✔
4027
                                        total += n
96✔
4028
                                        notActive.Reset(activityInterval)
96✔
4029
                                }
96✔
4030
                        case <-notActive.C:
×
4031
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
4032
                                doneCh <- err
×
4033
                                return
×
4034
                        }
4035
                }
4036
        })
4037

4038
        return doneCh
16✔
4039
}
4040

4041
// Process a snapshot request.
4042
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
4043
        if c == nil || !s.JetStreamEnabled() {
29✔
4044
                return
×
4045
        }
×
4046
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
4047
        if err != nil {
29✔
4048
                s.Warnf(badAPIRequestT, msg)
×
4049
                return
×
4050
        }
×
4051

4052
        smsg := string(msg)
29✔
4053
        stream := streamNameFromSubject(subject)
29✔
4054

29✔
4055
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
4056
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
4057
                return
15✔
4058
        }
15✔
4059

4060
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
4061
        if errorOnRequiredApiLevel(hdr) {
15✔
4062
                resp.Error = NewJSRequiredApiLevelError()
1✔
4063
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4064
                return
1✔
4065
        }
1✔
4066
        if !acc.JetStreamEnabled() {
13✔
4067
                resp.Error = NewJSNotEnabledForAccountError()
×
4068
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4069
                return
×
4070
        }
×
4071
        if isEmptyRequest(msg) {
14✔
4072
                resp.Error = NewJSBadRequestError()
1✔
4073
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4074
                return
1✔
4075
        }
1✔
4076

4077
        mset, err := acc.lookupStream(stream)
12✔
4078
        if err != nil {
13✔
4079
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4080
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4081
                return
1✔
4082
        }
1✔
4083

4084
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4085
                if doErr {
×
4086
                        resp.Error = NewJSNotEnabledForAccountError()
×
4087
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4088
                }
×
4089
                return
×
4090
        }
4091

4092
        var req JSApiStreamSnapshotRequest
11✔
4093
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4094
                resp.Error = NewJSInvalidJSONError(err)
×
4095
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4096
                return
×
4097
        }
×
4098
        if !IsValidSubject(req.DeliverSubject) {
12✔
4099
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4100
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4101
                return
1✔
4102
        }
1✔
4103

4104
        // We will do the snapshot in a go routine as well since check msgs may
4105
        // stall this go routine.
4106
        go func() {
20✔
4107
                if req.CheckMsgs {
12✔
4108
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4109
                } else {
10✔
4110
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4111
                }
8✔
4112

4113
                start := time.Now().UTC()
10✔
4114

10✔
4115
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4116
                if err != nil {
10✔
4117
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4118
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4119
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4120
                        return
×
4121
                }
×
4122

4123
                config := mset.config()
10✔
4124
                resp.State = &sr.State
10✔
4125
                resp.Config = &config
10✔
4126

10✔
4127
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4128

10✔
4129
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4130
                        TypedEvent: TypedEvent{
10✔
4131
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4132
                                ID:   nuid.Next(),
10✔
4133
                                Time: time.Now().UTC(),
10✔
4134
                        },
10✔
4135
                        Stream: mset.name(),
10✔
4136
                        State:  sr.State,
10✔
4137
                        Client: ci.forAdvisory(),
10✔
4138
                        Domain: s.getOpts().JetStreamDomain,
10✔
4139
                })
10✔
4140

10✔
4141
                // Now do the real streaming.
10✔
4142
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4143

10✔
4144
                end := time.Now().UTC()
10✔
4145

10✔
4146
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4147
                        TypedEvent: TypedEvent{
10✔
4148
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4149
                                ID:   nuid.Next(),
10✔
4150
                                Time: end,
10✔
4151
                        },
10✔
4152
                        Stream: mset.name(),
10✔
4153
                        Start:  start,
10✔
4154
                        End:    end,
10✔
4155
                        Client: ci.forAdvisory(),
10✔
4156
                        Domain: s.getOpts().JetStreamDomain,
10✔
4157
                })
10✔
4158

10✔
4159
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4160
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4161
                        mset.jsa.account.Name,
10✔
4162
                        mset.name(),
10✔
4163
                        end.Sub(start))
10✔
4164
        }()
4165
}
4166

4167
// Default chunk size for now.
4168
const defaultSnapshotChunkSize = 128 * 1024       // 128KiB
4169
const defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MiB
4170
const defaultSnapshotAckTimeout = 5 * time.Second
4171

4172
var snapshotAckTimeout = defaultSnapshotAckTimeout
4173

4174
// streamSnapshot will stream out our snapshot to the reply subject.
4175
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4176
        chunkSize, wndSize := req.ChunkSize, req.WindowSize
10✔
4177
        if chunkSize == 0 {
12✔
4178
                chunkSize = defaultSnapshotChunkSize
2✔
4179
        }
2✔
4180
        if wndSize == 0 {
20✔
4181
                wndSize = defaultSnapshotWindowSize
10✔
4182
        }
10✔
4183
        chunkSize = min(max(1024, chunkSize), 1024*1024) // Clamp within 1KiB to 1MiB
10✔
4184
        wndSize = min(max(1024, wndSize), 32*1024*1024)  // Clamp within 1KiB to 32MiB
10✔
4185
        wndSize = max(wndSize, chunkSize)                // Guarantee at least one chunk
10✔
4186
        maxInflight := wndSize / chunkSize               // Between 1 and 32,768
10✔
4187

10✔
4188
        // Setup for the chunk stream.
10✔
4189
        reply := req.DeliverSubject
10✔
4190
        r := sr.Reader
10✔
4191
        defer r.Close()
10✔
4192

10✔
4193
        // In case we run into an error, this allows subscription callbacks
10✔
4194
        // to not sit and block endlessly.
10✔
4195
        done := make(chan struct{})
10✔
4196
        defer close(done)
10✔
4197

10✔
4198
        // Check interest for the snapshot deliver subject.
10✔
4199
        inch := make(chan bool, 1)
10✔
4200
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4201
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4202
        hasInterest := <-inch
10✔
4203
        if !hasInterest {
15✔
4204
                // Allow 2 seconds or so for interest to show up.
5✔
4205
                select {
5✔
4206
                case <-inch:
4✔
4207
                case <-time.After(2 * time.Second):
1✔
4208
                }
4209
        }
4210

4211
        // One slot per chunk. Each chunk read takes a slot, each ack will
4212
        // replace it. Smooths out in-flight number of chunks.
4213
        slots := make(chan struct{}, maxInflight)
10✔
4214
        for range maxInflight {
65,674✔
4215
                slots <- struct{}{}
65,664✔
4216
        }
65,664✔
4217

4218
        // We will place sequence number and size of chunk sent in the reply.
4219
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4220
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
28✔
4221
                select {
18✔
4222
                case slots <- struct{}{}:
18✔
4223
                case <-done:
×
4224
                }
4225
        })
4226
        defer mset.unsubscribe(ackSub)
10✔
4227

10✔
4228
        var hdr []byte
10✔
4229
        chunk := make([]byte, chunkSize)
10✔
4230
        for index := 1; ; index++ {
56✔
4231
                select {
46✔
4232
                case <-slots:
46✔
4233
                        // A slot has become available.
4234
                case <-inch:
×
4235
                        // The receiver appears to have gone away.
×
4236
                        hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4237
                        goto done
×
4238
                case err := <-sr.errCh:
×
4239
                        // The snapshotting goroutine has failed for some reason.
×
4240
                        hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4241
                        goto done
×
4242
                case <-time.After(snapshotAckTimeout):
×
4243
                        // It's taking a very long time for the receiver to send us acks,
×
4244
                        // they have probably stalled or there is high loss on the link.
×
4245
                        hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4246
                        goto done
×
4247
                }
4248
                n, err := io.ReadFull(r, chunk)
46✔
4249
                chunk := chunk[:n]
46✔
4250
                if err != nil {
56✔
4251
                        if n > 0 {
20✔
4252
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
10✔
4253
                        }
10✔
4254
                        break
10✔
4255
                }
4256
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
36✔
4257
                if hdr == nil {
41✔
4258
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
5✔
4259
                }
5✔
4260
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
36✔
4261
        }
4262

4263
done:
4264
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4265
}
4266

4267
// For determining consumer request type.
4268
type ccReqType uint8
4269

4270
const (
4271
        ccNew = iota
4272
        ccLegacyEphemeral
4273
        ccLegacyDurable
4274
)
4275

4276
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4277
// filtered subjects can be at the tail end.
4278
// Assumes stream and consumer names are single tokens.
4279
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
11,167✔
4280
        if c == nil || !s.JetStreamEnabled() {
11,167✔
4281
                return
×
4282
        }
×
4283

4284
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
11,167✔
4285
        if err != nil {
11,168✔
4286
                s.Warnf(badAPIRequestT, msg)
1✔
4287
                return
1✔
4288
        }
1✔
4289

4290
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
11,166✔
4291

11,166✔
4292
        var req CreateConsumerRequest
11,166✔
4293
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11,167✔
4294
                resp.Error = NewJSInvalidJSONError(err)
1✔
4295
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4296
                return
1✔
4297
        }
1✔
4298

4299
        var js *jetStream
11,165✔
4300
        isClustered := s.JetStreamIsClustered()
11,165✔
4301

11,165✔
4302
        // Determine if we should proceed here when we are in clustered mode.
11,165✔
4303
        if isClustered {
21,248✔
4304
                if req.Config.Direct {
10,509✔
4305
                        // Check to see if we have this stream and are the stream leader.
426✔
4306
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
750✔
4307
                                return
324✔
4308
                        }
324✔
4309
                } else {
9,657✔
4310
                        var cc *jetStreamCluster
9,657✔
4311
                        js, cc = s.getJetStreamCluster()
9,657✔
4312
                        if js == nil || cc == nil {
9,657✔
4313
                                return
×
4314
                        }
×
4315
                        if js.isLeaderless() {
9,657✔
4316
                                resp.Error = NewJSClusterNotAvailError()
×
4317
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4318
                                return
×
4319
                        }
×
4320
                        // Make sure we are meta leader.
4321
                        if !s.JetStreamIsLeader() {
16,182✔
4322
                                return
6,525✔
4323
                        }
6,525✔
4324
                }
4325
        }
4326

4327
        if errorOnRequiredApiLevel(hdr) {
4,319✔
4328
                resp.Error = NewJSRequiredApiLevelError()
3✔
4329
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4330
                return
3✔
4331
        }
3✔
4332

4333
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,316✔
4334
                if doErr {
4✔
4335
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4336
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4337
                }
1✔
4338
                return
3✔
4339
        }
4340

4341
        var streamName, consumerName, filteredSubject string
4,310✔
4342
        var rt ccReqType
4,310✔
4343

4,310✔
4344
        if n := numTokens(subject); n < 5 {
4,310✔
4345
                s.Warnf(badAPIRequestT, msg)
×
4346
                return
×
4347
        } else if n == 5 {
5,121✔
4348
                // Legacy ephemeral.
811✔
4349
                rt = ccLegacyEphemeral
811✔
4350
                streamName = streamNameFromSubject(subject)
811✔
4351
        } else {
4,310✔
4352
                // New style and durable legacy.
3,499✔
4353
                if tokenAt(subject, 4) == "DURABLE" {
3,763✔
4354
                        rt = ccLegacyDurable
264✔
4355
                        if n != 7 {
264✔
4356
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4357
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4358
                                return
×
4359
                        }
×
4360
                        streamName = tokenAt(subject, 6)
264✔
4361
                        consumerName = tokenAt(subject, 7)
264✔
4362
                } else {
3,235✔
4363
                        streamName = streamNameFromSubject(subject)
3,235✔
4364
                        consumerName = consumerNameFromSubject(subject)
3,235✔
4365
                        // New has optional filtered subject as part of main subject..
3,235✔
4366
                        if n > 6 {
5,928✔
4367
                                tokens := strings.Split(subject, tsep)
2,693✔
4368
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,693✔
4369
                        }
2,693✔
4370
                }
4371
        }
4372

4373
        if streamName != req.Stream {
4,311✔
4374
                resp.Error = NewJSStreamMismatchError()
1✔
4375
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4376
                return
1✔
4377
        }
1✔
4378

4379
        if consumerName != _EMPTY_ {
7,808✔
4380
                // Check for path like separators in the name.
3,499✔
4381
                if strings.ContainsAny(consumerName, `\/`) {
3,503✔
4382
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4383
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4384
                        return
4✔
4385
                }
4✔
4386
        }
4387

4388
        // Should we expect a durable name
4389
        if rt == ccLegacyDurable {
4,569✔
4390
                if numTokens(subject) < 7 {
264✔
4391
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4392
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4393
                        return
×
4394
                }
×
4395
                // Now check on requirements for durable request.
4396
                if req.Config.Durable == _EMPTY_ {
265✔
4397
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4398
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4399
                        return
1✔
4400
                }
1✔
4401
                if consumerName != req.Config.Durable {
263✔
4402
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4403
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4404
                        return
×
4405
                }
×
4406
                // Durable, so we need to honor the name.
4407
                req.Config.Name = consumerName
263✔
4408
        }
4409
        // If new style and durable set make sure they match.
4410
        if rt == ccNew {
7,535✔
4411
                if req.Config.Durable != _EMPTY_ {
5,916✔
4412
                        if consumerName != req.Config.Durable {
2,685✔
4413
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4414
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4415
                                return
×
4416
                        }
×
4417
                }
4418
                // New style ephemeral so we need to honor the name.
4419
                req.Config.Name = consumerName
3,231✔
4420
        }
4421
        // Check for legacy ephemeral mis-configuration.
4422
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,307✔
4423
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4424
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4425
                return
3✔
4426
        }
3✔
4427

4428
        // in case of multiple filters provided, error if new API is used.
4429
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,302✔
4430
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4431
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4432
                return
1✔
4433
        }
1✔
4434

4435
        // Check for a filter subject.
4436
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,302✔
4437
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4438
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4439
                return
2✔
4440
        }
2✔
4441

4442
        if isClustered && !req.Config.Direct {
7,428✔
4443
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,130✔
4444
                return
3,130✔
4445
        }
3,130✔
4446

4447
        // If we are here we are single server mode.
4448
        if req.Config.Replicas > 1 {
1,168✔
4449
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4450
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4451
                return
×
4452
        }
×
4453

4454
        stream, err := acc.lookupStream(req.Stream)
1,168✔
4455
        if err != nil {
1,172✔
4456
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4457
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4458
                return
4✔
4459
        }
4✔
4460
        if stream.offlineReason != _EMPTY_ {
1,164✔
4461
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4462
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4463
                return
×
4464
        }
×
4465

4466
        if o := stream.lookupConsumer(consumerName); o != nil {
1,219✔
4467
                if o.offlineReason != _EMPTY_ {
55✔
4468
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4469
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4470
                        return
×
4471
                }
×
4472
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4473
                // it back to whatever the current configured value is.
4474
                o.mu.RLock()
55✔
4475
                req.Config.PauseUntil = o.cfg.PauseUntil
55✔
4476
                o.mu.RUnlock()
55✔
4477
        }
4478

4479
        // Initialize/update asset version metadata.
4480
        setStaticConsumerMetadata(&req.Config)
1,164✔
4481

1,164✔
4482
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,164✔
4483

1,164✔
4484
        if err != nil {
1,221✔
4485
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
57✔
4486
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4487
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4488
                        err = errConsumerStoreFailed
×
4489
                }
×
4490
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
57✔
4491
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
57✔
4492
                return
57✔
4493
        }
4494
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
1,107✔
4495
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,107✔
4496

1,107✔
4497
        o.mu.RLock()
1,107✔
4498
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
1,111✔
4499
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4500
        }
4✔
4501
        o.mu.RUnlock()
1,107✔
4502
}
4503

4504
// Request for the list of all consumer names.
4505
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
37✔
4506
        if c == nil || !s.JetStreamEnabled() {
37✔
4507
                return
×
4508
        }
×
4509
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
37✔
4510
        if err != nil {
37✔
4511
                s.Warnf(badAPIRequestT, msg)
×
4512
                return
×
4513
        }
×
4514

4515
        var resp = JSApiConsumerNamesResponse{
37✔
4516
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
37✔
4517
                Consumers:   []string{},
37✔
4518
        }
37✔
4519

37✔
4520
        // Determine if we should proceed here when we are in clustered mode.
37✔
4521
        if s.JetStreamIsClustered() {
67✔
4522
                js, cc := s.getJetStreamCluster()
30✔
4523
                if js == nil || cc == nil {
30✔
4524
                        return
×
4525
                }
×
4526
                if js.isLeaderless() {
30✔
4527
                        resp.Error = NewJSClusterNotAvailError()
×
4528
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4529
                        return
×
4530
                }
×
4531
                // Make sure we are meta leader.
4532
                if !s.JetStreamIsLeader() {
50✔
4533
                        return
20✔
4534
                }
20✔
4535
        }
4536

4537
        if errorOnRequiredApiLevel(hdr) {
18✔
4538
                resp.Error = NewJSRequiredApiLevelError()
1✔
4539
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4540
                return
1✔
4541
        }
1✔
4542

4543
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
16✔
4544
                if doErr {
×
4545
                        resp.Error = NewJSNotEnabledForAccountError()
×
4546
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4547
                }
×
4548
                return
×
4549
        }
4550

4551
        var offset int
16✔
4552
        if isJSONObjectOrArray(msg) {
26✔
4553
                var req JSApiConsumersRequest
10✔
4554
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10✔
4555
                        resp.Error = NewJSInvalidJSONError(err)
×
4556
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4557
                        return
×
4558
                }
×
4559
                offset = req.Offset
10✔
4560
        }
4561

4562
        streamName := streamNameFromSubject(subject)
16✔
4563
        var numConsumers int
16✔
4564

16✔
4565
        if s.JetStreamIsClustered() {
26✔
4566
                js, cc := s.getJetStreamCluster()
10✔
4567
                if js == nil || cc == nil {
10✔
4568
                        // TODO(dlc) - Debug or Warn?
×
4569
                        return
×
4570
                }
×
4571
                js.mu.RLock()
10✔
4572
                sas := cc.streams[acc.Name]
10✔
4573
                if sas == nil {
10✔
4574
                        js.mu.RUnlock()
×
4575
                        resp.Error = NewJSStreamNotFoundError()
×
4576
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4577
                        return
×
4578
                }
×
4579
                sa := sas[streamName]
10✔
4580
                if sa == nil || sa.err != nil {
10✔
4581
                        js.mu.RUnlock()
×
4582
                        resp.Error = NewJSStreamNotFoundError()
×
4583
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4584
                        return
×
4585
                }
×
4586
                for consumer := range sa.consumers {
23✔
4587
                        resp.Consumers = append(resp.Consumers, consumer)
13✔
4588
                }
13✔
4589
                if len(resp.Consumers) > 1 {
14✔
4590
                        slices.Sort(resp.Consumers)
4✔
4591
                }
4✔
4592
                numConsumers = len(resp.Consumers)
10✔
4593
                if offset > numConsumers {
10✔
4594
                        offset = numConsumers
×
4595
                }
×
4596
                resp.Consumers = resp.Consumers[offset:]
10✔
4597
                if len(resp.Consumers) > JSApiNamesLimit {
10✔
4598
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4599
                }
×
4600
                js.mu.RUnlock()
10✔
4601

4602
        } else {
6✔
4603
                mset, err := acc.lookupStream(streamName)
6✔
4604
                if err != nil {
6✔
4605
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4606
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4607
                        return
×
4608
                }
×
4609

4610
                obs := mset.getPublicConsumers()
6✔
4611
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
6✔
4612

4613
                numConsumers = len(obs)
6✔
4614
                if offset > numConsumers {
6✔
4615
                        offset = numConsumers
×
4616
                }
×
4617

4618
                for _, o := range obs[offset:] {
10✔
4619
                        resp.Consumers = append(resp.Consumers, o.String())
4✔
4620
                        if len(resp.Consumers) >= JSApiNamesLimit {
4✔
4621
                                break
×
4622
                        }
4623
                }
4624
        }
4625
        resp.Total = numConsumers
16✔
4626
        resp.Limit = JSApiNamesLimit
16✔
4627
        resp.Offset = offset
16✔
4628
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16✔
4629
}
4630

4631
// Request for the list of all detailed consumer information.
4632
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
63✔
4633
        if c == nil || !s.JetStreamEnabled() {
63✔
4634
                return
×
4635
        }
×
4636

4637
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
63✔
4638
        if err != nil {
63✔
4639
                s.Warnf(badAPIRequestT, msg)
×
4640
                return
×
4641
        }
×
4642

4643
        var resp = JSApiConsumerListResponse{
63✔
4644
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
63✔
4645
                Consumers:   []*ConsumerInfo{},
63✔
4646
        }
63✔
4647

63✔
4648
        // Determine if we should proceed here when we are in clustered mode.
63✔
4649
        if s.JetStreamIsClustered() {
117✔
4650
                js, cc := s.getJetStreamCluster()
54✔
4651
                if js == nil || cc == nil {
54✔
4652
                        return
×
4653
                }
×
4654
                if js.isLeaderless() {
55✔
4655
                        resp.Error = NewJSClusterNotAvailError()
1✔
4656
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4657
                        return
1✔
4658
                }
1✔
4659
                // Make sure we are meta leader.
4660
                if !s.JetStreamIsLeader() {
93✔
4661
                        return
40✔
4662
                }
40✔
4663
        }
4664

4665
        if errorOnRequiredApiLevel(hdr) {
23✔
4666
                resp.Error = NewJSRequiredApiLevelError()
1✔
4667
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4668
                return
1✔
4669
        }
1✔
4670

4671
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
21✔
4672
                if doErr {
×
4673
                        resp.Error = NewJSNotEnabledForAccountError()
×
4674
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4675
                }
×
4676
                return
×
4677
        }
4678

4679
        var offset int
21✔
4680
        if isJSONObjectOrArray(msg) {
34✔
4681
                var req JSApiConsumersRequest
13✔
4682
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
4683
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4684
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4685
                        return
1✔
4686
                }
1✔
4687
                offset = req.Offset
12✔
4688
        }
4689

4690
        streamName := streamNameFromSubject(subject)
20✔
4691

20✔
4692
        // Clustered mode will invoke a scatter and gather.
20✔
4693
        if s.JetStreamIsClustered() {
33✔
4694
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
13✔
4695
                msg = copyBytes(msg)
13✔
4696
                s.startGoRoutine(func() {
26✔
4697
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
13✔
4698
                })
13✔
4699
                return
13✔
4700
        }
4701

4702
        mset, err := acc.lookupStream(streamName)
7✔
4703
        if err != nil {
7✔
4704
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4705
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4706
                return
×
4707
        }
×
4708

4709
        obs := mset.getPublicConsumers()
7✔
4710
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
7✔
4711

4712
        ocnt := len(obs)
7✔
4713
        if offset > ocnt {
7✔
4714
                offset = ocnt
×
4715
        }
×
4716

4717
        var missingNames []string
7✔
4718
        for _, o := range obs[offset:] {
13✔
4719
                if o.offlineReason != _EMPTY_ {
7✔
4720
                        if resp.Offline == nil {
2✔
4721
                                resp.Offline = make(map[string]string, 1)
1✔
4722
                        }
1✔
4723
                        resp.Offline[o.name] = o.offlineReason
1✔
4724
                        missingNames = append(missingNames, o.name)
1✔
4725
                        continue
1✔
4726
                }
4727
                if cinfo := o.info(); cinfo != nil {
10✔
4728
                        resp.Consumers = append(resp.Consumers, cinfo)
5✔
4729
                }
5✔
4730
                if len(resp.Consumers) >= JSApiListLimit {
5✔
4731
                        break
×
4732
                }
4733
        }
4734
        resp.Total = ocnt
7✔
4735
        resp.Limit = JSApiListLimit
7✔
4736
        resp.Offset = offset
7✔
4737
        resp.Missing = missingNames
7✔
4738
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
7✔
4739
}
4740

4741
// Request for information about an consumer.
4742
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,391✔
4743
        if c == nil || !s.JetStreamEnabled() {
42,391✔
4744
                return
×
4745
        }
×
4746
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
42,391✔
4747
        if err != nil {
42,391✔
4748
                s.Warnf(badAPIRequestT, msg)
×
4749
                return
×
4750
        }
×
4751

4752
        streamName := streamNameFromSubject(subject)
42,391✔
4753
        consumerName := consumerNameFromSubject(subject)
42,391✔
4754

42,391✔
4755
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,391✔
4756

42,391✔
4757
        if !isEmptyRequest(msg) {
42,392✔
4758
                resp.Error = NewJSNotEmptyRequestError()
1✔
4759
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4760
                return
1✔
4761
        }
1✔
4762

4763
        // If we are in clustered mode we need to be the consumer leader to proceed.
4764
        if s.JetStreamIsClustered() {
84,231✔
4765
                // Check to make sure the consumer is assigned.
41,841✔
4766
                js, cc := s.getJetStreamCluster()
41,841✔
4767
                if js == nil || cc == nil {
41,841✔
4768
                        return
×
4769
                }
×
4770

4771
                js.mu.RLock()
41,841✔
4772
                meta := cc.meta
41,841✔
4773
                js.mu.RUnlock()
41,841✔
4774

41,841✔
4775
                if meta == nil {
41,841✔
4776
                        return
×
4777
                }
×
4778

4779
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4780
                ourID := meta.ID()
41,841✔
4781
                groupLeaderless := meta.Leaderless()
41,841✔
4782
                groupCreated := meta.Created()
41,841✔
4783

41,841✔
4784
                js.mu.RLock()
41,841✔
4785
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
41,841✔
4786
                var rg *raftGroup
41,841✔
4787
                var offline, isMember bool
41,841✔
4788
                if ca != nil {
46,095✔
4789
                        if rg = ca.Group; rg != nil {
8,508✔
4790
                                offline = s.allPeersOffline(rg)
4,254✔
4791
                                isMember = rg.isMember(ourID)
4,254✔
4792
                        }
4,254✔
4793
                        if ca.unsupported != nil && isMember {
4,272✔
4794
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
4795
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
4796
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
4797
                                js.mu.RUnlock()
18✔
4798
                                return
18✔
4799
                        }
18✔
4800
                }
4801
                // Capture consumer leader here.
4802
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,823✔
4803
                // Also capture if we think there is no meta leader.
41,823✔
4804
                var isLeaderLess bool
41,823✔
4805
                if !isLeader {
69,911✔
4806
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
28,088✔
4807
                }
28,088✔
4808
                js.mu.RUnlock()
41,823✔
4809

41,823✔
4810
                if isLeader && ca == nil {
54,265✔
4811
                        // We can't find the consumer, so mimic what would be the errors below.
12,442✔
4812
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,442✔
4813
                                if doErr {
×
4814
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4815
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4816
                                }
×
4817
                                return
×
4818
                        }
4819
                        if sa == nil {
22,445✔
4820
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4821
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4822
                                return
10,003✔
4823
                        }
10,003✔
4824
                        // If we are here the consumer is not present.
4825
                        resp.Error = NewJSConsumerNotFoundError()
2,439✔
4826
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,439✔
4827
                        return
2,439✔
4828
                } else if ca == nil {
54,526✔
4829
                        if isLeaderLess {
25,147✔
4830
                                resp.Error = NewJSClusterNotAvailError()
2✔
4831
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4832
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4833
                        }
2✔
4834
                        return
25,145✔
4835
                } else if isLeader && offline {
4,238✔
4836
                        resp.Error = NewJSConsumerOfflineError()
2✔
4837
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4838
                        return
2✔
4839
                }
2✔
4840

4841
                // Check to see if we are a member of the group and if the group has no leader.
4842
                if isMember && js.isGroupLeaderless(ca.Group) {
4,235✔
4843
                        resp.Error = NewJSClusterNotAvailError()
1✔
4844
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4845
                        return
1✔
4846
                }
1✔
4847

4848
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4849
                if !isConsumerLeader {
7,181✔
4850
                        if isLeaderLess {
2,948✔
4851
                                resp.Error = NewJSClusterNotAvailError()
×
4852
                                // Delaying an error response gives the leader a chance to respond before us
×
4853
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4854
                                return
×
4855
                        }
×
4856

4857
                        var node RaftNode
2,948✔
4858
                        var leaderNotPartOfGroup bool
2,948✔
4859

2,948✔
4860
                        // We have a consumer assignment.
2,948✔
4861
                        if isMember {
5,071✔
4862
                                js.mu.RLock()
2,123✔
4863
                                if rg != nil && rg.node != nil {
4,246✔
4864
                                        node = rg.node
2,123✔
4865
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,123✔
4866
                                                leaderNotPartOfGroup = true
×
4867
                                        }
×
4868
                                }
4869
                                js.mu.RUnlock()
2,123✔
4870
                        }
4871

4872
                        // Check if we should ignore all together.
4873
                        if node == nil {
3,773✔
4874
                                // We have been assigned but have not created a node yet. If we are a member return
825✔
4875
                                // our config and defaults for state and no cluster info.
825✔
4876
                                if isMember {
825✔
4877
                                        // Since we access consumerAssignment, need js lock.
×
4878
                                        js.mu.RLock()
×
4879
                                        resp.ConsumerInfo = &ConsumerInfo{
×
4880
                                                Stream:    ca.Stream,
×
4881
                                                Name:      ca.Name,
×
4882
                                                Created:   ca.Created,
×
4883
                                                Config:    setDynamicConsumerMetadata(ca.Config),
×
4884
                                                TimeStamp: time.Now().UTC(),
×
4885
                                        }
×
4886
                                        b := s.jsonResponse(resp)
×
4887
                                        js.mu.RUnlock()
×
4888
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
×
4889
                                }
×
4890
                                return
825✔
4891
                        }
4892
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
4893
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,242✔
4894
                                if leaderNotPartOfGroup {
2,119✔
4895
                                        resp.Error = NewJSConsumerOfflineError()
×
4896
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4897
                                }
×
4898
                                return
2,119✔
4899
                        }
4900
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
4901
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
4902
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
4903
                }
4904
        }
4905

4906
        if errorOnRequiredApiLevel(hdr) {
1,839✔
4907
                resp.Error = NewJSRequiredApiLevelError()
1✔
4908
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4909
                return
1✔
4910
        }
1✔
4911

4912
        if !acc.JetStreamEnabled() {
1,837✔
4913
                resp.Error = NewJSNotEnabledForAccountError()
×
4914
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4915
                return
×
4916
        }
×
4917

4918
        mset, err := acc.lookupStream(streamName)
1,837✔
4919
        if err != nil {
1,837✔
4920
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4921
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4922
                return
×
4923
        }
×
4924

4925
        obs := mset.lookupConsumer(consumerName)
1,837✔
4926
        if obs == nil {
2,044✔
4927
                resp.Error = NewJSConsumerNotFoundError()
207✔
4928
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
207✔
4929
                return
207✔
4930
        }
207✔
4931

4932
        if obs.offlineReason != _EMPTY_ {
1,631✔
4933
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
4934
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
4935
                return
1✔
4936
        }
1✔
4937

4938
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,629✔
4939
                // This consumer returned nil which means it's closed. Respond with not found.
×
4940
                resp.Error = NewJSConsumerNotFoundError()
×
4941
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4942
                return
×
4943
        }
×
4944
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,629✔
4945
}
4946

4947
// Request to delete an Consumer.
4948
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,415✔
4949
        if c == nil || !s.JetStreamEnabled() {
7,418✔
4950
                return
3✔
4951
        }
3✔
4952
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,412✔
4953
        if err != nil {
7,412✔
4954
                s.Warnf(badAPIRequestT, msg)
×
4955
                return
×
4956
        }
×
4957

4958
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,412✔
4959

7,412✔
4960
        // Determine if we should proceed here when we are in clustered mode.
7,412✔
4961
        if s.JetStreamIsClustered() {
14,374✔
4962
                js, cc := s.getJetStreamCluster()
6,962✔
4963
                if js == nil || cc == nil {
6,963✔
4964
                        return
1✔
4965
                }
1✔
4966
                if js.isLeaderless() {
6,962✔
4967
                        resp.Error = NewJSClusterNotAvailError()
1✔
4968
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4969
                        return
1✔
4970
                }
1✔
4971
                // Make sure we are meta leader.
4972
                if !s.JetStreamIsLeader() {
11,579✔
4973
                        return
4,619✔
4974
                }
4,619✔
4975
        }
4976

4977
        if errorOnRequiredApiLevel(hdr) {
2,792✔
4978
                resp.Error = NewJSRequiredApiLevelError()
1✔
4979
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4980
                return
1✔
4981
        }
1✔
4982

4983
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,853✔
4984
                if doErr {
124✔
4985
                        resp.Error = NewJSNotEnabledForAccountError()
61✔
4986
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
61✔
4987
                }
61✔
4988
                return
63✔
4989
        }
4990
        if !isEmptyRequest(msg) {
2,728✔
4991
                resp.Error = NewJSNotEmptyRequestError()
1✔
4992
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4993
                return
1✔
4994
        }
1✔
4995
        stream := streamNameFromSubject(subject)
2,726✔
4996
        consumer := consumerNameFromSubject(subject)
2,726✔
4997

2,726✔
4998
        if s.JetStreamIsClustered() {
5,067✔
4999
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,341✔
5000
                return
2,341✔
5001
        }
2,341✔
5002

5003
        mset, err := acc.lookupStream(stream)
385✔
5004
        if err != nil {
385✔
5005
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5006
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5007
                return
×
5008
        }
×
5009

5010
        obs := mset.lookupConsumer(consumer)
385✔
5011
        if obs == nil {
547✔
5012
                resp.Error = NewJSConsumerNotFoundError()
162✔
5013
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
162✔
5014
                return
162✔
5015
        }
162✔
5016
        if err := obs.delete(); err != nil {
223✔
5017
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
5018
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5019
                return
×
5020
        }
×
5021
        resp.Success = true
223✔
5022
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
223✔
5023
}
5024

5025
// Request to pause or unpause a Consumer.
5026
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
64✔
5027
        if c == nil || !s.JetStreamEnabled() {
64✔
5028
                return
×
5029
        }
×
5030
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
64✔
5031
        if err != nil {
64✔
5032
                s.Warnf(badAPIRequestT, msg)
×
5033
                return
×
5034
        }
×
5035

5036
        var req JSApiConsumerPauseRequest
64✔
5037
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
64✔
5038

64✔
5039
        if isJSONObjectOrArray(msg) {
119✔
5040
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
5041
                        resp.Error = NewJSInvalidJSONError(err)
×
5042
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5043
                        return
×
5044
                }
×
5045
        }
5046

5047
        // Determine if we should proceed here when we are in clustered mode.
5048
        isClustered := s.JetStreamIsClustered()
64✔
5049
        js, cc := s.getJetStreamCluster()
64✔
5050
        if isClustered {
118✔
5051
                if js == nil || cc == nil {
54✔
5052
                        return
×
5053
                }
×
5054
                if js.isLeaderless() {
54✔
5055
                        resp.Error = NewJSClusterNotAvailError()
×
5056
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5057
                        return
×
5058
                }
×
5059
                // Make sure we are meta leader.
5060
                if !s.JetStreamIsLeader() {
94✔
5061
                        return
40✔
5062
                }
40✔
5063
        }
5064

5065
        if errorOnRequiredApiLevel(hdr) {
25✔
5066
                resp.Error = NewJSRequiredApiLevelError()
1✔
5067
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5068
                return
1✔
5069
        }
1✔
5070

5071
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
5072
                if doErr {
×
5073
                        resp.Error = NewJSNotEnabledForAccountError()
×
5074
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5075
                }
×
5076
                return
×
5077
        }
5078

5079
        stream := streamNameFromSubject(subject)
23✔
5080
        consumer := consumerNameFromSubject(subject)
23✔
5081

23✔
5082
        if isClustered {
37✔
5083
                js.mu.RLock()
14✔
5084
                sa := js.streamAssignment(acc.Name, stream)
14✔
5085
                if sa == nil {
14✔
5086
                        js.mu.RUnlock()
×
5087
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5088
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5089
                        return
×
5090
                }
×
5091
                if sa.unsupported != nil {
14✔
5092
                        js.mu.RUnlock()
×
5093
                        // Just let the request time out.
×
5094
                        return
×
5095
                }
×
5096

5097
                ca, ok := sa.consumers[consumer]
14✔
5098
                if !ok || ca == nil {
14✔
5099
                        js.mu.RUnlock()
×
5100
                        resp.Error = NewJSConsumerNotFoundError()
×
5101
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5102
                        return
×
5103
                }
×
5104
                if ca.unsupported != nil {
14✔
5105
                        js.mu.RUnlock()
×
5106
                        // Just let the request time out.
×
5107
                        return
×
5108
                }
×
5109

5110
                nca := *ca
14✔
5111
                ncfg := *ca.Config
14✔
5112
                nca.Config = &ncfg
14✔
5113
                meta := cc.meta
14✔
5114
                js.mu.RUnlock()
14✔
5115
                pauseUTC := req.PauseUntil.UTC()
14✔
5116
                if !pauseUTC.IsZero() {
24✔
5117
                        nca.Config.PauseUntil = &pauseUTC
10✔
5118
                } else {
14✔
5119
                        nca.Config.PauseUntil = nil
4✔
5120
                }
4✔
5121

5122
                // Update asset version metadata due to updating pause/resume.
5123
                // Only PauseUntil is updated above, so reuse config for both.
5124
                setStaticConsumerMetadata(nca.Config)
14✔
5125

14✔
5126
                eca := encodeAddConsumerAssignment(&nca)
14✔
5127
                meta.Propose(eca)
14✔
5128

14✔
5129
                resp.PauseUntil = pauseUTC
14✔
5130
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5131
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5132
                }
10✔
5133
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5134
                return
14✔
5135
        }
5136

5137
        mset, err := acc.lookupStream(stream)
9✔
5138
        if err != nil {
9✔
5139
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5140
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5141
                return
×
5142
        }
×
5143
        if mset.offlineReason != _EMPTY_ {
9✔
5144
                // Just let the request time out.
×
5145
                return
×
5146
        }
×
5147

5148
        obs := mset.lookupConsumer(consumer)
9✔
5149
        if obs == nil {
9✔
5150
                resp.Error = NewJSConsumerNotFoundError()
×
5151
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5152
                return
×
5153
        }
×
5154
        if obs.offlineReason != _EMPTY_ {
9✔
5155
                // Just let the request time out.
×
5156
                return
×
5157
        }
×
5158

5159
        ncfg := obs.cfg
9✔
5160
        pauseUTC := req.PauseUntil.UTC()
9✔
5161
        if !pauseUTC.IsZero() {
14✔
5162
                ncfg.PauseUntil = &pauseUTC
5✔
5163
        } else {
9✔
5164
                ncfg.PauseUntil = nil
4✔
5165
        }
4✔
5166

5167
        // Update asset version metadata due to updating pause/resume.
5168
        setStaticConsumerMetadata(&ncfg)
9✔
5169

9✔
5170
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5171
                // The only type of error that should be returned here is from o.store,
×
5172
                // so use a store failed error type.
×
5173
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5174
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5175
                return
×
5176
        }
×
5177

5178
        resp.PauseUntil = pauseUTC
9✔
5179
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5180
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5181
        }
5✔
5182
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5183
}
5184

5185
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5186
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
70,923✔
5187
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
70,923✔
5188
                TypedEvent: TypedEvent{
70,923✔
5189
                        Type: JSAPIAuditType,
70,923✔
5190
                        ID:   nuid.Next(),
70,923✔
5191
                        Time: time.Now().UTC(),
70,923✔
5192
                },
70,923✔
5193
                Server:   s.Name(),
70,923✔
5194
                Client:   ci.forAdvisory(),
70,923✔
5195
                Subject:  subject,
70,923✔
5196
                Request:  request,
70,923✔
5197
                Response: response,
70,923✔
5198
                Domain:   s.getOpts().JetStreamDomain,
70,923✔
5199
        })
70,923✔
5200
}
70,923✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc