• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13984795907

20 Mar 2025 09:11PM UTC coverage: 85.48% (-0.03%) from 85.509%
13984795907

push

github

web-flow
Various GHA tweaks (#6702)

Signed-off-by: Neil Twigg <neil@nats.io>

69231 of 80991 relevant lines covered (85.48%)

472279.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.38
/server/jetstream_api.go
1
// Copyright 2020-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31
        "unicode"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiTemplateCreate is the endpoint to create new stream templates.
51
        // Will return JSON response.
52
        JSApiTemplateCreate  = "$JS.API.STREAM.TEMPLATE.CREATE.*"
53
        JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"
54

55
        // JSApiTemplates is the endpoint to list all stream template names for this account.
56
        // Will return JSON response.
57
        JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES"
58

59
        // JSApiTemplateInfo is for obtaining general information about a named stream template.
60
        // Will return JSON response.
61
        JSApiTemplateInfo  = "$JS.API.STREAM.TEMPLATE.INFO.*"
62
        JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"
63

64
        // JSApiTemplateDelete is the endpoint to delete stream templates.
65
        // Will return JSON response.
66
        JSApiTemplateDelete  = "$JS.API.STREAM.TEMPLATE.DELETE.*"
67
        JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"
68

69
        // JSApiStreamCreate is the endpoint to create new streams.
70
        // Will return JSON response.
71
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
72
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
73

74
        // JSApiStreamUpdate is the endpoint to update existing streams.
75
        // Will return JSON response.
76
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
77
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
78

79
        // JSApiStreams is the endpoint to list all stream names for this account.
80
        // Will return JSON response.
81
        JSApiStreams = "$JS.API.STREAM.NAMES"
82
        // JSApiStreamList is the endpoint that will return all detailed stream information
83
        JSApiStreamList = "$JS.API.STREAM.LIST"
84

85
        // JSApiStreamInfo is for obtaining general information about a named stream.
86
        // Will return JSON response.
87
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
88
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
89

90
        // JSApiStreamDelete is the endpoint to delete streams.
91
        // Will return JSON response.
92
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
93
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
94

95
        // JSApiStreamPurge is the endpoint to purge streams.
96
        // Will return JSON response.
97
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
98
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
99

100
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
101
        // Will return a stream of chunks with a nil chunk as EOF to
102
        // the deliver subject. Caller should respond to each chunk
103
        // with a nil body response for ack flow.
104
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
105
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
106

107
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
108
        // Caller should respond to each chunk with a nil body response.
109
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
110
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
111

112
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
113
        // Will return JSON response.
114
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
115
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
116

117
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
118
        // Will return JSON response.
119
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
120
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
121

122
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
123
        // Will return the message similar to how a consumer receives the message, no JSON processing.
124
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
125
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
126
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
127

128
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
129
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
130
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
131
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
132

133
        // jsDirectGetPre
134
        jsDirectGetPre = "$JS.API.DIRECT.GET"
135

136
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
137
        // This was also the legacy endpoint for ephemeral consumers.
138
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
139
        // Will return JSON response.
140
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
141
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
142
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
143
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
144

145
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
146
        // You need to include the stream and consumer name in the subject.
147
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
148
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
149

150
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
151
        // Will return JSON response.
152
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
153
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
154

155
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
156
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
157
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
158

159
        // JSApiConsumerInfo is for obtaining general information about a consumer.
160
        // Will return JSON response.
161
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
162
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
163

164
        // JSApiConsumerDelete is the endpoint to delete consumers.
165
        // Will return JSON response.
166
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
167
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
168

169
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
170
        // Will return JSON response.
171
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
172
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
173

174
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
175
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
176

177
        // JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
178
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
179
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
180

181
        // jsRequestNextPre
182
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
183

184
        // For snapshots and restores. The ack will have additional tokens.
185
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
186
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
187

188
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
189
        // Will return JSON response.
190
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
191
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
192

193
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
194
        // Will return JSON response.
195
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
196
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
197

198
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
199
        // Will return JSON response.
200
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
201
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
202

203
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
204
        // Only works from system account.
205
        // Will return JSON response.
206
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
207

208
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
209
        // Only works from system account.
210
        // Will return JSON response.
211
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
212

213
        // JSApiAccountPurge is the endpoint to purge the js content of an account
214
        // Only works from system account.
215
        // Will return JSON response.
216
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
217
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
218

219
        // JSApiServerStreamMove is the endpoint to move streams off a server
220
        // Only works from system account.
221
        // Will return JSON response.
222
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
223
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
224

225
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
226
        // Only works from system account.
227
        // Will return JSON response.
228
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
229
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
230

231
        // The prefix for system level account API.
232
        jsAPIAccountPre = "$JS.API.ACCOUNT."
233

234
        // jsAckT is the template for the ack message stream coming back from a consumer
235
        // when they ACK/NAK, etc a message.
236
        jsAckT      = "$JS.ACK.%s.%s"
237
        jsAckPre    = "$JS.ACK."
238
        jsAckPreLen = len(jsAckPre)
239

240
        // jsFlowControl is for flow control subjects.
241
        jsFlowControlPre = "$JS.FC."
242
        // jsFlowControl is for FC responses.
243
        jsFlowControl = "$JS.FC.%s.%s.*"
244

245
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
246
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
247

248
        // JSMetricPrefix is a prefix for all JetStream metrics.
249
        JSMetricPrefix = "$JS.EVENT.METRIC"
250

251
        // JSMetricConsumerAckPre is a metric containing ack latency.
252
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
253

254
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
255
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
256

257
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
258
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
259

260
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
261
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
262

263
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
264
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
265

266
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
267
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
268

269
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
270
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
271

272
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
273
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
274

275
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
276
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
277

278
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
279
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
280

281
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
282
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
283

284
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
285
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
286

287
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
288
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
289

290
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
291
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
292

293
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was start.
294
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
295

296
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
297
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
298

299
        // JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
300
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
301

302
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
303
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
304

305
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
306
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
307

308
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
309
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
310

311
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
312
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
313

314
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
315
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
316

317
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
318
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
319

320
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
321
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
322

323
        // JSAuditAdvisory is a notification about JetStream API access.
324
        // FIXME - Add in details about who..
325
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
326
)
327

328
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
329
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
330

331
func generateJSMappingTable(domain string) map[string]string {
719✔
332
        mappings := map[string]string{}
719✔
333
        // This set of mappings is very very very ugly.
719✔
334
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
719✔
335
        // For optics $KV and $OBJ where made to be independent subject spaces.
719✔
336
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
719✔
337
        // This is very unfortunate!!!
719✔
338
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
719✔
339
        // Especially since the actual API for say KV, does use stream create from JS.
719✔
340
        // To avoid overlaps KV and OBJ views append the prefix to their API.
719✔
341
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
719✔
342
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
719✔
343
        for srcMappingSuffix, to := range map[string]string{
719✔
344
                "INFO":       JSApiAccountInfo,
719✔
345
                "STREAM.>":   "$JS.API.STREAM.>",
719✔
346
                "CONSUMER.>": "$JS.API.CONSUMER.>",
719✔
347
                "DIRECT.>":   "$JS.API.DIRECT.>",
719✔
348
                "META.>":     "$JS.API.META.>",
719✔
349
                "SERVER.>":   "$JS.API.SERVER.>",
719✔
350
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
719✔
351
                "$KV.>":      "$KV.>",
719✔
352
                "$OBJ.>":     "$OBJ.>",
719✔
353
        } {
7,190✔
354
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
6,471✔
355
        }
6,471✔
356
        return mappings
719✔
357
}
358

359
// JSMaxDescription is the maximum description length for streams and consumers.
360
const JSMaxDescriptionLen = 4 * 1024
361

362
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
363
// It's calculated by summing length of all keys and values.
364
const JSMaxMetadataLen = 128 * 1024
365

366
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
367
// Picked 255 as it seems to be a widely used file name limit
368
const JSMaxNameLen = 255
369

370
// JSDefaultRequestQueueLimit is the default number of entries that we will
371
// put on the global request queue before we react.
372
const JSDefaultRequestQueueLimit = 10_000
373

374
// Responses for API calls.
375

376
// ApiResponse is a standard response from the JetStream JSON API
377
type ApiResponse struct {
378
        Type  string    `json:"type"`
379
        Error *ApiError `json:"error,omitempty"`
380
}
381

382
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
383

384
// When passing back to the clients generalize store failures.
385
var (
386
        errStreamStoreFailed   = errors.New("error creating store for stream")
387
        errConsumerStoreFailed = errors.New("error creating store for consumer")
388
)
389

390
// ToError checks if the response has a error and if it does converts it to an error avoiding
391
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
392
func (r *ApiResponse) ToError() error {
2,930✔
393
        if r.Error == nil {
4,704✔
394
                return nil
1,774✔
395
        }
1,774✔
396

397
        return r.Error
1,156✔
398
}
399

400
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
401

402
// ApiPaged includes variables used to create paged responses from the JSON API
403
type ApiPaged struct {
404
        Total  int `json:"total"`
405
        Offset int `json:"offset"`
406
        Limit  int `json:"limit"`
407
}
408

409
// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
410
type ApiPagedRequest struct {
411
        Offset int `json:"offset"`
412
}
413

414
// JSApiAccountInfoResponse reports back information on jetstream for this account.
415
type JSApiAccountInfoResponse struct {
416
        ApiResponse
417
        *JetStreamAccountStats
418
}
419

420
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
421

422
// JSApiStreamCreateResponse stream creation.
423
type JSApiStreamCreateResponse struct {
424
        ApiResponse
425
        *StreamInfo
426
        DidCreate bool `json:"did_create,omitempty"`
427
}
428

429
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
430

431
// JSApiStreamDeleteResponse stream removal.
432
type JSApiStreamDeleteResponse struct {
433
        ApiResponse
434
        Success bool `json:"success,omitempty"`
435
}
436

437
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
438

439
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
440
const JSMaxSubjectDetails = 100_000
441

442
type JSApiStreamInfoRequest struct {
443
        ApiPagedRequest
444
        DeletedDetails bool   `json:"deleted_details,omitempty"`
445
        SubjectsFilter string `json:"subjects_filter,omitempty"`
446
}
447

448
type JSApiStreamInfoResponse struct {
449
        ApiResponse
450
        ApiPaged
451
        *StreamInfo
452
}
453

454
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
455

456
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
457
// TODO(dlc) - with header or request support could request chunked response.
458
const JSApiNamesLimit = 1024
459
const JSApiListLimit = 256
460

461
type JSApiStreamNamesRequest struct {
462
        ApiPagedRequest
463
        // These are filters that can be applied to the list.
464
        Subject string `json:"subject,omitempty"`
465
}
466

467
// JSApiStreamNamesResponse list of streams.
468
// A nil request is valid and means all streams.
469
type JSApiStreamNamesResponse struct {
470
        ApiResponse
471
        ApiPaged
472
        Streams []string `json:"streams"`
473
}
474

475
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
476

477
type JSApiStreamListRequest struct {
478
        ApiPagedRequest
479
        // These are filters that can be applied to the list.
480
        Subject string `json:"subject,omitempty"`
481
}
482

483
// JSApiStreamListResponse list of detailed stream information.
484
// A nil request is valid and means all streams.
485
type JSApiStreamListResponse struct {
486
        ApiResponse
487
        ApiPaged
488
        Streams []*StreamInfo `json:"streams"`
489
        Missing []string      `json:"missing,omitempty"`
490
}
491

492
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
493

494
// JSApiStreamPurgeRequest is optional request information to the purge API.
495
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
496
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
497
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
498
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
499
type JSApiStreamPurgeRequest struct {
500
        // Purge up to but not including sequence.
501
        Sequence uint64 `json:"seq,omitempty"`
502
        // Subject to match against messages for the purge command.
503
        Subject string `json:"filter,omitempty"`
504
        // Number of messages to keep.
505
        Keep uint64 `json:"keep,omitempty"`
506
}
507

508
type JSApiStreamPurgeResponse struct {
509
        ApiResponse
510
        Success bool   `json:"success,omitempty"`
511
        Purged  uint64 `json:"purged"`
512
}
513

514
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
515

516
type JSApiConsumerUnpinRequest struct {
517
        Group string `json:"group"`
518
}
519

520
type JSApiConsumerUnpinResponse struct {
521
        ApiResponse
522
}
523

524
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
525

526
// JSApiStreamUpdateResponse for updating a stream.
527
type JSApiStreamUpdateResponse struct {
528
        ApiResponse
529
        *StreamInfo
530
}
531

532
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
533

534
// JSApiMsgDeleteRequest delete message request.
535
type JSApiMsgDeleteRequest struct {
536
        Seq     uint64 `json:"seq"`
537
        NoErase bool   `json:"no_erase,omitempty"`
538
}
539

540
type JSApiMsgDeleteResponse struct {
541
        ApiResponse
542
        Success bool `json:"success,omitempty"`
543
}
544

545
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
546

547
type JSApiStreamSnapshotRequest struct {
548
        // Subject to deliver the chunks to for the snapshot.
549
        DeliverSubject string `json:"deliver_subject"`
550
        // Do not include consumers in the snapshot.
551
        NoConsumers bool `json:"no_consumers,omitempty"`
552
        // Optional chunk size preference.
553
        // Best to just let server select.
554
        ChunkSize int `json:"chunk_size,omitempty"`
555
        // Check all message's checksums prior to snapshot.
556
        CheckMsgs bool `json:"jsck,omitempty"`
557
}
558

559
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
560
type JSApiStreamSnapshotResponse struct {
561
        ApiResponse
562
        // Configuration of the given stream.
563
        Config *StreamConfig `json:"config,omitempty"`
564
        // Current State for the given stream.
565
        State *StreamState `json:"state,omitempty"`
566
}
567

568
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
569

570
// JSApiStreamRestoreRequest is the required restore request.
571
type JSApiStreamRestoreRequest struct {
572
        // Configuration of the given stream.
573
        Config StreamConfig `json:"config"`
574
        // Current State for the given stream.
575
        State StreamState `json:"state"`
576
}
577

578
// JSApiStreamRestoreResponse is the direct response to the restore request.
579
type JSApiStreamRestoreResponse struct {
580
        ApiResponse
581
        // Subject to deliver the chunks to for the snapshot restore.
582
        DeliverSubject string `json:"deliver_subject"`
583
}
584

585
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
586

587
// JSApiStreamRemovePeerRequest is the required remove peer request.
588
type JSApiStreamRemovePeerRequest struct {
589
        // Server name of the peer to be removed.
590
        Peer string `json:"peer"`
591
}
592

593
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
594
type JSApiStreamRemovePeerResponse struct {
595
        ApiResponse
596
        Success bool `json:"success,omitempty"`
597
}
598

599
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
600

601
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
602
type JSApiStreamLeaderStepDownResponse struct {
603
        ApiResponse
604
        Success bool `json:"success,omitempty"`
605
}
606

607
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
608

609
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
610
type JSApiConsumerLeaderStepDownResponse struct {
611
        ApiResponse
612
        Success bool `json:"success,omitempty"`
613
}
614

615
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
616

617
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
618
type JSApiLeaderStepdownRequest struct {
619
        Placement *Placement `json:"placement,omitempty"`
620
}
621

622
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
623
type JSApiLeaderStepDownResponse struct {
624
        ApiResponse
625
        Success bool `json:"success,omitempty"`
626
}
627

628
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
629

630
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
631
type JSApiMetaServerRemoveRequest struct {
632
        // Server name of the peer to be removed.
633
        Server string `json:"peer"`
634
        // Peer ID of the peer to be removed. If specified this is used
635
        // instead of the server name.
636
        Peer string `json:"peer_id,omitempty"`
637
}
638

639
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
640
type JSApiMetaServerRemoveResponse struct {
641
        ApiResponse
642
        Success bool `json:"success,omitempty"`
643
}
644

645
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
646

647
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
648
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
649
type JSApiMetaServerStreamMoveRequest struct {
650
        // Server name of the peer to be evacuated.
651
        Server string `json:"server,omitempty"`
652
        // Cluster the server is in
653
        Cluster string `json:"cluster,omitempty"`
654
        // Domain the sever is in
655
        Domain string `json:"domain,omitempty"`
656
        // Ephemeral placement tags for the move
657
        Tags []string `json:"tags,omitempty"`
658
}
659

660
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
661

662
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
663
type JSApiAccountPurgeResponse struct {
664
        ApiResponse
665
        Initiated bool `json:"initiated,omitempty"`
666
}
667

668
// JSApiMsgGetRequest get a message request.
669
type JSApiMsgGetRequest struct {
670
        Seq     uint64 `json:"seq,omitempty"`
671
        LastFor string `json:"last_by_subj,omitempty"`
672
        NextFor string `json:"next_by_subj,omitempty"`
673

674
        // Batch support. Used to request more then one msg at a time.
675
        // Can be used with simple starting seq, but also NextFor with wildcards.
676
        Batch int `json:"batch,omitempty"`
677
        // This will make sure we limit how much data we blast out. If not set we will
678
        // inherit the slow consumer default max setting of the server. Default is MAX_PENDING_SIZE.
679
        MaxBytes int `json:"max_bytes,omitempty"`
680
        // Return messages as of this start time.
681
        StartTime *time.Time `json:"start_time,omitempty"`
682

683
        // Multiple response support. Will get the last msgs matching the subjects. These can include wildcards.
684
        MultiLastFor []string `json:"multi_last,omitempty"`
685
        // Only return messages up to this sequence. If not set, will be last sequence for the stream.
686
        UpToSeq uint64 `json:"up_to_seq,omitempty"`
687
        // Only return messages up to this time.
688
        UpToTime *time.Time `json:"up_to_time,omitempty"`
689
}
690

691
type JSApiMsgGetResponse struct {
692
        ApiResponse
693
        Message *StoredMsg `json:"message,omitempty"`
694
}
695

696
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
697

698
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
699
const JSWaitQueueDefaultMax = 512
700

701
type JSApiConsumerCreateResponse struct {
702
        ApiResponse
703
        *ConsumerInfo
704
}
705

706
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
707

708
type JSApiConsumerDeleteResponse struct {
709
        ApiResponse
710
        Success bool `json:"success,omitempty"`
711
}
712

713
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
714

715
type JSApiConsumerPauseRequest struct {
716
        PauseUntil time.Time `json:"pause_until,omitempty"`
717
}
718

719
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
720

721
type JSApiConsumerPauseResponse struct {
722
        ApiResponse
723
        Paused         bool          `json:"paused"`
724
        PauseUntil     time.Time     `json:"pause_until"`
725
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
726
}
727

728
type JSApiConsumerInfoResponse struct {
729
        ApiResponse
730
        *ConsumerInfo
731
}
732

733
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
734

735
type JSApiConsumersRequest struct {
736
        ApiPagedRequest
737
}
738

739
type JSApiConsumerNamesResponse struct {
740
        ApiResponse
741
        ApiPaged
742
        Consumers []string `json:"consumers"`
743
}
744

745
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
746

747
type JSApiConsumerListResponse struct {
748
        ApiResponse
749
        ApiPaged
750
        Consumers []*ConsumerInfo `json:"consumers"`
751
        Missing   []string        `json:"missing,omitempty"`
752
}
753

754
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
755

756
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
757
type JSApiConsumerGetNextRequest struct {
758
        Expires   time.Duration `json:"expires,omitempty"`
759
        Batch     int           `json:"batch,omitempty"`
760
        MaxBytes  int           `json:"max_bytes,omitempty"`
761
        NoWait    bool          `json:"no_wait,omitempty"`
762
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
763
        PriorityGroup
764
}
765

766
// JSApiStreamTemplateCreateResponse for creating templates.
767
type JSApiStreamTemplateCreateResponse struct {
768
        ApiResponse
769
        *StreamTemplateInfo
770
}
771

772
const JSApiStreamTemplateCreateResponseType = "io.nats.jetstream.api.v1.stream_template_create_response"
773

774
type JSApiStreamTemplateDeleteResponse struct {
775
        ApiResponse
776
        Success bool `json:"success,omitempty"`
777
}
778

779
const JSApiStreamTemplateDeleteResponseType = "io.nats.jetstream.api.v1.stream_template_delete_response"
780

781
// JSApiStreamTemplateInfoResponse for information about stream templates.
782
type JSApiStreamTemplateInfoResponse struct {
783
        ApiResponse
784
        *StreamTemplateInfo
785
}
786

787
const JSApiStreamTemplateInfoResponseType = "io.nats.jetstream.api.v1.stream_template_info_response"
788

789
type JSApiStreamTemplatesRequest struct {
790
        ApiPagedRequest
791
}
792

793
// JSApiStreamTemplateNamesResponse list of templates
794
type JSApiStreamTemplateNamesResponse struct {
795
        ApiResponse
796
        ApiPaged
797
        Templates []string `json:"streams"`
798
}
799

800
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
801

802
// Structure that holds state for a JetStream API request that is processed
803
// in a separate long-lived go routine. This is to avoid possibly blocking
804
// ROUTE and GATEWAY connections.
805
type jsAPIRoutedReq struct {
806
        jsub    *subscription
807
        sub     *subscription
808
        acc     *Account
809
        subject string
810
        reply   string
811
        msg     []byte
812
        pa      pubArg
813
}
814

815
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
121,327✔
816
        // Ignore system level directives meta stepdown and peer remove requests here.
121,327✔
817
        if subject == JSApiLeaderStepDown ||
121,327✔
818
                subject == JSApiRemoveServer ||
121,327✔
819
                strings.HasPrefix(subject, jsAPIAccountPre) {
121,784✔
820
                return
457✔
821
        }
457✔
822
        // No lock needed, those are immutable.
823
        s, rr := js.srv, js.apiSubs.Match(subject)
120,870✔
824

120,870✔
825
        hdr, msg := c.msgParts(rmsg)
120,870✔
826
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
120,877✔
827
                // Check if this is the system account. We will let these through for the account info only.
7✔
828
                sacc := s.SystemAccount()
7✔
829
                if sacc != acc {
7✔
830
                        return
×
831
                }
×
832
                if subject != JSApiAccountInfo {
11✔
833
                        // Only respond from the initial server entry to the NATS system.
4✔
834
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
835
                                var resp = ApiResponse{
2✔
836
                                        Type:  JSApiSystemResponseType,
2✔
837
                                        Error: NewJSNotEnabledForAccountError(),
2✔
838
                                }
2✔
839
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
840
                        }
2✔
841
                        return
4✔
842
                }
843
        }
844

845
        // Short circuit for no interest.
846
        if len(rr.psubs)+len(rr.qsubs) == 0 {
133,662✔
847
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
12,796✔
848
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
849
                        var resp = ApiResponse{
×
850
                                Type:  JSApiSystemResponseType,
×
851
                                Error: NewJSBadRequestError(),
×
852
                        }
×
853
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
854
                }
×
855
                return
12,796✔
856
        }
857

858
        // We should only have psubs and only 1 per result.
859
        if len(rr.psubs) != 1 {
108,070✔
860
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
861
                if c.kind == CLIENT || c.kind == LEAF {
×
862
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
863
                        var resp = ApiResponse{
×
864
                                Type:  JSApiSystemResponseType,
×
865
                                Error: NewJSBadRequestError(),
×
866
                        }
×
867
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
868
                }
×
869
                return
×
870
        }
871
        jsub := rr.psubs[0]
108,070✔
872

108,070✔
873
        // If this is directly from a client connection ok to do in place.
108,070✔
874
        if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
153,983✔
875
                start := time.Now()
45,913✔
876
                jsub.icb(sub, c, acc, subject, reply, rmsg)
45,913✔
877
                if dur := time.Since(start); dur >= readLoopReportThreshold {
45,913✔
878
                        s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
×
879
                }
×
880
                return
45,913✔
881
        }
882

883
        // If we are here we have received this request over a non-client connection.
884
        // We need to make sure not to block. We will send the request to a long-lived
885
        // pool of go routines.
886

887
        // Increment inflight. Do this before queueing.
888
        atomic.AddInt64(&js.apiInflight, 1)
62,157✔
889

62,157✔
890
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
62,157✔
891
        // header from the msg body. No other references are needed.
62,157✔
892
        // Check pending and warn if getting backed up.
62,157✔
893
        pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
62,157✔
894
        limit := atomic.LoadInt64(&js.queueLimit)
62,157✔
895
        if pending >= int(limit) {
62,262✔
896
                s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
105✔
897
                drained := int64(s.jsAPIRoutedReqs.drain())
105✔
898
                atomic.AddInt64(&js.apiInflight, -drained)
105✔
899

105✔
900
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
105✔
901
                        TypedEvent: TypedEvent{
105✔
902
                                Type: JSAPILimitReachedAdvisoryType,
105✔
903
                                ID:   nuid.Next(),
105✔
904
                                Time: time.Now().UTC(),
105✔
905
                        },
105✔
906
                        Server:  s.Name(),
105✔
907
                        Domain:  js.config.Domain,
105✔
908
                        Dropped: drained,
105✔
909
                })
105✔
910
        }
105✔
911
}
912

913
func (s *Server) processJSAPIRoutedRequests() {
12,608✔
914
        defer s.grWG.Done()
12,608✔
915

12,608✔
916
        s.mu.RLock()
12,608✔
917
        queue := s.jsAPIRoutedReqs
12,608✔
918
        client := &client{srv: s, kind: JETSTREAM}
12,608✔
919
        s.mu.RUnlock()
12,608✔
920

12,608✔
921
        js := s.getJetStream()
12,608✔
922

12,608✔
923
        for {
67,946✔
924
                select {
55,338✔
925
                case <-queue.ch:
42,730✔
926
                        // Only pop one item at a time here, otherwise if the system is recovering
42,730✔
927
                        // from queue buildup, then one worker will pull off all the tasks and the
42,730✔
928
                        // others will be starved of work.
42,730✔
929
                        for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
104,782✔
930
                                client.pa = r.pa
62,052✔
931
                                start := time.Now()
62,052✔
932
                                r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
62,052✔
933
                                if dur := time.Since(start); dur >= readLoopReportThreshold {
62,052✔
934
                                        s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
×
935
                                }
×
936
                                atomic.AddInt64(&js.apiInflight, -1)
62,052✔
937
                        }
938
                case <-s.quitCh:
12,604✔
939
                        return
12,604✔
940
                }
941
        }
942
}
943

944
func (s *Server) setJetStreamExportSubs() error {
3,152✔
945
        js := s.getJetStream()
3,152✔
946
        if js == nil {
3,152✔
947
                return NewJSNotEnabledError()
×
948
        }
×
949

950
        // Start the go routine that will process API requests received by the
951
        // subscription below when they are coming from routes, etc..
952
        const maxProcs = 16
3,152✔
953
        mp := runtime.GOMAXPROCS(0)
3,152✔
954
        // Cap at 16 max for now on larger core setups.
3,152✔
955
        if mp > maxProcs {
3,152✔
956
                mp = maxProcs
×
957
        }
×
958
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
3,152✔
959
        for i := 0; i < mp; i++ {
15,760✔
960
                s.startGoRoutine(s.processJSAPIRoutedRequests)
12,608✔
961
        }
12,608✔
962

963
        // This is the catch all now for all JetStream API calls.
964
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
3,152✔
965
                return err
×
966
        }
×
967

968
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
3,152✔
969
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
970
                return err
×
971
        }
×
972

973
        // API handles themselves.
974
        pairs := []struct {
3,152✔
975
                subject string
3,152✔
976
                handler msgHandler
3,152✔
977
        }{
3,152✔
978
                {JSApiAccountInfo, s.jsAccountInfoRequest},
3,152✔
979
                {JSApiTemplateCreate, s.jsTemplateCreateRequest},
3,152✔
980
                {JSApiTemplates, s.jsTemplateNamesRequest},
3,152✔
981
                {JSApiTemplateInfo, s.jsTemplateInfoRequest},
3,152✔
982
                {JSApiTemplateDelete, s.jsTemplateDeleteRequest},
3,152✔
983
                {JSApiStreamCreate, s.jsStreamCreateRequest},
3,152✔
984
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
3,152✔
985
                {JSApiStreams, s.jsStreamNamesRequest},
3,152✔
986
                {JSApiStreamList, s.jsStreamListRequest},
3,152✔
987
                {JSApiStreamInfo, s.jsStreamInfoRequest},
3,152✔
988
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
3,152✔
989
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
3,152✔
990
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
3,152✔
991
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
3,152✔
992
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
3,152✔
993
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
3,152✔
994
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
3,152✔
995
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
3,152✔
996
                {JSApiMsgGet, s.jsMsgGetRequest},
3,152✔
997
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
3,152✔
998
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
3,152✔
999
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
3,152✔
1000
                {JSApiConsumers, s.jsConsumerNamesRequest},
3,152✔
1001
                {JSApiConsumerList, s.jsConsumerListRequest},
3,152✔
1002
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
3,152✔
1003
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
3,152✔
1004
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
3,152✔
1005
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
3,152✔
1006
        }
3,152✔
1007

3,152✔
1008
        js.mu.Lock()
3,152✔
1009
        defer js.mu.Unlock()
3,152✔
1010

3,152✔
1011
        for _, p := range pairs {
91,408✔
1012
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
88,256✔
1013
                if err := js.apiSubs.Insert(sub); err != nil {
88,256✔
1014
                        return err
×
1015
                }
×
1016
        }
1017

1018
        return nil
3,152✔
1019
}
1020

1021
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
31,472✔
1022
        acc.trackAPI()
31,472✔
1023
        if reply != _EMPTY_ {
62,700✔
1024
                s.sendInternalAccountMsg(nil, reply, response)
31,228✔
1025
        }
31,228✔
1026
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
31,472✔
1027
}
1028

1029
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
14,708✔
1030
        acc.trackAPIErr()
14,708✔
1031
        if reply != _EMPTY_ {
28,990✔
1032
                s.sendInternalAccountMsg(nil, reply, response)
14,282✔
1033
        }
14,282✔
1034
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
14,708✔
1035
}
1036

1037
const errRespDelay = 500 * time.Millisecond
1038

1039
type delayedAPIResponse struct {
1040
        ci       *ClientInfo
1041
        acc      *Account
1042
        subject  string
1043
        reply    string
1044
        request  string
1045
        response string
1046
        rg       *raftGroup
1047
        deadline time.Time
1048
        next     *delayedAPIResponse
1049
}
1050

1051
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1052
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
22✔
1053
        // Check if list empty.
22✔
1054
        if *head == nil {
30✔
1055
                *head, *tail = r, r
8✔
1056
                return
8✔
1057
        }
8✔
1058
        // Check if it should be added at the end, which is if after or equal to the tail.
1059
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1060
                (*tail).next, *tail = r, r
12✔
1061
                return
12✔
1062
        }
12✔
1063
        // Find its spot in the list.
1064
        var prev *delayedAPIResponse
2✔
1065
        for c := *head; c != nil; c = c.next {
6✔
1066
                // We insert only if we are stricly before the current `c`.
4✔
1067
                if r.deadline.Before(c.deadline) {
6✔
1068
                        r.next = c
2✔
1069
                        if prev != nil {
3✔
1070
                                prev.next = r
1✔
1071
                        } else {
2✔
1072
                                *head = r
1✔
1073
                        }
1✔
1074
                        return
2✔
1075
                }
1076
                prev = c
2✔
1077
        }
1078
}
1079

1080
func (s *Server) delayedAPIResponder() {
6,063✔
1081
        defer s.grWG.Done()
6,063✔
1082
        var (
6,063✔
1083
                head, tail *delayedAPIResponse // Linked list.
6,063✔
1084
                r          *delayedAPIResponse // Updated by calling next().
6,063✔
1085
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
6,063✔
1086
                tm         = time.NewTimer(time.Hour)
6,063✔
1087
        )
6,063✔
1088
        next := func() {
6,084✔
1089
                r, rqch = nil, nil
21✔
1090
                // Check that JetStream is still on. Do not exit the go routine
21✔
1091
                // since JS can be enabled/disabled. The go routine will exit
21✔
1092
                // only if server is shutdown.
21✔
1093
                js := s.getJetStream()
21✔
1094
                if js == nil {
22✔
1095
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1096
                        head, tail = nil, nil
1✔
1097
                        s.delayedAPIResponses.drain()
1✔
1098
                        // Fall back into next "if" that resets timer.
1✔
1099
                }
1✔
1100
                // If there are no delayed messages then delay the timer for
1101
                // a while.
1102
                if head == nil {
29✔
1103
                        tm.Reset(time.Hour)
8✔
1104
                        return
8✔
1105
                }
8✔
1106
                // Get the first expected message and then reset the timer.
1107
                r = head
13✔
1108
                js.mu.RLock()
13✔
1109
                if r.rg != nil && r.rg.node != nil {
14✔
1110
                        // If there's an attached Raft group to the delayed response
1✔
1111
                        // then pull out the quit channel, so that we don't bother
1✔
1112
                        // sending responses for entities which are now no longer
1✔
1113
                        // running.
1✔
1114
                        rqch = r.rg.node.QuitC()
1✔
1115
                }
1✔
1116
                js.mu.RUnlock()
13✔
1117
                tm.Reset(time.Until(r.deadline))
13✔
1118
        }
1119
        pop := func() {
6,075✔
1120
                if head == nil {
12✔
1121
                        return
×
1122
                }
×
1123
                head = head.next
12✔
1124
                if head == nil {
19✔
1125
                        tail = nil
7✔
1126
                }
7✔
1127
        }
1128
        for {
12,160✔
1129
                select {
6,097✔
1130
                case <-s.delayedAPIResponses.ch:
22✔
1131
                        v, ok := s.delayedAPIResponses.popOne()
22✔
1132
                        if !ok {
22✔
1133
                                continue
×
1134
                        }
1135
                        // Add it to the list, and if ends up being the head, set things up.
1136
                        addDelayedResponse(&head, &tail, v)
22✔
1137
                        if v == head {
31✔
1138
                                next()
9✔
1139
                        }
9✔
1140
                case <-s.quitCh:
6,063✔
1141
                        return
6,063✔
1142
                case <-rqch:
1✔
1143
                        // If we were the head, drop and setup things for next.
1✔
1144
                        if r != nil && r == head {
2✔
1145
                                pop()
1✔
1146
                        }
1✔
1147
                        next()
1✔
1148
                case <-tm.C:
11✔
1149
                        if r != nil {
22✔
1150
                                s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
11✔
1151
                                pop()
11✔
1152
                        }
11✔
1153
                        next()
11✔
1154
                }
1155
        }
1156
}
1157

1158
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
22✔
1159
        s.delayedAPIResponses.push(&delayedAPIResponse{
22✔
1160
                ci, acc, subject, reply, request, response, rg, time.Now().Add(duration), nil,
22✔
1161
        })
22✔
1162
}
22✔
1163

1164
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
101,658✔
1165
        hdr, msg = c.msgParts(raw)
101,658✔
1166
        var ci ClientInfo
101,658✔
1167

101,658✔
1168
        if len(hdr) > 0 {
203,241✔
1169
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
101,583✔
1170
                        return nil, nil, nil, nil, err
×
1171
                }
×
1172
        }
1173

1174
        if ci.Service != _EMPTY_ {
101,712✔
1175
                acc, _ = s.LookupAccount(ci.Service)
54✔
1176
        } else if ci.Account != _EMPTY_ {
203,187✔
1177
                acc, _ = s.LookupAccount(ci.Account)
101,529✔
1178
        } else {
101,604✔
1179
                // Direct $SYS access.
75✔
1180
                acc = c.acc
75✔
1181
                if acc == nil {
78✔
1182
                        acc = s.SystemAccount()
3✔
1183
                }
3✔
1184
        }
1185
        if acc == nil {
101,668✔
1186
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1187
        }
10✔
1188
        return &ci, acc, hdr, msg, nil
101,648✔
1189
}
1190

1191
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
15,186✔
1192
        decoder := json.NewDecoder(bytes.NewReader(msg))
15,186✔
1193
        decoder.DisallowUnknownFields()
15,186✔
1194

15,186✔
1195
        for {
45,552✔
1196
                if err := decoder.Decode(v); err != nil {
45,552✔
1197
                        if err == io.EOF {
30,366✔
1198
                                return nil
15,180✔
1199
                        }
15,180✔
1200

1201
                        var syntaxErr *json.SyntaxError
6✔
1202
                        if errors.As(err, &syntaxErr) {
6✔
1203
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1204
                        }
×
1205

1206
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1207

6✔
1208
                        if s.JetStreamConfig().Strict {
12✔
1209
                                return err
6✔
1210
                        }
6✔
1211

1212
                        return json.Unmarshal(msg, v)
×
1213
                }
1214
        }
1215
}
1216

1217
func (a *Account) trackAPI() {
31,472✔
1218
        a.mu.RLock()
31,472✔
1219
        jsa := a.js
31,472✔
1220
        a.mu.RUnlock()
31,472✔
1221
        if jsa != nil {
62,873✔
1222
                jsa.usageMu.Lock()
31,401✔
1223
                jsa.usageApi++
31,401✔
1224
                jsa.apiTotal++
31,401✔
1225
                jsa.sendClusterUsageUpdate()
31,401✔
1226
                atomic.AddInt64(&jsa.js.apiTotal, 1)
31,401✔
1227
                jsa.usageMu.Unlock()
31,401✔
1228
        }
31,401✔
1229
}
1230

1231
func (a *Account) trackAPIErr() {
14,708✔
1232
        a.mu.RLock()
14,708✔
1233
        jsa := a.js
14,708✔
1234
        a.mu.RUnlock()
14,708✔
1235
        if jsa != nil {
29,166✔
1236
                jsa.usageMu.Lock()
14,458✔
1237
                jsa.usageApi++
14,458✔
1238
                jsa.apiTotal++
14,458✔
1239
                jsa.usageErr++
14,458✔
1240
                jsa.apiErrors++
14,458✔
1241
                jsa.sendClusterUsageUpdate()
14,458✔
1242
                atomic.AddInt64(&jsa.js.apiTotal, 1)
14,458✔
1243
                atomic.AddInt64(&jsa.js.apiErrors, 1)
14,458✔
1244
                jsa.usageMu.Unlock()
14,458✔
1245
        }
14,458✔
1246
}
1247

1248
const badAPIRequestT = "Malformed JetStream API Request: %q"
1249

1250
// Helper function to check on JetStream being enabled but also on status of leafnodes
1251
// If the local account is not enabled but does have leafnode connectivity we will not
1252
// want to error immediately and let the other side decide.
1253
func (a *Account) checkJetStream() (enabled, shouldError bool) {
43,522✔
1254
        a.mu.RLock()
43,522✔
1255
        defer a.mu.RUnlock()
43,522✔
1256
        return a.js != nil, a.nleafs+a.nrleafs == 0
43,522✔
1257
}
43,522✔
1258

1259
// Request for current usage and limits for this account.
1260
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
439✔
1261
        if c == nil || !s.JetStreamEnabled() {
439✔
1262
                return
×
1263
        }
×
1264

1265
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
439✔
1266
        if err != nil {
440✔
1267
                s.Warnf(badAPIRequestT, msg)
1✔
1268
                return
1✔
1269
        }
1✔
1270

1271
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
438✔
1272

438✔
1273
        // Determine if we should proceed here when we are in clustered mode.
438✔
1274
        if s.JetStreamIsClustered() {
806✔
1275
                js, cc := s.getJetStreamCluster()
368✔
1276
                if js == nil || cc == nil {
368✔
1277
                        return
×
1278
                }
×
1279
                if js.isLeaderless() {
369✔
1280
                        resp.Error = NewJSClusterNotAvailError()
1✔
1281
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1282
                        return
1✔
1283
                }
1✔
1284
                // Make sure we are meta leader.
1285
                if !s.JetStreamIsLeader() {
622✔
1286
                        return
255✔
1287
                }
255✔
1288
        }
1289

1290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
188✔
1291
                if !doErr {
7✔
1292
                        return
1✔
1293
                }
1✔
1294
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1295
        } else {
176✔
1296
                stats := acc.JetStreamUsage()
176✔
1297
                resp.JetStreamAccountStats = &stats
176✔
1298
        }
176✔
1299
        b, err := json.Marshal(resp)
181✔
1300
        if err != nil {
181✔
1301
                return
×
1302
        }
×
1303

1304
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
181✔
1305
}
1306

1307
// Helpers for token extraction.
1308
func templateNameFromSubject(subject string) string {
6✔
1309
        return tokenAt(subject, 6)
6✔
1310
}
6✔
1311

1312
func streamNameFromSubject(subject string) string {
81,420✔
1313
        return tokenAt(subject, 5)
81,420✔
1314
}
81,420✔
1315

1316
func consumerNameFromSubject(subject string) string {
48,694✔
1317
        return tokenAt(subject, 6)
48,694✔
1318
}
48,694✔
1319

1320
// Request to create a new template.
1321
func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
1322
        if c == nil {
6✔
1323
                return
×
1324
        }
×
1325
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6✔
1326
        if err != nil {
6✔
1327
                s.Warnf(badAPIRequestT, msg)
×
1328
                return
×
1329
        }
×
1330

1331
        var resp = JSApiStreamTemplateCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateCreateResponseType}}
6✔
1332
        if !acc.JetStreamEnabled() {
6✔
1333
                resp.Error = NewJSNotEnabledForAccountError()
×
1334
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1335
                return
×
1336
        }
×
1337

1338
        // Not supported for now.
1339
        if s.JetStreamIsClustered() {
9✔
1340
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1341
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1342
                return
3✔
1343
        }
3✔
1344

1345
        var cfg StreamTemplateConfig
3✔
1346
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
3✔
1347
                resp.Error = NewJSInvalidJSONError(err)
×
1348
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1349
                return
×
1350
        }
×
1351
        templateName := templateNameFromSubject(subject)
3✔
1352
        if templateName != cfg.Name {
4✔
1353
                resp.Error = NewJSTemplateNameNotMatchSubjectError()
1✔
1354
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1355
                return
1✔
1356
        }
1✔
1357

1358
        t, err := acc.addStreamTemplate(&cfg)
2✔
1359
        if err != nil {
2✔
1360
                resp.Error = NewJSStreamTemplateCreateError(err, Unless(err))
×
1361
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1362
                return
×
1363
        }
×
1364
        t.mu.Lock()
2✔
1365
        tcfg := t.StreamTemplateConfig.deepCopy()
2✔
1366
        streams := t.streams
2✔
1367
        if streams == nil {
4✔
1368
                streams = []string{}
2✔
1369
        }
2✔
1370
        t.mu.Unlock()
2✔
1371
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: tcfg, Streams: streams}
2✔
1372
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1373
}
1374

1375
// Request for the list of all template names.
1376
func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
5✔
1377
        if c == nil {
5✔
1378
                return
×
1379
        }
×
1380
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
5✔
1381
        if err != nil {
5✔
1382
                s.Warnf(badAPIRequestT, msg)
×
1383
                return
×
1384
        }
×
1385

1386
        var resp = JSApiStreamTemplateNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateNamesResponseType}}
5✔
1387
        if !acc.JetStreamEnabled() {
5✔
1388
                resp.Error = NewJSNotEnabledForAccountError()
×
1389
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1390
                return
×
1391
        }
×
1392

1393
        // Not supported for now.
1394
        if s.JetStreamIsClustered() {
8✔
1395
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1396
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1397
                return
3✔
1398
        }
3✔
1399

1400
        var offset int
2✔
1401
        if isJSONObjectOrArray(msg) {
2✔
1402
                var req JSApiStreamTemplatesRequest
×
1403
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
×
1404
                        resp.Error = NewJSInvalidJSONError(err)
×
1405
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1406
                        return
×
1407
                }
×
1408
                offset = req.Offset
×
1409
        }
1410

1411
        ts := acc.templates()
2✔
1412
        slices.SortFunc(ts, func(i, j *streamTemplate) int {
3✔
1413
                return cmp.Compare(i.StreamTemplateConfig.Name, j.StreamTemplateConfig.Name)
1✔
1414
        })
1✔
1415

1416
        tcnt := len(ts)
2✔
1417
        if offset > tcnt {
2✔
1418
                offset = tcnt
×
1419
        }
×
1420

1421
        for _, t := range ts[offset:] {
5✔
1422
                t.mu.Lock()
3✔
1423
                name := t.Name
3✔
1424
                t.mu.Unlock()
3✔
1425
                resp.Templates = append(resp.Templates, name)
3✔
1426
                if len(resp.Templates) >= JSApiNamesLimit {
3✔
1427
                        break
×
1428
                }
1429
        }
1430
        resp.Total = tcnt
2✔
1431
        resp.Limit = JSApiNamesLimit
2✔
1432
        resp.Offset = offset
2✔
1433
        if resp.Templates == nil {
2✔
1434
                resp.Templates = []string{}
×
1435
        }
×
1436
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1437
}
1438

1439
// Request for information about a stream template.
1440
func (s *Server) jsTemplateInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1✔
1441
        if c == nil {
1✔
1442
                return
×
1443
        }
×
1444
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1✔
1445
        if err != nil {
1✔
1446
                s.Warnf(badAPIRequestT, msg)
×
1447
                return
×
1448
        }
×
1449

1450
        var resp = JSApiStreamTemplateInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateInfoResponseType}}
1✔
1451
        if !acc.JetStreamEnabled() {
1✔
1452
                resp.Error = NewJSNotEnabledForAccountError()
×
1453
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1454
                return
×
1455
        }
×
1456
        if !isEmptyRequest(msg) {
1✔
1457
                resp.Error = NewJSNotEmptyRequestError()
×
1458
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1459
                return
×
1460
        }
×
1461
        name := templateNameFromSubject(subject)
1✔
1462
        t, err := acc.lookupStreamTemplate(name)
1✔
1463
        if err != nil {
1✔
1464
                resp.Error = NewJSStreamTemplateNotFoundError()
×
1465
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1466
                return
×
1467
        }
×
1468
        t.mu.Lock()
1✔
1469
        cfg := t.StreamTemplateConfig.deepCopy()
1✔
1470
        streams := t.streams
1✔
1471
        if streams == nil {
1✔
1472
                streams = []string{}
×
1473
        }
×
1474
        t.mu.Unlock()
1✔
1475

1✔
1476
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: cfg, Streams: streams}
1✔
1477
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1478
}
1479

1480
// Request to delete a stream template.
1481
func (s *Server) jsTemplateDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2✔
1482
        if c == nil {
2✔
1483
                return
×
1484
        }
×
1485
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
2✔
1486
        if err != nil {
2✔
1487
                s.Warnf(badAPIRequestT, msg)
×
1488
                return
×
1489
        }
×
1490

1491
        var resp = JSApiStreamTemplateDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateDeleteResponseType}}
2✔
1492
        if !acc.JetStreamEnabled() {
2✔
1493
                resp.Error = NewJSNotEnabledForAccountError()
×
1494
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1495
                return
×
1496
        }
×
1497
        if !isEmptyRequest(msg) {
2✔
1498
                resp.Error = NewJSNotEmptyRequestError()
×
1499
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1500
                return
×
1501
        }
×
1502
        name := templateNameFromSubject(subject)
2✔
1503
        err = acc.deleteStreamTemplate(name)
2✔
1504
        if err != nil {
3✔
1505
                resp.Error = NewJSStreamTemplateDeleteError(err, Unless(err))
1✔
1506
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1507
                return
1✔
1508
        }
1✔
1509
        resp.Success = true
1✔
1510
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1511
}
1512

1513
func (s *Server) jsonResponse(v any) string {
46,469✔
1514
        b, err := json.Marshal(v)
46,469✔
1515
        if err != nil {
46,469✔
1516
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1517
                return ""
×
1518
        }
×
1519
        return string(b)
46,469✔
1520
}
1521

1522
// Read lock must be held
1523
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,133✔
1524
        reservation := int64(0)
3,133✔
1525
        if tier == _EMPTY_ {
6,246✔
1526
                for _, sa := range jsa.streams {
21,453✔
1527
                        if sa.cfg.MaxBytes > 0 {
18,358✔
1528
                                if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name {
18✔
1529
                                        reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
×
1530
                                }
×
1531
                        }
1532
                }
1533
        } else {
20✔
1534
                for _, sa := range jsa.streams {
35✔
1535
                        if sa.cfg.Replicas == cfg.Replicas {
29✔
1536
                                if sa.cfg.MaxBytes > 0 {
19✔
1537
                                        if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name {
10✔
1538
                                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
5✔
1539
                                        }
5✔
1540
                                }
1541
                        }
1542
                }
1543
        }
1544
        return reservation
3,133✔
1545
}
1546

1547
// Request to create a stream.
1548
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6,506✔
1549
        if c == nil || !s.JetStreamEnabled() {
6,692✔
1550
                return
186✔
1551
        }
186✔
1552
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6,320✔
1553
        if err != nil {
6,323✔
1554
                s.Warnf(badAPIRequestT, msg)
3✔
1555
                return
3✔
1556
        }
3✔
1557

1558
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
6,317✔
1559

6,317✔
1560
        // Determine if we should proceed here when we are in clustered mode.
6,317✔
1561
        if s.JetStreamIsClustered() {
11,533✔
1562
                js, cc := s.getJetStreamCluster()
5,216✔
1563
                if js == nil || cc == nil {
5,216✔
1564
                        return
×
1565
                }
×
1566
                if js.isLeaderless() {
5,217✔
1567
                        resp.Error = NewJSClusterNotAvailError()
1✔
1568
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1569
                        return
1✔
1570
                }
1✔
1571
                // Make sure we are meta leader.
1572
                if !s.JetStreamIsLeader() {
8,905✔
1573
                        return
3,690✔
1574
                }
3,690✔
1575
        }
1576

1577
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,634✔
1578
                if doErr {
8✔
1579
                        resp.Error = NewJSNotEnabledForAccountError()
×
1580
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1581
                }
×
1582
                return
8✔
1583
        }
1584

1585
        var cfg StreamConfigRequest
2,618✔
1586
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
2,619✔
1587
                resp.Error = NewJSInvalidJSONError(err)
1✔
1588
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1589
                return
1✔
1590
        }
1✔
1591

1592
        // Initialize asset version metadata.
1593
        setStaticStreamMetadata(&cfg.StreamConfig)
2,617✔
1594

2,617✔
1595
        streamName := streamNameFromSubject(subject)
2,617✔
1596
        if streamName != cfg.Name {
2,618✔
1597
                resp.Error = NewJSStreamMismatchError()
1✔
1598
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1599
                return
1✔
1600
        }
1✔
1601

1602
        // Check for path like separators in the name.
1603
        if strings.ContainsAny(streamName, `\/`) {
2,618✔
1604
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1605
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1606
                return
2✔
1607
        }
2✔
1608

1609
        // Can't create a stream with a sealed state.
1610
        if cfg.Sealed {
2,616✔
1611
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1612
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1613
                return
2✔
1614
        }
2✔
1615

1616
        // If we are told to do mirror direct but are not mirroring, error.
1617
        if cfg.MirrorDirect && cfg.Mirror == nil {
2,612✔
1618
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1619
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1620
                return
×
1621
        }
×
1622

1623
        // Hand off to cluster for processing.
1624
        if s.JetStreamIsClustered() {
4,135✔
1625
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
1,523✔
1626
                return
1,523✔
1627
        }
1,523✔
1628

1629
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,091✔
1630
                resp.Error = err
2✔
1631
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1632
                return
2✔
1633
        }
2✔
1634

1635
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,087✔
1636
        if err != nil {
1,123✔
1637
                if IsNatsErr(err, JSStreamStoreFailedF) {
36✔
1638
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1639
                        err = errStreamStoreFailed
×
1640
                }
×
1641
                resp.Error = NewJSStreamCreateError(err, Unless(err))
36✔
1642
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
36✔
1643
                return
36✔
1644
        }
1645
        msetCfg := mset.config()
1,051✔
1646
        resp.StreamInfo = &StreamInfo{
1,051✔
1647
                Created:   mset.createdTime(),
1,051✔
1648
                State:     mset.state(),
1,051✔
1649
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,051✔
1650
                TimeStamp: time.Now().UTC(),
1,051✔
1651
                Mirror:    mset.mirrorInfo(),
1,051✔
1652
                Sources:   mset.sourcesInfo(),
1,051✔
1653
        }
1,051✔
1654
        resp.DidCreate = true
1,051✔
1655
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,051✔
1656
}
1657

1658
// Request to update a stream.
1659
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
637✔
1660
        if c == nil || !s.JetStreamEnabled() {
637✔
1661
                return
×
1662
        }
×
1663

1664
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
637✔
1665
        if err != nil {
637✔
1666
                s.Warnf(badAPIRequestT, msg)
×
1667
                return
×
1668
        }
×
1669

1670
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
637✔
1671

637✔
1672
        // Determine if we should proceed here when we are in clustered mode.
637✔
1673
        if s.JetStreamIsClustered() {
1,210✔
1674
                js, cc := s.getJetStreamCluster()
573✔
1675
                if js == nil || cc == nil {
573✔
1676
                        return
×
1677
                }
×
1678
                if js.isLeaderless() {
575✔
1679
                        resp.Error = NewJSClusterNotAvailError()
2✔
1680
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1681
                        return
2✔
1682
                }
2✔
1683
                // Make sure we are meta leader.
1684
                if !s.JetStreamIsLeader() {
1,022✔
1685
                        return
451✔
1686
                }
451✔
1687
        }
1688

1689
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
184✔
1690
                if doErr {
×
1691
                        resp.Error = NewJSNotEnabledForAccountError()
×
1692
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1693
                }
×
1694
                return
×
1695
        }
1696
        var ncfg StreamConfigRequest
184✔
1697
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
185✔
1698
                resp.Error = NewJSInvalidJSONError(err)
1✔
1699
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1700
                return
1✔
1701
        }
1✔
1702

1703
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
183✔
1704
        if apiErr != nil {
201✔
1705
                resp.Error = apiErr
18✔
1706
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
18✔
1707
                return
18✔
1708
        }
18✔
1709

1710
        streamName := streamNameFromSubject(subject)
165✔
1711
        if streamName != cfg.Name {
166✔
1712
                resp.Error = NewJSStreamMismatchError()
1✔
1713
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1714
                return
1✔
1715
        }
1✔
1716

1717
        // Handle clustered version here.
1718
        if s.JetStreamIsClustered() {
279✔
1719
                // Always do in separate Go routine.
115✔
1720
                go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
115✔
1721
                return
115✔
1722
        }
115✔
1723

1724
        mset, err := acc.lookupStream(streamName)
49✔
1725
        if err != nil {
50✔
1726
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
1727
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1728
                return
1✔
1729
        }
1✔
1730

1731
        // Update asset version metadata.
1732
        setStaticStreamMetadata(&cfg)
48✔
1733

48✔
1734
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
56✔
1735
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
8✔
1736
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
8✔
1737
                return
8✔
1738
        }
8✔
1739

1740
        msetCfg := mset.config()
40✔
1741
        resp.StreamInfo = &StreamInfo{
40✔
1742
                Created:   mset.createdTime(),
40✔
1743
                State:     mset.state(),
40✔
1744
                Config:    *setDynamicStreamMetadata(&msetCfg),
40✔
1745
                Domain:    s.getOpts().JetStreamDomain,
40✔
1746
                Mirror:    mset.mirrorInfo(),
40✔
1747
                Sources:   mset.sourcesInfo(),
40✔
1748
                TimeStamp: time.Now().UTC(),
40✔
1749
        }
40✔
1750
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
40✔
1751
}
1752

1753
// Request for the list of all stream names.
1754
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,222✔
1755
        if c == nil || !s.JetStreamEnabled() {
1,222✔
1756
                return
×
1757
        }
×
1758
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1,222✔
1759
        if err != nil {
1,222✔
1760
                s.Warnf(badAPIRequestT, msg)
×
1761
                return
×
1762
        }
×
1763

1764
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,222✔
1765

1,222✔
1766
        // Determine if we should proceed here when we are in clustered mode.
1,222✔
1767
        if s.JetStreamIsClustered() {
2,206✔
1768
                js, cc := s.getJetStreamCluster()
984✔
1769
                if js == nil || cc == nil {
984✔
1770
                        return
×
1771
                }
×
1772
                if js.isLeaderless() {
984✔
1773
                        resp.Error = NewJSClusterNotAvailError()
×
1774
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1775
                        return
×
1776
                }
×
1777
                // Make sure we are meta leader.
1778
                if !s.JetStreamIsLeader() {
1,707✔
1779
                        return
723✔
1780
                }
723✔
1781
        }
1782

1783
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
500✔
1784
                if doErr {
1✔
1785
                        resp.Error = NewJSNotEnabledForAccountError()
×
1786
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1787
                }
×
1788
                return
1✔
1789
        }
1790

1791
        var offset int
498✔
1792
        var filter string
498✔
1793

498✔
1794
        if isJSONObjectOrArray(msg) {
843✔
1795
                var req JSApiStreamNamesRequest
345✔
1796
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
345✔
1797
                        resp.Error = NewJSInvalidJSONError(err)
×
1798
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1799
                        return
×
1800
                }
×
1801
                offset = req.Offset
345✔
1802
                if req.Subject != _EMPTY_ {
666✔
1803
                        filter = req.Subject
321✔
1804
                }
321✔
1805
        }
1806

1807
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1808
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1809
        var numStreams int
498✔
1810
        if s.JetStreamIsClustered() {
759✔
1811
                js, cc := s.getJetStreamCluster()
261✔
1812
                if js == nil || cc == nil {
261✔
1813
                        // TODO(dlc) - Debug or Warn?
×
1814
                        return
×
1815
                }
×
1816
                js.mu.RLock()
261✔
1817
                for stream, sa := range cc.streams[acc.Name] {
570✔
1818
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
309✔
1819
                                continue
×
1820
                        }
1821
                        if filter != _EMPTY_ {
583✔
1822
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
274✔
1823
                                if len(sa.Config.Subjects) == 0 {
276✔
1824
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1825
                                                resp.Streams = append(resp.Streams, stream)
×
1826
                                        }
×
1827
                                } else {
272✔
1828
                                        for _, subj := range sa.Config.Subjects {
544✔
1829
                                                if SubjectsCollide(filter, subj) {
489✔
1830
                                                        resp.Streams = append(resp.Streams, stream)
217✔
1831
                                                        break
217✔
1832
                                                }
1833
                                        }
1834
                                }
1835
                        } else {
35✔
1836
                                resp.Streams = append(resp.Streams, stream)
35✔
1837
                        }
35✔
1838
                }
1839
                js.mu.RUnlock()
261✔
1840
                if len(resp.Streams) > 1 {
263✔
1841
                        slices.Sort(resp.Streams)
2✔
1842
                }
2✔
1843
                numStreams = len(resp.Streams)
261✔
1844
                if offset > numStreams {
261✔
1845
                        offset = numStreams
×
1846
                }
×
1847
                if offset > 0 {
261✔
1848
                        resp.Streams = resp.Streams[offset:]
×
1849
                }
×
1850
                if len(resp.Streams) > JSApiNamesLimit {
261✔
1851
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1852
                }
×
1853
        } else {
237✔
1854
                msets := acc.filteredStreams(filter)
237✔
1855
                // Since we page results order matters.
237✔
1856
                if len(msets) > 1 {
243✔
1857
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
17✔
1858
                }
1859

1860
                numStreams = len(msets)
237✔
1861
                if offset > numStreams {
237✔
1862
                        offset = numStreams
×
1863
                }
×
1864

1865
                for _, mset := range msets[offset:] {
484✔
1866
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
247✔
1867
                        if len(resp.Streams) >= JSApiNamesLimit {
247✔
1868
                                break
×
1869
                        }
1870
                }
1871
        }
1872
        resp.Total = numStreams
498✔
1873
        resp.Limit = JSApiNamesLimit
498✔
1874
        resp.Offset = offset
498✔
1875

498✔
1876
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
498✔
1877
}
1878

1879
// Request for the list of all detailed stream info.
1880
// TODO(dlc) - combine with above long term
1881
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
41✔
1882
        if c == nil || !s.JetStreamEnabled() {
41✔
1883
                return
×
1884
        }
×
1885
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
41✔
1886
        if err != nil {
41✔
1887
                s.Warnf(badAPIRequestT, msg)
×
1888
                return
×
1889
        }
×
1890

1891
        var resp = JSApiStreamListResponse{
41✔
1892
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
41✔
1893
                Streams:     []*StreamInfo{},
41✔
1894
        }
41✔
1895

41✔
1896
        // Determine if we should proceed here when we are in clustered mode.
41✔
1897
        if s.JetStreamIsClustered() {
78✔
1898
                js, cc := s.getJetStreamCluster()
37✔
1899
                if js == nil || cc == nil {
37✔
1900
                        return
×
1901
                }
×
1902
                if js.isLeaderless() {
38✔
1903
                        resp.Error = NewJSClusterNotAvailError()
1✔
1904
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1905
                        return
1✔
1906
                }
1✔
1907
                // Make sure we are meta leader.
1908
                if !s.JetStreamIsLeader() {
63✔
1909
                        return
27✔
1910
                }
27✔
1911
        }
1912

1913
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
13✔
1914
                if doErr {
×
1915
                        resp.Error = NewJSNotEnabledForAccountError()
×
1916
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1917
                }
×
1918
                return
×
1919
        }
1920

1921
        var offset int
13✔
1922
        var filter string
13✔
1923

13✔
1924
        if isJSONObjectOrArray(msg) {
24✔
1925
                var req JSApiStreamListRequest
11✔
1926
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
1927
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1928
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1929
                        return
1✔
1930
                }
1✔
1931
                offset = req.Offset
10✔
1932
                if req.Subject != _EMPTY_ {
12✔
1933
                        filter = req.Subject
2✔
1934
                }
2✔
1935
        }
1936

1937
        // Clustered mode will invoke a scatter and gather.
1938
        if s.JetStreamIsClustered() {
21✔
1939
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
9✔
1940
                msg = copyBytes(msg)
9✔
1941
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
18✔
1942
                return
9✔
1943
        }
1944

1945
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1946
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1947
        var msets []*stream
3✔
1948
        if filter == _EMPTY_ {
5✔
1949
                msets = acc.streams()
2✔
1950
        } else {
3✔
1951
                msets = acc.filteredStreams(filter)
1✔
1952
        }
1✔
1953

1954
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
4✔
1955

1956
        scnt := len(msets)
3✔
1957
        if offset > scnt {
3✔
1958
                offset = scnt
×
1959
        }
×
1960

1961
        for _, mset := range msets[offset:] {
7✔
1962
                config := mset.config()
4✔
1963
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
1964
                        Created:   mset.createdTime(),
4✔
1965
                        State:     mset.state(),
4✔
1966
                        Config:    config,
4✔
1967
                        Domain:    s.getOpts().JetStreamDomain,
4✔
1968
                        Mirror:    mset.mirrorInfo(),
4✔
1969
                        Sources:   mset.sourcesInfo(),
4✔
1970
                        TimeStamp: time.Now().UTC(),
4✔
1971
                })
4✔
1972
                if len(resp.Streams) >= JSApiListLimit {
4✔
1973
                        break
×
1974
                }
1975
        }
1976
        resp.Total = scnt
3✔
1977
        resp.Limit = JSApiListLimit
3✔
1978
        resp.Offset = offset
3✔
1979
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
3✔
1980
}
1981

1982
// Request for information about a stream.
1983
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
28,470✔
1984
        if c == nil || !s.JetStreamEnabled() {
28,474✔
1985
                return
4✔
1986
        }
4✔
1987
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28,466✔
1988
        if err != nil {
28,471✔
1989
                s.Warnf(badAPIRequestT, msg)
5✔
1990
                return
5✔
1991
        }
5✔
1992

1993
        streamName := streamNameFromSubject(subject)
28,461✔
1994

28,461✔
1995
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
28,461✔
1996

28,461✔
1997
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
28,461✔
1998
        // Make sure the response type is for a create call.
28,461✔
1999
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
28,461✔
2000
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
2001
        }
×
2002

2003
        var clusterWideConsCount int
28,461✔
2004

28,461✔
2005
        js, cc := s.getJetStreamCluster()
28,461✔
2006
        if js == nil {
28,461✔
2007
                return
×
2008
        }
×
2009
        // If we are in clustered mode we need to be the stream leader to proceed.
2010
        if cc != nil {
40,198✔
2011
                // Check to make sure the stream is assigned.
11,737✔
2012
                js.mu.RLock()
11,737✔
2013
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
11,737✔
2014
                var offline bool
11,737✔
2015
                if sa != nil {
22,251✔
2016
                        clusterWideConsCount = len(sa.consumers)
10,514✔
2017
                        offline = s.allPeersOffline(sa.Group)
10,514✔
2018
                }
10,514✔
2019
                js.mu.RUnlock()
11,737✔
2020

11,737✔
2021
                if isLeader && sa == nil {
12,000✔
2022
                        // We can't find the stream, so mimic what would be the errors below.
263✔
2023
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
263✔
2024
                                if doErr {
×
2025
                                        resp.Error = NewJSNotEnabledForAccountError()
×
2026
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2027
                                }
×
2028
                                return
×
2029
                        }
2030
                        // No stream present.
2031
                        resp.Error = NewJSStreamNotFoundError()
263✔
2032
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
263✔
2033
                        return
263✔
2034
                } else if sa == nil {
12,434✔
2035
                        if js.isLeaderless() {
960✔
2036
                                resp.Error = NewJSClusterNotAvailError()
×
2037
                                // Delaying an error response gives the leader a chance to respond before us
×
2038
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
2039
                        }
×
2040
                        return
960✔
2041
                } else if isLeader && offline {
10,515✔
2042
                        resp.Error = NewJSStreamOfflineError()
1✔
2043
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2044
                        return
1✔
2045
                }
1✔
2046

2047
                // Check to see if we are a member of the group and if the group has no leader.
2048
                isLeaderless := js.isGroupLeaderless(sa.Group)
10,513✔
2049

10,513✔
2050
                // We have the stream assigned and a leader, so only the stream leader should answer.
10,513✔
2051
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
19,123✔
2052
                        if js.isLeaderless() {
8,610✔
2053
                                resp.Error = NewJSClusterNotAvailError()
×
2054
                                // Delaying an error response gives the leader a chance to respond before us
×
2055
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
2056
                                return
×
2057
                        }
×
2058

2059
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
2060
                        // while the new members work through the election and catchup process.
2061
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
2062
                        js.mu.RLock()
8,610✔
2063
                        rg := sa.Group
8,610✔
2064
                        var ourID string
8,610✔
2065
                        if cc.meta != nil {
17,220✔
2066
                                ourID = cc.meta.ID()
8,610✔
2067
                        }
8,610✔
2068
                        // We have seen cases where rg is nil at this point,
2069
                        // so check explicitly and bail if that is the case.
2070
                        bail := rg == nil || !rg.isMember(ourID)
8,610✔
2071
                        if !bail {
11,362✔
2072
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
2,752✔
2073
                                // Also, we have seen cases where rg.node is nil at this point,
2,752✔
2074
                                // so check explicitly and bail if that is the case.
2,752✔
2075
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
2,752✔
2076
                        }
2,752✔
2077
                        js.mu.RUnlock()
8,610✔
2078
                        if bail {
17,205✔
2079
                                return
8,595✔
2080
                        }
8,595✔
2081
                }
2082
        }
2083

2084
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
18,648✔
2085
                if doErr {
7✔
2086
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
2087
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2088
                }
1✔
2089
                return
6✔
2090
        }
2091

2092
        var details bool
18,636✔
2093
        var subjects string
18,636✔
2094
        var offset int
18,636✔
2095
        if isJSONObjectOrArray(msg) {
18,669✔
2096
                var req JSApiStreamInfoRequest
33✔
2097
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
2098
                        resp.Error = NewJSInvalidJSONError(err)
1✔
2099
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2100
                        return
1✔
2101
                }
1✔
2102
                details, subjects = req.DeletedDetails, req.SubjectsFilter
32✔
2103
                offset = req.Offset
32✔
2104
        }
2105

2106
        mset, err := acc.lookupStream(streamName)
18,635✔
2107
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
18,635✔
2108
        if err != nil {
19,287✔
2109
                if cc != nil {
652✔
2110
                        // This could be inflight, pause for a short bit and try again.
×
2111
                        // This will not be inline, so ok.
×
2112
                        time.Sleep(10 * time.Millisecond)
×
2113
                        mset, err = acc.lookupStream(streamName)
×
2114
                }
×
2115
                // Check again.
2116
                if err != nil {
1,304✔
2117
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
652✔
2118
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
652✔
2119
                        return
652✔
2120
                }
652✔
2121
        }
2122

2123
        config := mset.config()
17,983✔
2124
        resp.StreamInfo = &StreamInfo{
17,983✔
2125
                Created:    mset.createdTime(),
17,983✔
2126
                State:      mset.stateWithDetail(details),
17,983✔
2127
                Config:     *setDynamicStreamMetadata(&config),
17,983✔
2128
                Domain:     s.getOpts().JetStreamDomain,
17,983✔
2129
                Cluster:    js.clusterInfo(mset.raftGroup()),
17,983✔
2130
                Mirror:     mset.mirrorInfo(),
17,983✔
2131
                Sources:    mset.sourcesInfo(),
17,983✔
2132
                Alternates: js.streamAlternates(ci, config.Name),
17,983✔
2133
                TimeStamp:  time.Now().UTC(),
17,983✔
2134
        }
17,983✔
2135
        if clusterWideConsCount > 0 {
18,444✔
2136
                resp.StreamInfo.State.Consumers = clusterWideConsCount
461✔
2137
        }
461✔
2138

2139
        // Check if they have asked for subject details.
2140
        if subjects != _EMPTY_ {
18,013✔
2141
                st := mset.store.SubjectsTotals(subjects)
30✔
2142
                if lst := len(st); lst > 0 {
56✔
2143
                        // Common for both cases.
26✔
2144
                        resp.Offset = offset
26✔
2145
                        resp.Limit = JSMaxSubjectDetails
26✔
2146
                        resp.Total = lst
26✔
2147

26✔
2148
                        if offset == 0 && lst <= JSMaxSubjectDetails {
52✔
2149
                                resp.StreamInfo.State.Subjects = st
26✔
2150
                        } else {
26✔
2151
                                // Here we have to filter list due to offset or maximum constraints.
×
2152
                                subjs := make([]string, 0, len(st))
×
2153
                                for subj := range st {
×
2154
                                        subjs = append(subjs, subj)
×
2155
                                }
×
2156
                                // Sort it
2157
                                slices.Sort(subjs)
×
2158

×
2159
                                if offset > len(subjs) {
×
2160
                                        offset = len(subjs)
×
2161
                                }
×
2162

2163
                                end := offset + JSMaxSubjectDetails
×
2164
                                if end > len(subjs) {
×
2165
                                        end = len(subjs)
×
2166
                                }
×
2167
                                actualSize := end - offset
×
2168
                                var sd map[string]uint64
×
2169

×
2170
                                if actualSize > 0 {
×
2171
                                        sd = make(map[string]uint64, actualSize)
×
2172
                                        for _, ss := range subjs[offset:end] {
×
2173
                                                sd[ss] = st[ss]
×
2174
                                        }
×
2175
                                }
2176
                                resp.StreamInfo.State.Subjects = sd
×
2177
                        }
2178
                }
2179
        }
2180
        // Check for out of band catchups.
2181
        if mset.hasCatchupPeers() {
17,983✔
2182
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2183
        }
×
2184

2185
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
17,983✔
2186
}
2187

2188
// Request to have a stream leader stepdown.
2189
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
252✔
2190
        if c == nil || !s.JetStreamEnabled() {
252✔
2191
                return
×
2192
        }
×
2193
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
252✔
2194
        if err != nil {
252✔
2195
                s.Warnf(badAPIRequestT, msg)
×
2196
                return
×
2197
        }
×
2198

2199
        // Have extra token for this one.
2200
        name := tokenAt(subject, 6)
252✔
2201

252✔
2202
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
252✔
2203

252✔
2204
        // If we are not in clustered mode this is a failed request.
252✔
2205
        if !s.JetStreamIsClustered() {
252✔
2206
                resp.Error = NewJSClusterRequiredError()
×
2207
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2208
                return
×
2209
        }
×
2210

2211
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2212
        js, cc := s.getJetStreamCluster()
252✔
2213
        if js == nil || cc == nil {
252✔
2214
                return
×
2215
        }
×
2216
        if js.isLeaderless() {
252✔
2217
                resp.Error = NewJSClusterNotAvailError()
×
2218
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2219
                return
×
2220
        }
×
2221

2222
        js.mu.RLock()
252✔
2223
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
252✔
2224
        js.mu.RUnlock()
252✔
2225

252✔
2226
        if isLeader && sa == nil {
252✔
2227
                resp.Error = NewJSStreamNotFoundError()
×
2228
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2229
                return
×
2230
        } else if sa == nil {
253✔
2231
                return
1✔
2232
        }
1✔
2233

2234
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
251✔
2235
                if doErr {
×
2236
                        resp.Error = NewJSNotEnabledForAccountError()
×
2237
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2238
                }
×
2239
                return
×
2240
        }
2241

2242
        // Check to see if we are a member of the group and if the group has no leader.
2243
        if js.isGroupLeaderless(sa.Group) {
251✔
2244
                resp.Error = NewJSClusterNotAvailError()
×
2245
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2246
                return
×
2247
        }
×
2248

2249
        // We have the stream assigned and a leader, so only the stream leader should answer.
2250
        if !acc.JetStreamIsStreamLeader(name) {
447✔
2251
                return
196✔
2252
        }
196✔
2253

2254
        mset, err := acc.lookupStream(name)
55✔
2255
        if err != nil {
55✔
2256
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2257
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2258
                return
×
2259
        }
×
2260

2261
        if mset == nil {
55✔
2262
                resp.Success = true
×
2263
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2264
                return
×
2265
        }
×
2266

2267
        node := mset.raftNode()
55✔
2268
        if node == nil {
55✔
2269
                resp.Success = true
×
2270
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2271
                return
×
2272
        }
×
2273

2274
        var preferredLeader string
55✔
2275
        if isJSONObjectOrArray(msg) {
67✔
2276
                var req JSApiLeaderStepdownRequest
12✔
2277
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2278
                        resp.Error = NewJSInvalidJSONError(err)
×
2279
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2280
                        return
×
2281
                }
×
2282
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
17✔
2283
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2284
                        return
5✔
2285
                }
5✔
2286
        }
2287

2288
        // Call actual stepdown.
2289
        err = node.StepDown(preferredLeader)
50✔
2290
        if err != nil {
50✔
2291
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2292
        } else {
50✔
2293
                resp.Success = true
50✔
2294
        }
50✔
2295
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
50✔
2296
}
2297

2298
// Request to have a consumer leader stepdown.
2299
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
223✔
2300
        if c == nil || !s.JetStreamEnabled() {
223✔
2301
                return
×
2302
        }
×
2303
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
223✔
2304
        if err != nil {
223✔
2305
                s.Warnf(badAPIRequestT, msg)
×
2306
                return
×
2307
        }
×
2308

2309
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
223✔
2310

223✔
2311
        // If we are not in clustered mode this is a failed request.
223✔
2312
        if !s.JetStreamIsClustered() {
223✔
2313
                resp.Error = NewJSClusterRequiredError()
×
2314
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2315
                return
×
2316
        }
×
2317

2318
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2319
        js, cc := s.getJetStreamCluster()
223✔
2320
        if js == nil || cc == nil {
223✔
2321
                return
×
2322
        }
×
2323
        if js.isLeaderless() {
223✔
2324
                resp.Error = NewJSClusterNotAvailError()
×
2325
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2326
                return
×
2327
        }
×
2328

2329
        // Have extra token for this one.
2330
        stream := tokenAt(subject, 6)
223✔
2331
        consumer := tokenAt(subject, 7)
223✔
2332

223✔
2333
        js.mu.RLock()
223✔
2334
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
223✔
2335
        js.mu.RUnlock()
223✔
2336

223✔
2337
        if isLeader && sa == nil {
223✔
2338
                resp.Error = NewJSStreamNotFoundError()
×
2339
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2340
                return
×
2341
        } else if sa == nil {
223✔
2342
                return
×
2343
        }
×
2344
        var ca *consumerAssignment
223✔
2345
        if sa.consumers != nil {
446✔
2346
                ca = sa.consumers[consumer]
223✔
2347
        }
223✔
2348
        if ca == nil {
223✔
2349
                resp.Error = NewJSConsumerNotFoundError()
×
2350
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2351
                return
×
2352
        }
×
2353
        // Check to see if we are a member of the group and if the group has no leader.
2354
        if js.isGroupLeaderless(ca.Group) {
223✔
2355
                resp.Error = NewJSClusterNotAvailError()
×
2356
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2357
                return
×
2358
        }
×
2359

2360
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
384✔
2361
                return
161✔
2362
        }
161✔
2363

2364
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
62✔
2365
                if doErr {
×
2366
                        resp.Error = NewJSNotEnabledForAccountError()
×
2367
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2368
                }
×
2369
                return
×
2370
        }
2371

2372
        mset, err := acc.lookupStream(stream)
62✔
2373
        if err != nil {
62✔
2374
                resp.Error = NewJSStreamNotFoundError()
×
2375
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2376
                return
×
2377
        }
×
2378
        o := mset.lookupConsumer(consumer)
62✔
2379
        if o == nil {
62✔
2380
                resp.Error = NewJSConsumerNotFoundError()
×
2381
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2382
                return
×
2383
        }
×
2384

2385
        n := o.raftNode()
62✔
2386
        if n == nil {
62✔
2387
                resp.Success = true
×
2388
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2389
                return
×
2390
        }
×
2391

2392
        var preferredLeader string
62✔
2393
        if isJSONObjectOrArray(msg) {
74✔
2394
                var req JSApiLeaderStepdownRequest
12✔
2395
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2396
                        resp.Error = NewJSInvalidJSONError(err)
×
2397
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2398
                        return
×
2399
                }
×
2400
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
17✔
2401
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2402
                        return
5✔
2403
                }
5✔
2404
        }
2405

2406
        // Call actual stepdown.
2407
        err = n.StepDown(preferredLeader)
57✔
2408
        if err != nil {
57✔
2409
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2410
        } else {
57✔
2411
                resp.Success = true
57✔
2412
        }
57✔
2413
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
57✔
2414
}
2415

2416
// Request to remove a peer from a clustered stream.
2417
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
69✔
2418
        if c == nil || !s.JetStreamEnabled() {
69✔
2419
                return
×
2420
        }
×
2421
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
69✔
2422
        if err != nil {
69✔
2423
                s.Warnf(badAPIRequestT, msg)
×
2424
                return
×
2425
        }
×
2426

2427
        // Have extra token for this one.
2428
        name := tokenAt(subject, 6)
69✔
2429

69✔
2430
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
69✔
2431

69✔
2432
        // If we are not in clustered mode this is a failed request.
69✔
2433
        if !s.JetStreamIsClustered() {
69✔
2434
                resp.Error = NewJSClusterRequiredError()
×
2435
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2436
                return
×
2437
        }
×
2438

2439
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2440
        js, cc := s.getJetStreamCluster()
69✔
2441
        if js == nil || cc == nil {
69✔
2442
                return
×
2443
        }
×
2444
        if js.isLeaderless() {
69✔
2445
                resp.Error = NewJSClusterNotAvailError()
×
2446
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2447
                return
×
2448
        }
×
2449

2450
        js.mu.RLock()
69✔
2451
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
69✔
2452
        js.mu.RUnlock()
69✔
2453

69✔
2454
        // Make sure we are meta leader.
69✔
2455
        if !isLeader {
127✔
2456
                return
58✔
2457
        }
58✔
2458

2459
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2460
                if doErr {
×
2461
                        resp.Error = NewJSNotEnabledForAccountError()
×
2462
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2463
                }
×
2464
                return
×
2465
        }
2466
        if isEmptyRequest(msg) {
11✔
2467
                resp.Error = NewJSBadRequestError()
×
2468
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2469
                return
×
2470
        }
×
2471

2472
        var req JSApiStreamRemovePeerRequest
11✔
2473
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2474
                resp.Error = NewJSInvalidJSONError(err)
×
2475
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2476
                return
×
2477
        }
×
2478
        if req.Peer == _EMPTY_ {
11✔
2479
                resp.Error = NewJSBadRequestError()
×
2480
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2481
                return
×
2482
        }
×
2483

2484
        if sa == nil {
11✔
2485
                // No stream present.
×
2486
                resp.Error = NewJSStreamNotFoundError()
×
2487
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2488
                return
×
2489
        }
×
2490

2491
        // Check to see if we are a member of the group and if the group has no leader.
2492
        // Peers here is a server name, convert to node name.
2493
        nodeName := getHash(req.Peer)
11✔
2494

11✔
2495
        js.mu.RLock()
11✔
2496
        rg := sa.Group
11✔
2497
        isMember := rg.isMember(nodeName)
11✔
2498
        js.mu.RUnlock()
11✔
2499

11✔
2500
        // Make sure we are a member.
11✔
2501
        if !isMember {
12✔
2502
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2503
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2504
                return
1✔
2505
        }
1✔
2506

2507
        // If we are here we have a valid peer member set for removal.
2508
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2509
                resp.Error = NewJSPeerRemapError()
2✔
2510
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2511
                return
2✔
2512
        }
2✔
2513

2514
        resp.Success = true
8✔
2515
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2516
}
2517

2518
// Request to have the metaleader remove a peer from the system.
2519
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7✔
2520
        if c == nil || !s.JetStreamEnabled() {
7✔
2521
                return
×
2522
        }
×
2523

2524
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
7✔
2525
        if err != nil {
7✔
2526
                s.Warnf(badAPIRequestT, msg)
×
2527
                return
×
2528
        }
×
2529

2530
        js, cc := s.getJetStreamCluster()
7✔
2531
        if js == nil || cc == nil || cc.meta == nil {
7✔
2532
                return
×
2533
        }
×
2534

2535
        // Extra checks here but only leader is listening.
2536
        js.mu.RLock()
7✔
2537
        isLeader := cc.isLeader()
7✔
2538
        js.mu.RUnlock()
7✔
2539

7✔
2540
        if !isLeader {
7✔
2541
                return
×
2542
        }
×
2543

2544
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
7✔
2545

7✔
2546
        if isEmptyRequest(msg) {
7✔
2547
                resp.Error = NewJSBadRequestError()
×
2548
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2549
                return
×
2550
        }
×
2551

2552
        var req JSApiMetaServerRemoveRequest
7✔
2553
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
7✔
2554
                resp.Error = NewJSInvalidJSONError(err)
×
2555
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2556
                return
×
2557
        }
×
2558

2559
        var found string
7✔
2560
        js.mu.RLock()
7✔
2561
        for _, p := range cc.meta.Peers() {
30✔
2562
                // If Peer is specified, it takes precedence
23✔
2563
                if req.Peer != _EMPTY_ {
28✔
2564
                        if p.ID == req.Peer {
6✔
2565
                                found = req.Peer
1✔
2566
                                break
1✔
2567
                        }
2568
                        continue
4✔
2569
                }
2570
                si, ok := s.nodeToInfo.Load(p.ID)
18✔
2571
                if ok && si.(nodeInfo).name == req.Server {
21✔
2572
                        found = p.ID
3✔
2573
                        break
3✔
2574
                }
2575
        }
2576
        js.mu.RUnlock()
7✔
2577

7✔
2578
        if found == _EMPTY_ {
10✔
2579
                resp.Error = NewJSClusterServerNotMemberError()
3✔
2580
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
2581
                return
3✔
2582
        }
3✔
2583

2584
        // So we have a valid peer.
2585
        js.mu.Lock()
4✔
2586
        cc.meta.ProposeRemovePeer(found)
4✔
2587
        js.mu.Unlock()
4✔
2588

4✔
2589
        resp.Success = true
4✔
2590
        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2591
}
2592

2593
func (s *Server) peerSetToNames(ps []string) []string {
170✔
2594
        names := make([]string, len(ps))
170✔
2595
        for i := 0; i < len(ps); i++ {
635✔
2596
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
465✔
2597
                        names[i] = ps[i]
×
2598
                } else {
465✔
2599
                        names[i] = si.(nodeInfo).name
465✔
2600
                }
465✔
2601
        }
2602
        return names
170✔
2603
}
2604

2605
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2606
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2607
        js.mu.RLock()
29✔
2608
        defer js.mu.RUnlock()
29✔
2609
        if cc := js.cluster; cc != nil {
58✔
2610
                for _, p := range cc.meta.Peers() {
148✔
2611
                        si, ok := s.nodeToInfo.Load(p.ID)
119✔
2612
                        if ok && si.(nodeInfo).name == serverName {
148✔
2613
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2614
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2615
                                                return p.ID
29✔
2616
                                        }
29✔
2617
                                }
2618
                        }
2619
                }
2620
        }
2621
        return _EMPTY_
×
2622
}
2623

2624
// Request to have the metaleader move a stream on a peer to another
2625
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2626
        if c == nil || !s.JetStreamEnabled() {
33✔
2627
                return
×
2628
        }
×
2629

2630
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
33✔
2631
        if err != nil {
33✔
2632
                s.Warnf(badAPIRequestT, msg)
×
2633
                return
×
2634
        }
×
2635

2636
        js, cc := s.getJetStreamCluster()
33✔
2637
        if js == nil || cc == nil || cc.meta == nil {
33✔
2638
                return
×
2639
        }
×
2640

2641
        // Extra checks here but only leader is listening.
2642
        js.mu.RLock()
33✔
2643
        isLeader := cc.isLeader()
33✔
2644
        js.mu.RUnlock()
33✔
2645

33✔
2646
        if !isLeader {
33✔
2647
                return
×
2648
        }
×
2649

2650
        accName := tokenAt(subject, 6)
33✔
2651
        streamName := tokenAt(subject, 7)
33✔
2652

33✔
2653
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2654

33✔
2655
        var req JSApiMetaServerStreamMoveRequest
33✔
2656
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2657
                resp.Error = NewJSInvalidJSONError(err)
×
2658
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2659
                return
×
2660
        }
×
2661

2662
        srcPeer := _EMPTY_
33✔
2663
        if req.Server != _EMPTY_ {
62✔
2664
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2665
        }
29✔
2666

2667
        targetAcc, ok := s.accounts.Load(accName)
33✔
2668
        if !ok {
33✔
2669
                resp.Error = NewJSNoAccountError()
×
2670
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2671
                return
×
2672
        }
×
2673

2674
        var streamFound bool
33✔
2675
        cfg := StreamConfig{}
33✔
2676
        currPeers := []string{}
33✔
2677
        currCluster := _EMPTY_
33✔
2678
        js.mu.Lock()
33✔
2679
        streams, ok := cc.streams[accName]
33✔
2680
        if ok {
66✔
2681
                sa, ok := streams[streamName]
33✔
2682
                if ok {
66✔
2683
                        cfg = *sa.Config.clone()
33✔
2684
                        streamFound = true
33✔
2685
                        currPeers = sa.Group.Peers
33✔
2686
                        currCluster = sa.Group.Cluster
33✔
2687
                }
33✔
2688
        }
2689
        js.mu.Unlock()
33✔
2690

33✔
2691
        if !streamFound {
33✔
2692
                resp.Error = NewJSStreamNotFoundError()
×
2693
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2694
                return
×
2695
        }
×
2696

2697
        // if server was picked, make sure src peer exists and move it to first position.
2698
        // removal will drop peers from the left
2699
        if req.Server != _EMPTY_ {
62✔
2700
                if srcPeer == _EMPTY_ {
29✔
2701
                        resp.Error = NewJSClusterServerNotMemberError()
×
2702
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2703
                        return
×
2704
                }
×
2705
                var peerFound bool
29✔
2706
                for i := 0; i < len(currPeers); i++ {
81✔
2707
                        if currPeers[i] == srcPeer {
81✔
2708
                                copy(currPeers[1:], currPeers[:i])
29✔
2709
                                currPeers[0] = srcPeer
29✔
2710
                                peerFound = true
29✔
2711
                                break
29✔
2712
                        }
2713
                }
2714
                if !peerFound {
29✔
2715
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2716
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2717
                        return
×
2718
                }
×
2719
        }
2720

2721
        // make sure client is scoped to requested account
2722
        ciNew := *(ci)
33✔
2723
        ciNew.Account = accName
33✔
2724

33✔
2725
        // backup placement such that peers can be looked up with modified tag list
33✔
2726
        var origPlacement *Placement
33✔
2727
        if cfg.Placement != nil {
33✔
2728
                tmp := *cfg.Placement
×
2729
                origPlacement = &tmp
×
2730
        }
×
2731

2732
        if len(req.Tags) > 0 {
60✔
2733
                if cfg.Placement == nil {
54✔
2734
                        cfg.Placement = &Placement{}
27✔
2735
                }
27✔
2736
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2737
        }
2738

2739
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2740
        if len(peers) <= cfg.Replicas {
35✔
2741
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2742
                peers = nil
2✔
2743

2✔
2744
                clusters := map[string]struct{}{}
2✔
2745
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2746
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2747
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2748
                        }
8✔
2749
                        return true
16✔
2750
                })
2751
                errs := &selectPeerError{}
2✔
2752
                errs.accumulate(e)
2✔
2753
                for cluster := range clusters {
4✔
2754
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2755
                        if len(newPeers) >= cfg.Replicas {
4✔
2756
                                peers = append([]string{}, currPeers...)
2✔
2757
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2758
                                break
2✔
2759
                        }
2760
                        errs.accumulate(e)
×
2761
                }
2762
                if peers == nil {
2✔
2763
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2764
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2765
                        return
×
2766
                }
×
2767
        }
2768

2769
        cfg.Placement = origPlacement
33✔
2770

33✔
2771
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2772
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2773

33✔
2774
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2775
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2776
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2777
}
2778

2779
// Request to have the metaleader move a stream on a peer to another
2780
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2781
        if c == nil || !s.JetStreamEnabled() {
4✔
2782
                return
×
2783
        }
×
2784

2785
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
4✔
2786
        if err != nil {
4✔
2787
                s.Warnf(badAPIRequestT, msg)
×
2788
                return
×
2789
        }
×
2790

2791
        js, cc := s.getJetStreamCluster()
4✔
2792
        if js == nil || cc == nil || cc.meta == nil {
4✔
2793
                return
×
2794
        }
×
2795

2796
        // Extra checks here but only leader is listening.
2797
        js.mu.RLock()
4✔
2798
        isLeader := cc.isLeader()
4✔
2799
        js.mu.RUnlock()
4✔
2800

4✔
2801
        if !isLeader {
4✔
2802
                return
×
2803
        }
×
2804

2805
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2806

4✔
2807
        accName := tokenAt(subject, 6)
4✔
2808
        streamName := tokenAt(subject, 7)
4✔
2809

4✔
2810
        targetAcc, ok := s.accounts.Load(accName)
4✔
2811
        if !ok {
4✔
2812
                resp.Error = NewJSNoAccountError()
×
2813
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2814
                return
×
2815
        }
×
2816

2817
        streamFound := false
4✔
2818
        cfg := StreamConfig{}
4✔
2819
        currPeers := []string{}
4✔
2820
        js.mu.Lock()
4✔
2821
        streams, ok := cc.streams[accName]
4✔
2822
        if ok {
8✔
2823
                sa, ok := streams[streamName]
4✔
2824
                if ok {
8✔
2825
                        cfg = *sa.Config.clone()
4✔
2826
                        streamFound = true
4✔
2827
                        currPeers = sa.Group.Peers
4✔
2828
                }
4✔
2829
        }
2830
        js.mu.Unlock()
4✔
2831

4✔
2832
        if !streamFound {
4✔
2833
                resp.Error = NewJSStreamNotFoundError()
×
2834
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2835
                return
×
2836
        }
×
2837

2838
        if len(currPeers) <= cfg.Replicas {
4✔
2839
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2840
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2841
                return
×
2842
        }
×
2843

2844
        // make sure client is scoped to requested account
2845
        ciNew := *(ci)
4✔
2846
        ciNew.Account = accName
4✔
2847

4✔
2848
        peers := currPeers[:cfg.Replicas]
4✔
2849

4✔
2850
        // Remove placement in case tags don't match
4✔
2851
        // This can happen if the move was initiated by modifying the tags.
4✔
2852
        // This is an account operation.
4✔
2853
        // This can NOT happen when the move was initiated by the system account.
4✔
2854
        // There move honors the original tag list.
4✔
2855
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2856
        FOR_TAGCHECK:
1✔
2857
                for _, peer := range peers {
2✔
2858
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2859
                        if !ok {
1✔
2860
                                // can't verify tags, do the safe thing and error
×
2861
                                resp.Error = NewJSStreamGeneralError(
×
2862
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2863
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2864
                                return
×
2865
                        }
×
2866
                        nodeTags := si.(nodeInfo).tags
1✔
2867
                        for _, tag := range cfg.Placement.Tags {
2✔
2868
                                if !nodeTags.Contains(tag) {
2✔
2869
                                        // clear placement as tags don't match
1✔
2870
                                        cfg.Placement = nil
1✔
2871
                                        break FOR_TAGCHECK
1✔
2872
                                }
2873
                        }
2874

2875
                }
2876
        }
2877

2878
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2879
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2880

4✔
2881
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2882
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2883
}
2884

2885
// Request to have an account purged
2886
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
2887
        if c == nil || !s.JetStreamEnabled() {
6✔
2888
                return
×
2889
        }
×
2890

2891
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6✔
2892
        if err != nil {
6✔
2893
                s.Warnf(badAPIRequestT, msg)
×
2894
                return
×
2895
        }
×
2896

2897
        js := s.getJetStream()
6✔
2898
        if js == nil {
6✔
2899
                return
×
2900
        }
×
2901

2902
        accName := tokenAt(subject, 5)
6✔
2903

6✔
2904
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
2905

6✔
2906
        if !s.JetStreamIsClustered() {
8✔
2907
                var streams []*stream
2✔
2908
                var ac *Account
2✔
2909
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2910
                        streams = ac.streams()
1✔
2911
                }
1✔
2912

2913
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2914
                        accName, len(streams), ac != nil)
2✔
2915

2✔
2916
                for _, mset := range streams {
3✔
2917
                        err := mset.delete()
1✔
2918
                        if err != nil {
1✔
2919
                                resp.Error = NewJSStreamDeleteError(err)
×
2920
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2921
                                return
×
2922
                        }
×
2923
                }
2924
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2925
                        resp.Error = NewJSStreamGeneralError(err)
×
2926
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2927
                        return
×
2928
                }
×
2929
                resp.Initiated = true
2✔
2930
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2931
                return
2✔
2932
        }
2933

2934
        _, cc := s.getJetStreamCluster()
4✔
2935
        if cc == nil || cc.meta == nil || !cc.isLeader() {
4✔
2936
                return
×
2937
        }
×
2938

2939
        if js.isMetaRecovering() {
4✔
2940
                // While in recovery mode, the data structures are not fully initialized
×
2941
                resp.Error = NewJSClusterNotAvailError()
×
2942
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2943
                return
×
2944
        }
×
2945

2946
        js.mu.RLock()
4✔
2947
        ns, nc := 0, 0
4✔
2948
        streams, hasAccount := cc.streams[accName]
4✔
2949
        for _, osa := range streams {
12✔
2950
                for _, oca := range osa.consumers {
20✔
2951
                        oca.deleted = true
12✔
2952
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
12✔
2953
                        cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2954
                        nc++
12✔
2955
                }
12✔
2956
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
8✔
2957
                cc.meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2958
                ns++
8✔
2959
        }
2960
        js.mu.RUnlock()
4✔
2961

4✔
2962
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2963

4✔
2964
        resp.Initiated = true
4✔
2965
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2966
}
2967

2968
// Request to have the meta leader stepdown.
2969
// These will only be received by the meta leader, so less checking needed.
2970
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
23✔
2971
        if c == nil || !s.JetStreamEnabled() {
23✔
2972
                return
×
2973
        }
×
2974

2975
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
23✔
2976
        if err != nil {
23✔
2977
                s.Warnf(badAPIRequestT, msg)
×
2978
                return
×
2979
        }
×
2980

2981
        // This should only be coming from the System Account.
2982
        if acc != s.SystemAccount() {
24✔
2983
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2984
                return
1✔
2985
        }
1✔
2986

2987
        js, cc := s.getJetStreamCluster()
22✔
2988
        if js == nil || cc == nil || cc.meta == nil {
22✔
2989
                return
×
2990
        }
×
2991

2992
        // Extra checks here but only leader is listening.
2993
        js.mu.RLock()
22✔
2994
        isLeader := cc.isLeader()
22✔
2995
        js.mu.RUnlock()
22✔
2996

22✔
2997
        if !isLeader {
22✔
2998
                return
×
2999
        }
×
3000

3001
        var preferredLeader string
22✔
3002
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
22✔
3003

22✔
3004
        if isJSONObjectOrArray(msg) {
36✔
3005
                var req JSApiLeaderStepdownRequest
14✔
3006
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
3007
                        resp.Error = NewJSInvalidJSONError(err)
×
3008
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3009
                        return
×
3010
                }
×
3011
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil {
20✔
3012
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
3013
                        return
6✔
3014
                }
6✔
3015
        }
3016

3017
        // Call actual stepdown.
3018
        err = cc.meta.StepDown(preferredLeader)
16✔
3019
        if err != nil {
16✔
3020
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
3021
        } else {
16✔
3022
                resp.Success = true
16✔
3023
        }
16✔
3024
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16✔
3025
}
3026

3027
// Check if given []bytes is a JSON Object or Array.
3028
// Technically, valid JSON can also be a plain string or number, but for our use case,
3029
// we care only for JSON objects or arrays which starts with `[` or `{`.
3030
// This function does not have to ensure valid JSON in its entirety. It is used merely
3031
// to hint the codepath if it should attempt to parse the request as JSON or not.
3032
func isJSONObjectOrArray(req []byte) bool {
19,452✔
3033
        // Skip leading JSON whitespace (space, tab, newline, carriage return)
19,452✔
3034
        i := 0
19,452✔
3035
        for i < len(req) && (req[i] == ' ' || req[i] == '\t' || req[i] == '\n' || req[i] == '\r') {
19,464✔
3036
                i++
12✔
3037
        }
12✔
3038
        // Check for empty input after trimming
3039
        if i >= len(req) {
38,361✔
3040
                return false
18,909✔
3041
        }
18,909✔
3042
        // Check if the first non-whitespace character is '{' or '['
3043
        return req[i] == '{' || req[i] == '['
543✔
3044
}
3045

3046
func isEmptyRequest(req []byte) bool {
47,042✔
3047
        if len(req) == 0 {
92,823✔
3048
                return true
45,781✔
3049
        }
45,781✔
3050
        if bytes.Equal(req, []byte("{}")) {
1,262✔
3051
                return true
1✔
3052
        }
1✔
3053
        // If we are here we didn't get our simple match, but still could be valid.
3054
        var v any
1,260✔
3055
        if err := json.Unmarshal(req, &v); err != nil {
1,260✔
3056
                return false
×
3057
        }
×
3058
        vm, ok := v.(map[string]any)
1,260✔
3059
        if !ok {
1,260✔
3060
                return false
×
3061
        }
×
3062
        return len(vm) == 0
1,260✔
3063
}
3064

3065
// getStepDownPreferredPlacement attempts to work out what the best placement is
3066
// for a stepdown request. The preferred server name always takes precedence, but
3067
// if not specified, the placement will be used to filter by cluster. The caller
3068
// should check for return API errors and return those to the requestor if needed.
3069
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
38✔
3070
        if placement == nil {
40✔
3071
                return _EMPTY_, nil
2✔
3072
        }
2✔
3073
        var preferredLeader string
36✔
3074
        if placement.Preferred != _EMPTY_ {
52✔
3075
                for _, p := range group.Peers() {
69✔
3076
                        si, ok := s.nodeToInfo.Load(p.ID)
53✔
3077
                        if !ok || si == nil {
53✔
3078
                                continue
×
3079
                        }
3080
                        if si.(nodeInfo).name == placement.Preferred {
65✔
3081
                                preferredLeader = p.ID
12✔
3082
                                break
12✔
3083
                        }
3084
                }
3085
                if preferredLeader == group.ID() {
20✔
3086
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3087
                }
4✔
3088
                if preferredLeader == _EMPTY_ {
16✔
3089
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3090
                }
4✔
3091
        } else {
20✔
3092
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3093
                ourID := group.ID()
20✔
3094
                for _, p := range group.Peers() {
116✔
3095
                        if p == nil {
96✔
3096
                                continue // ... shouldn't happen.
×
3097
                        }
3098
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3099
                        if !ok || si == nil {
96✔
3100
                                continue
×
3101
                        }
3102
                        ni := si.(nodeInfo)
96✔
3103
                        if ni.offline || p.ID == ourID {
116✔
3104
                                continue
20✔
3105
                        }
3106
                        possiblePeers[p] = ni
76✔
3107
                }
3108
                // If cluster is specified, filter out anything not matching the cluster name.
3109
                if placement.Cluster != _EMPTY_ {
31✔
3110
                        for p, si := range possiblePeers {
51✔
3111
                                if si.cluster != placement.Cluster {
66✔
3112
                                        delete(possiblePeers, p)
26✔
3113
                                }
26✔
3114
                        }
3115
                }
3116
                // If tags are specified, filter out anything not matching all supplied tags.
3117
                if len(placement.Tags) > 0 {
32✔
3118
                        for p, si := range possiblePeers {
55✔
3119
                                matchesAll := true
43✔
3120
                                for _, tag := range placement.Tags {
93✔
3121
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3122
                                                break
32✔
3123
                                        }
3124
                                }
3125
                                if !matchesAll {
75✔
3126
                                        delete(possiblePeers, p)
32✔
3127
                                }
32✔
3128
                        }
3129
                }
3130
                // If there are no possible peers, return an error.
3131
                if len(possiblePeers) == 0 {
28✔
3132
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3133
                }
8✔
3134
                // Take advantage of random map iteration order to select the preferred.
3135
                for p := range possiblePeers {
24✔
3136
                        preferredLeader = p.ID
12✔
3137
                        break
12✔
3138
                }
3139
        }
3140
        return preferredLeader, nil
20✔
3141
}
3142

3143
// Request to delete a stream.
3144
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
507✔
3145
        if c == nil || !s.JetStreamEnabled() {
507✔
3146
                return
×
3147
        }
×
3148
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
507✔
3149
        if err != nil {
507✔
3150
                s.Warnf(badAPIRequestT, msg)
×
3151
                return
×
3152
        }
×
3153

3154
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
507✔
3155

507✔
3156
        // Determine if we should proceed here when we are in clustered mode.
507✔
3157
        if s.JetStreamIsClustered() {
972✔
3158
                js, cc := s.getJetStreamCluster()
465✔
3159
                if js == nil || cc == nil {
465✔
3160
                        return
×
3161
                }
×
3162
                if js.isLeaderless() {
466✔
3163
                        resp.Error = NewJSClusterNotAvailError()
1✔
3164
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3165
                        return
1✔
3166
                }
1✔
3167
                // Make sure we are meta leader.
3168
                if !s.JetStreamIsLeader() {
830✔
3169
                        return
366✔
3170
                }
366✔
3171
        }
3172

3173
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
140✔
3174
                if doErr {
×
3175
                        resp.Error = NewJSNotEnabledForAccountError()
×
3176
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3177
                }
×
3178
                return
×
3179
        }
3180

3181
        if !isEmptyRequest(msg) {
141✔
3182
                resp.Error = NewJSNotEmptyRequestError()
1✔
3183
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3184
                return
1✔
3185
        }
1✔
3186
        stream := streamNameFromSubject(subject)
139✔
3187

139✔
3188
        // Clustered.
139✔
3189
        if s.JetStreamIsClustered() {
237✔
3190
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
98✔
3191
                return
98✔
3192
        }
98✔
3193

3194
        mset, err := acc.lookupStream(stream)
41✔
3195
        if err != nil {
46✔
3196
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3197
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3198
                return
5✔
3199
        }
5✔
3200

3201
        if err := mset.delete(); err != nil {
36✔
3202
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3203
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3204
                return
×
3205
        }
×
3206
        resp.Success = true
36✔
3207
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
36✔
3208
}
3209

3210
// Request to delete a message.
3211
// This expects a stream sequence number as the msg body.
3212
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
760✔
3213
        if c == nil || !s.JetStreamEnabled() {
768✔
3214
                return
8✔
3215
        }
8✔
3216
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
752✔
3217
        if err != nil {
752✔
3218
                s.Warnf(badAPIRequestT, msg)
×
3219
                return
×
3220
        }
×
3221

3222
        stream := tokenAt(subject, 6)
752✔
3223

752✔
3224
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
752✔
3225

752✔
3226
        // If we are in clustered mode we need to be the stream leader to proceed.
752✔
3227
        if s.JetStreamIsClustered() {
1,095✔
3228
                // Check to make sure the stream is assigned.
343✔
3229
                js, cc := s.getJetStreamCluster()
343✔
3230
                if js == nil || cc == nil {
343✔
3231
                        return
×
3232
                }
×
3233
                if js.isLeaderless() {
344✔
3234
                        resp.Error = NewJSClusterNotAvailError()
1✔
3235
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3236
                        return
1✔
3237
                }
1✔
3238

3239
                js.mu.RLock()
342✔
3240
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
342✔
3241
                js.mu.RUnlock()
342✔
3242

342✔
3243
                if isLeader && sa == nil {
342✔
3244
                        // We can't find the stream, so mimic what would be the errors below.
×
3245
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3246
                                if doErr {
×
3247
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3248
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3249
                                }
×
3250
                                return
×
3251
                        }
3252
                        // No stream present.
3253
                        resp.Error = NewJSStreamNotFoundError()
×
3254
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3255
                        return
×
3256
                } else if sa == nil {
342✔
3257
                        return
×
3258
                }
×
3259

3260
                // Check to see if we are a member of the group and if the group has no leader.
3261
                if js.isGroupLeaderless(sa.Group) {
342✔
3262
                        resp.Error = NewJSClusterNotAvailError()
×
3263
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3264
                        return
×
3265
                }
×
3266

3267
                // We have the stream assigned and a leader, so only the stream leader should answer.
3268
                if !acc.JetStreamIsStreamLeader(stream) {
570✔
3269
                        return
228✔
3270
                }
228✔
3271
        }
3272

3273
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
688✔
3274
                if doErr {
328✔
3275
                        resp.Error = NewJSNotEnabledForAccountError()
163✔
3276
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
163✔
3277
                }
163✔
3278
                return
165✔
3279
        }
3280
        if isEmptyRequest(msg) {
358✔
3281
                resp.Error = NewJSBadRequestError()
×
3282
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3283
                return
×
3284
        }
×
3285
        var req JSApiMsgDeleteRequest
358✔
3286
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
358✔
3287
                resp.Error = NewJSInvalidJSONError(err)
×
3288
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3289
                return
×
3290
        }
×
3291

3292
        mset, err := acc.lookupStream(stream)
358✔
3293
        if err != nil {
361✔
3294
                resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3295
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3296
                return
3✔
3297
        }
3✔
3298
        if mset.cfg.Sealed {
357✔
3299
                resp.Error = NewJSStreamSealedError()
2✔
3300
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3301
                return
2✔
3302
        }
2✔
3303
        if mset.cfg.DenyDelete {
354✔
3304
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3305
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3306
                return
1✔
3307
        }
1✔
3308

3309
        if s.JetStreamIsClustered() {
464✔
3310
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
112✔
3311
                return
112✔
3312
        }
112✔
3313

3314
        var removed bool
240✔
3315
        if req.NoErase {
479✔
3316
                removed, err = mset.removeMsg(req.Seq)
239✔
3317
        } else {
240✔
3318
                removed, err = mset.eraseMsg(req.Seq)
1✔
3319
        }
1✔
3320
        if err != nil {
240✔
3321
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3322
        } else if !removed {
240✔
3323
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3324
        } else {
240✔
3325
                resp.Success = true
240✔
3326
        }
240✔
3327
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
240✔
3328
}
3329

3330
// Request to get a raw stream message.
3331
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,321✔
3332
        if c == nil || !s.JetStreamEnabled() {
1,321✔
3333
                return
×
3334
        }
×
3335
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1,321✔
3336
        if err != nil {
1,321✔
3337
                s.Warnf(badAPIRequestT, msg)
×
3338
                return
×
3339
        }
×
3340

3341
        stream := tokenAt(subject, 6)
1,321✔
3342

1,321✔
3343
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
1,321✔
3344

1,321✔
3345
        // If we are in clustered mode we need to be the stream leader to proceed.
1,321✔
3346
        if s.JetStreamIsClustered() {
2,020✔
3347
                // Check to make sure the stream is assigned.
699✔
3348
                js, cc := s.getJetStreamCluster()
699✔
3349
                if js == nil || cc == nil {
699✔
3350
                        return
×
3351
                }
×
3352
                if js.isLeaderless() {
699✔
3353
                        resp.Error = NewJSClusterNotAvailError()
×
3354
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3355
                        return
×
3356
                }
×
3357

3358
                js.mu.RLock()
699✔
3359
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
699✔
3360
                js.mu.RUnlock()
699✔
3361

699✔
3362
                if isLeader && sa == nil {
699✔
3363
                        // We can't find the stream, so mimic what would be the errors below.
×
3364
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3365
                                if doErr {
×
3366
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3367
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3368
                                }
×
3369
                                return
×
3370
                        }
3371
                        // No stream present.
3372
                        resp.Error = NewJSStreamNotFoundError()
×
3373
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3374
                        return
×
3375
                } else if sa == nil {
699✔
3376
                        return
×
3377
                }
×
3378

3379
                // Check to see if we are a member of the group and if the group has no leader.
3380
                if js.isGroupLeaderless(sa.Group) {
699✔
3381
                        resp.Error = NewJSClusterNotAvailError()
×
3382
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3383
                        return
×
3384
                }
×
3385

3386
                // We have the stream assigned and a leader, so only the stream leader should answer.
3387
                if !acc.JetStreamIsStreamLeader(stream) {
1,174✔
3388
                        return
475✔
3389
                }
475✔
3390
        }
3391

3392
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
849✔
3393
                if doErr {
3✔
3394
                        resp.Error = NewJSNotEnabledForAccountError()
×
3395
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3396
                }
×
3397
                return
3✔
3398
        }
3399
        if isEmptyRequest(msg) {
843✔
3400
                resp.Error = NewJSBadRequestError()
×
3401
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3402
                return
×
3403
        }
×
3404
        var req JSApiMsgGetRequest
843✔
3405
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
843✔
3406
                resp.Error = NewJSInvalidJSONError(err)
×
3407
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3408
                return
×
3409
        }
×
3410

3411
        // This version does not support batch.
3412
        if req.Batch > 0 || req.MaxBytes > 0 {
844✔
3413
                resp.Error = NewJSBadRequestError()
1✔
3414
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3415
                return
1✔
3416
        }
1✔
3417

3418
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3419
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3420
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
842✔
3421
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
842✔
3422
                (req.Seq > 0 && req.StartTime != nil) ||
842✔
3423
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
842✔
3424
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
846✔
3425
                resp.Error = NewJSBadRequestError()
4✔
3426
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3427
                return
4✔
3428
        }
4✔
3429

3430
        mset, err := acc.lookupStream(stream)
838✔
3431
        if err != nil {
838✔
3432
                resp.Error = NewJSStreamNotFoundError()
×
3433
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3434
                return
×
3435
        }
×
3436

3437
        var svp StoreMsg
838✔
3438
        var sm *StoreMsg
838✔
3439

838✔
3440
        // If AsOfTime is set, perform this first to get the sequence.
838✔
3441
        var seq uint64
838✔
3442
        if req.StartTime != nil {
844✔
3443
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3444
        } else {
838✔
3445
                seq = req.Seq
832✔
3446
        }
832✔
3447

3448
        if seq > 0 && req.NextFor == _EMPTY_ {
983✔
3449
                sm, err = mset.store.LoadMsg(seq, &svp)
145✔
3450
        } else if req.NextFor != _EMPTY_ {
940✔
3451
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3452
        } else {
693✔
3453
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
591✔
3454
        }
591✔
3455
        if err != nil {
1,234✔
3456
                resp.Error = NewJSNoMessageFoundError()
396✔
3457
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
396✔
3458
                return
396✔
3459
        }
396✔
3460
        resp.Message = &StoredMsg{
442✔
3461
                Subject:  sm.subj,
442✔
3462
                Sequence: sm.seq,
442✔
3463
                Header:   sm.hdr,
442✔
3464
                Data:     sm.msg,
442✔
3465
                Time:     time.Unix(0, sm.ts).UTC(),
442✔
3466
        }
442✔
3467

442✔
3468
        // Don't send response through API layer for this call.
442✔
3469
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
442✔
3470
}
3471

3472
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
27✔
3473
        if c == nil || !s.JetStreamEnabled() {
27✔
3474
                return
×
3475
        }
×
3476

3477
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
27✔
3478
        if err != nil {
27✔
3479
                s.Warnf(badAPIRequestT, msg)
×
3480
                return
×
3481
        }
×
3482

3483
        stream := streamNameFromSubject(subject)
27✔
3484
        consumer := consumerNameFromSubject(subject)
27✔
3485

27✔
3486
        var req JSApiConsumerUnpinRequest
27✔
3487
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
27✔
3488

27✔
3489
        if err := json.Unmarshal(msg, &req); err != nil {
27✔
3490
                resp.Error = NewJSInvalidJSONError(err)
×
3491
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3492
                return
×
3493
        }
×
3494

3495
        if req.Group == _EMPTY_ {
31✔
3496
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
4✔
3497
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3498
                return
4✔
3499
        }
4✔
3500

3501
        if !validGroupName.MatchString(req.Group) {
27✔
3502
                resp.Error = NewJSConsumerInvalidGroupNameError()
4✔
3503
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3504
                return
4✔
3505
        }
4✔
3506
        if s.JetStreamIsClustered() {
31✔
3507
                // Check to make sure the stream is assigned.
12✔
3508
                js, cc := s.getJetStreamCluster()
12✔
3509
                if js == nil || cc == nil {
12✔
3510
                        return
×
3511
                }
×
3512

3513
                // First check if the stream and consumer is there.
3514
                js.mu.RLock()
12✔
3515
                sa := js.streamAssignment(acc.Name, stream)
12✔
3516
                if sa == nil {
15✔
3517
                        js.mu.RUnlock()
3✔
3518
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3519
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3520
                        return
3✔
3521
                }
3✔
3522

3523
                ca, ok := sa.consumers[consumer]
9✔
3524
                if !ok || ca == nil {
12✔
3525
                        js.mu.RUnlock()
3✔
3526
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3527
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3528
                        return
3✔
3529
                }
3✔
3530
                js.mu.RUnlock()
6✔
3531

6✔
3532
                // Then check if we are the leader.
6✔
3533
                mset, err := acc.lookupStream(stream)
6✔
3534
                if err != nil {
6✔
3535
                        return
×
3536
                }
×
3537

3538
                o := mset.lookupConsumer(consumer)
6✔
3539
                if o == nil {
6✔
3540
                        return
×
3541
                }
×
3542
                if !o.isLeader() {
10✔
3543
                        return
4✔
3544
                }
4✔
3545
        }
3546

3547
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3548
                if doErr {
×
3549
                        resp.Error = NewJSNotEnabledForAccountError()
×
3550
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3551
                }
×
3552
                return
×
3553
        }
3554

3555
        mset, err := acc.lookupStream(stream)
9✔
3556
        if err != nil {
10✔
3557
                resp.Error = NewJSStreamNotFoundError()
1✔
3558
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3559
                return
1✔
3560
        }
1✔
3561
        o := mset.lookupConsumer(consumer)
8✔
3562
        if o == nil {
9✔
3563
                resp.Error = NewJSConsumerNotFoundError()
1✔
3564
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3565
                return
1✔
3566
        }
1✔
3567

3568
        var foundPriority bool
7✔
3569
        for _, group := range o.config().PriorityGroups {
14✔
3570
                if group == req.Group {
12✔
3571
                        foundPriority = true
5✔
3572
                        break
5✔
3573
                }
3574
        }
3575
        if !foundPriority {
9✔
3576
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3577
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3578
                return
2✔
3579
        }
2✔
3580

3581
        o.mu.Lock()
5✔
3582
        o.currentPinId = _EMPTY_
5✔
3583
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3584
        o.mu.Unlock()
5✔
3585
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3586
}
3587

3588
// Request to purge a stream.
3589
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
131✔
3590
        if c == nil || !s.JetStreamEnabled() {
131✔
3591
                return
×
3592
        }
×
3593
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
131✔
3594
        if err != nil {
131✔
3595
                s.Warnf(badAPIRequestT, msg)
×
3596
                return
×
3597
        }
×
3598

3599
        stream := streamNameFromSubject(subject)
131✔
3600

131✔
3601
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
131✔
3602

131✔
3603
        // If we are in clustered mode we need to be the stream leader to proceed.
131✔
3604
        if s.JetStreamIsClustered() {
235✔
3605
                // Check to make sure the stream is assigned.
104✔
3606
                js, cc := s.getJetStreamCluster()
104✔
3607
                if js == nil || cc == nil {
104✔
3608
                        return
×
3609
                }
×
3610

3611
                js.mu.RLock()
104✔
3612
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
104✔
3613
                js.mu.RUnlock()
104✔
3614

104✔
3615
                if isLeader && sa == nil {
104✔
3616
                        // We can't find the stream, so mimic what would be the errors below.
×
3617
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3618
                                if doErr {
×
3619
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3620
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3621
                                }
×
3622
                                return
×
3623
                        }
3624
                        // No stream present.
3625
                        resp.Error = NewJSStreamNotFoundError()
×
3626
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3627
                        return
×
3628
                } else if sa == nil {
104✔
3629
                        if js.isLeaderless() {
×
3630
                                resp.Error = NewJSClusterNotAvailError()
×
3631
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3632
                        }
×
3633
                        return
×
3634
                }
3635

3636
                // Check to see if we are a member of the group and if the group has no leader.
3637
                if js.isGroupLeaderless(sa.Group) {
104✔
3638
                        resp.Error = NewJSClusterNotAvailError()
×
3639
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3640
                        return
×
3641
                }
×
3642

3643
                // We have the stream assigned and a leader, so only the stream leader should answer.
3644
                if !acc.JetStreamIsStreamLeader(stream) {
175✔
3645
                        if js.isLeaderless() {
72✔
3646
                                resp.Error = NewJSClusterNotAvailError()
1✔
3647
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3648
                        }
1✔
3649
                        return
71✔
3650
                }
3651
        }
3652

3653
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
60✔
3654
                if doErr {
×
3655
                        resp.Error = NewJSNotEnabledForAccountError()
×
3656
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3657
                }
×
3658
                return
×
3659
        }
3660

3661
        var purgeRequest *JSApiStreamPurgeRequest
60✔
3662
        if isJSONObjectOrArray(msg) {
89✔
3663
                var req JSApiStreamPurgeRequest
29✔
3664
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
29✔
3665
                        resp.Error = NewJSInvalidJSONError(err)
×
3666
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3667
                        return
×
3668
                }
×
3669
                if req.Sequence > 0 && req.Keep > 0 {
29✔
3670
                        resp.Error = NewJSBadRequestError()
×
3671
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3672
                        return
×
3673
                }
×
3674
                purgeRequest = &req
29✔
3675
        }
3676

3677
        mset, err := acc.lookupStream(stream)
60✔
3678
        if err != nil {
60✔
3679
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3680
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3681
                return
×
3682
        }
×
3683
        if mset.cfg.Sealed {
62✔
3684
                resp.Error = NewJSStreamSealedError()
2✔
3685
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3686
                return
2✔
3687
        }
2✔
3688
        if mset.cfg.DenyPurge {
59✔
3689
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3690
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3691
                return
1✔
3692
        }
1✔
3693

3694
        if s.JetStreamIsClustered() {
88✔
3695
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
31✔
3696
                return
31✔
3697
        }
31✔
3698

3699
        purged, err := mset.purge(purgeRequest)
26✔
3700
        if err != nil {
26✔
3701
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3702
        } else {
26✔
3703
                resp.Purged = purged
26✔
3704
                resp.Success = true
26✔
3705
        }
26✔
3706
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
26✔
3707
}
3708

3709
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,100✔
3710
        var replicas int
1,100✔
3711
        if cfg != nil {
2,200✔
3712
                replicas = cfg.Replicas
1,100✔
3713
        }
1,100✔
3714
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,100✔
3715
        if apiErr != nil {
1,100✔
3716
                return apiErr
×
3717
        }
×
3718
        jsa.mu.RLock()
1,100✔
3719
        defer jsa.mu.RUnlock()
1,100✔
3720
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,103✔
3721
                return NewJSMaximumStreamsLimitError()
3✔
3722
        }
3✔
3723
        reserved := jsa.tieredReservation(tier, cfg)
1,097✔
3724
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,098✔
3725
                return NewJSStreamLimitsError(err, Unless(err))
1✔
3726
        }
1✔
3727
        return nil
1,096✔
3728
}
3729

3730
// Request to restore a stream.
3731
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
49✔
3732
        if c == nil || !s.JetStreamIsLeader() {
73✔
3733
                return
24✔
3734
        }
24✔
3735
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
25✔
3736
        if err != nil {
25✔
3737
                s.Warnf(badAPIRequestT, msg)
×
3738
                return
×
3739
        }
×
3740

3741
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
25✔
3742
        if !acc.JetStreamEnabled() {
25✔
3743
                resp.Error = NewJSNotEnabledForAccountError()
×
3744
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3745
                return
×
3746
        }
×
3747
        if isEmptyRequest(msg) {
26✔
3748
                resp.Error = NewJSBadRequestError()
1✔
3749
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3750
                return
1✔
3751
        }
1✔
3752

3753
        var req JSApiStreamRestoreRequest
24✔
3754
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3755
                resp.Error = NewJSInvalidJSONError(err)
×
3756
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3757
                return
×
3758
        }
×
3759

3760
        stream := streamNameFromSubject(subject)
24✔
3761

24✔
3762
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3763
                req.Config.Name = stream
×
3764
        }
×
3765

3766
        // check stream config at the start of the restore process, not at the end
3767
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3768
        if apiErr != nil {
26✔
3769
                resp.Error = apiErr
2✔
3770
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3771
                return
2✔
3772
        }
2✔
3773

3774
        if s.JetStreamIsClustered() {
33✔
3775
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
3776
                return
11✔
3777
        }
11✔
3778

3779
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
3780
                resp.Error = err
2✔
3781
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3782
                return
2✔
3783
        }
2✔
3784

3785
        if _, err := acc.lookupStream(stream); err == nil {
10✔
3786
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
3787
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3788
                return
1✔
3789
        }
1✔
3790

3791
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3792
                if doErr {
×
3793
                        resp.Error = NewJSNotEnabledForAccountError()
×
3794
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3795
                }
×
3796
                return
×
3797
        }
3798

3799
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
3800
}
3801

3802
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3803
        js := s.getJetStream()
16✔
3804

16✔
3805
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3806

16✔
3807
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
3808
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
28✔
3809
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
12✔
3810
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
3811
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3812
                        return nil
×
3813
                }
×
3814
        }
3815

3816
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
3817
        if err != nil {
16✔
3818
                resp.Error = NewJSTempStorageFailedError()
×
3819
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3820
                return nil
×
3821
        }
×
3822

3823
        streamName := cfg.Name
16✔
3824
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3825

16✔
3826
        start := time.Now().UTC()
16✔
3827
        domain := s.getOpts().JetStreamDomain
16✔
3828
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3829
                TypedEvent: TypedEvent{
16✔
3830
                        Type: JSRestoreCreateAdvisoryType,
16✔
3831
                        ID:   nuid.Next(),
16✔
3832
                        Time: start,
16✔
3833
                },
16✔
3834
                Stream: streamName,
16✔
3835
                Client: ci.forAdvisory(),
16✔
3836
                Domain: domain,
16✔
3837
        })
16✔
3838

16✔
3839
        // Create our internal subscription to accept the snapshot.
16✔
3840
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3841

16✔
3842
        type result struct {
16✔
3843
                err   error
16✔
3844
                reply string
16✔
3845
        }
16✔
3846

16✔
3847
        // For signaling to upper layers.
16✔
3848
        resultCh := make(chan result, 1)
16✔
3849
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
3850

16✔
3851
        var total int
16✔
3852

16✔
3853
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
3854
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
137✔
3855
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
121✔
3856
                if reply == _EMPTY_ {
122✔
3857
                        sub.client.processUnsub(sub.sid)
1✔
3858
                        resultCh <- result{
1✔
3859
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
3860
                                reply,
1✔
3861
                        }
1✔
3862
                        return
1✔
3863
                }
1✔
3864
                // Account client messages have \r\n on end. This is an error.
3865
                if len(msg) < LEN_CR_LF {
120✔
3866
                        sub.client.processUnsub(sub.sid)
×
3867
                        resultCh <- result{
×
3868
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
3869
                                reply,
×
3870
                        }
×
3871
                        return
×
3872
                }
×
3873
                // Adjust.
3874
                msg = msg[:len(msg)-LEN_CR_LF]
120✔
3875

120✔
3876
                // This means we are complete with our transfer from the client.
120✔
3877
                if len(msg) == 0 {
135✔
3878
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
3879
                        resultCh <- result{err, reply}
15✔
3880
                        return
15✔
3881
                }
15✔
3882

3883
                // We track total and check on server limits.
3884
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
3885
                total += len(msg)
105✔
3886
                if js.wouldExceedLimits(FileStorage, total) {
105✔
3887
                        s.resourcesExceededError()
×
3888
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
3889
                        return
×
3890
                }
×
3891

3892
                // Append chunk to temp file. Mark as issue if we encounter an error.
3893
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
105✔
3894
                        resultCh <- result{err, reply}
×
3895
                        if reply != _EMPTY_ {
×
3896
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
3897
                        }
×
3898
                        return
×
3899
                }
3900

3901
                activeQ.push(len(msg))
105✔
3902

105✔
3903
                s.sendInternalAccountMsg(acc, reply, nil)
105✔
3904
        }
3905

3906
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3907
        if err != nil {
16✔
3908
                tfile.Close()
×
3909
                os.Remove(tfile.Name())
×
3910
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3911
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3912
                return nil
×
3913
        }
×
3914

3915
        // Mark the subject so the end user knows where to send the snapshot chunks.
3916
        resp.DeliverSubject = restoreSubj
16✔
3917
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3918

16✔
3919
        doneCh := make(chan error, 1)
16✔
3920

16✔
3921
        // Monitor the progress from another Go routine.
16✔
3922
        s.startGoRoutine(func() {
32✔
3923
                defer s.grWG.Done()
16✔
3924
                defer func() {
32✔
3925
                        tfile.Close()
16✔
3926
                        os.Remove(tfile.Name())
16✔
3927
                        sub.client.processUnsub(sub.sid)
16✔
3928
                        activeQ.unregister()
16✔
3929
                }()
16✔
3930

3931
                const activityInterval = 5 * time.Second
16✔
3932
                notActive := time.NewTimer(activityInterval)
16✔
3933
                defer notActive.Stop()
16✔
3934

16✔
3935
                total := 0
16✔
3936
                for {
137✔
3937
                        select {
121✔
3938
                        case result := <-resultCh:
16✔
3939
                                err := result.err
16✔
3940
                                var mset *stream
16✔
3941

16✔
3942
                                // If we staged properly go ahead and do restore now.
16✔
3943
                                if err == nil {
31✔
3944
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
3945
                                        tfile.Seek(0, 0)
15✔
3946
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
3947
                                } else {
16✔
3948
                                        errStr := err.Error()
1✔
3949
                                        tmp := []rune(errStr)
1✔
3950
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
3951
                                        s.Warnf(errStr)
1✔
3952
                                }
1✔
3953

3954
                                end := time.Now().UTC()
16✔
3955

16✔
3956
                                // TODO(rip) - Should this have the error code in it??
16✔
3957
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
3958
                                        TypedEvent: TypedEvent{
16✔
3959
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
3960
                                                ID:   nuid.Next(),
16✔
3961
                                                Time: end,
16✔
3962
                                        },
16✔
3963
                                        Stream: streamName,
16✔
3964
                                        Start:  start,
16✔
3965
                                        End:    end,
16✔
3966
                                        Bytes:  int64(total),
16✔
3967
                                        Client: ci.forAdvisory(),
16✔
3968
                                        Domain: domain,
16✔
3969
                                })
16✔
3970

16✔
3971
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
3972

16✔
3973
                                if err != nil {
20✔
3974
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
3975
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
3976
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
3977
                                } else {
16✔
3978
                                        msetCfg := mset.config()
12✔
3979
                                        resp.StreamInfo = &StreamInfo{
12✔
3980
                                                Created:   mset.createdTime(),
12✔
3981
                                                State:     mset.state(),
12✔
3982
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
3983
                                                TimeStamp: time.Now().UTC(),
12✔
3984
                                        }
12✔
3985
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
3986
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
3987
                                }
12✔
3988

3989
                                // On the last EOF, send back the stream info or error status.
3990
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
3991
                                // Signal to the upper layers.
16✔
3992
                                doneCh <- err
16✔
3993
                                return
16✔
3994
                        case <-activeQ.ch:
105✔
3995
                                if n, ok := activeQ.popOne(); ok {
210✔
3996
                                        total += n
105✔
3997
                                        notActive.Reset(activityInterval)
105✔
3998
                                }
105✔
3999
                        case <-notActive.C:
×
4000
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
4001
                                doneCh <- err
×
4002
                                return
×
4003
                        }
4004
                }
4005
        })
4006

4007
        return doneCh
16✔
4008
}
4009

4010
// Process a snapshot request.
4011
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
4012
        if c == nil || !s.JetStreamEnabled() {
28✔
4013
                return
×
4014
        }
×
4015
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
28✔
4016
        if err != nil {
28✔
4017
                s.Warnf(badAPIRequestT, msg)
×
4018
                return
×
4019
        }
×
4020

4021
        smsg := string(msg)
28✔
4022
        stream := streamNameFromSubject(subject)
28✔
4023

28✔
4024
        // If we are in clustered mode we need to be the stream leader to proceed.
28✔
4025
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
43✔
4026
                return
15✔
4027
        }
15✔
4028

4029
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
13✔
4030
        if !acc.JetStreamEnabled() {
13✔
4031
                resp.Error = NewJSNotEnabledForAccountError()
×
4032
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4033
                return
×
4034
        }
×
4035
        if isEmptyRequest(msg) {
14✔
4036
                resp.Error = NewJSBadRequestError()
1✔
4037
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4038
                return
1✔
4039
        }
1✔
4040

4041
        mset, err := acc.lookupStream(stream)
12✔
4042
        if err != nil {
13✔
4043
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4044
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4045
                return
1✔
4046
        }
1✔
4047

4048
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4049
                if doErr {
×
4050
                        resp.Error = NewJSNotEnabledForAccountError()
×
4051
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4052
                }
×
4053
                return
×
4054
        }
4055

4056
        var req JSApiStreamSnapshotRequest
11✔
4057
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4058
                resp.Error = NewJSInvalidJSONError(err)
×
4059
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4060
                return
×
4061
        }
×
4062
        if !IsValidSubject(req.DeliverSubject) {
12✔
4063
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4064
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4065
                return
1✔
4066
        }
1✔
4067

4068
        // We will do the snapshot in a go routine as well since check msgs may
4069
        // stall this go routine.
4070
        go func() {
20✔
4071
                if req.CheckMsgs {
12✔
4072
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4073
                } else {
10✔
4074
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4075
                }
8✔
4076

4077
                start := time.Now().UTC()
10✔
4078

10✔
4079
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4080
                if err != nil {
10✔
4081
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4082
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4083
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4084
                        return
×
4085
                }
×
4086

4087
                config := mset.config()
10✔
4088
                resp.State = &sr.State
10✔
4089
                resp.Config = &config
10✔
4090

10✔
4091
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4092

10✔
4093
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4094
                        TypedEvent: TypedEvent{
10✔
4095
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4096
                                ID:   nuid.Next(),
10✔
4097
                                Time: time.Now().UTC(),
10✔
4098
                        },
10✔
4099
                        Stream: mset.name(),
10✔
4100
                        State:  sr.State,
10✔
4101
                        Client: ci.forAdvisory(),
10✔
4102
                        Domain: s.getOpts().JetStreamDomain,
10✔
4103
                })
10✔
4104

10✔
4105
                // Now do the real streaming.
10✔
4106
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4107

10✔
4108
                end := time.Now().UTC()
10✔
4109

10✔
4110
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4111
                        TypedEvent: TypedEvent{
10✔
4112
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4113
                                ID:   nuid.Next(),
10✔
4114
                                Time: end,
10✔
4115
                        },
10✔
4116
                        Stream: mset.name(),
10✔
4117
                        Start:  start,
10✔
4118
                        End:    end,
10✔
4119
                        Client: ci.forAdvisory(),
10✔
4120
                        Domain: s.getOpts().JetStreamDomain,
10✔
4121
                })
10✔
4122

10✔
4123
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4124
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4125
                        mset.jsa.account.Name,
10✔
4126
                        mset.name(),
10✔
4127
                        end.Sub(start))
10✔
4128
        }()
4129
}
4130

4131
// Default chunk size for now.
4132
const defaultSnapshotChunkSize = 128 * 1024
4133
const defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MB
4134

4135
// streamSnapshot will stream out our snapshot to the reply subject.
4136
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4137
        chunkSize := req.ChunkSize
10✔
4138
        if chunkSize == 0 {
12✔
4139
                chunkSize = defaultSnapshotChunkSize
2✔
4140
        }
2✔
4141
        // Setup for the chunk stream.
4142
        reply := req.DeliverSubject
10✔
4143
        r := sr.Reader
10✔
4144
        defer r.Close()
10✔
4145

10✔
4146
        // Check interest for the snapshot deliver subject.
10✔
4147
        inch := make(chan bool, 1)
10✔
4148
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4149
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4150
        hasInterest := <-inch
10✔
4151
        if !hasInterest {
15✔
4152
                // Allow 2 seconds or so for interest to show up.
5✔
4153
                select {
5✔
4154
                case <-inch:
4✔
4155
                case <-time.After(2 * time.Second):
1✔
4156
                }
4157
        }
4158

4159
        // Create our ack flow handler.
4160
        // This is very simple for now.
4161
        ackSize := defaultSnapshotWindowSize / chunkSize
10✔
4162
        if ackSize < 8 {
10✔
4163
                ackSize = 8
×
4164
        } else if ackSize > 8*1024 {
16✔
4165
                ackSize = 8 * 1024
6✔
4166
        }
6✔
4167
        acks := make(chan struct{}, ackSize)
10✔
4168
        acks <- struct{}{}
10✔
4169

10✔
4170
        // Track bytes outstanding.
10✔
4171
        var out int32
10✔
4172

10✔
4173
        // We will place sequence number and size of chunk sent in the reply.
10✔
4174
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4175
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
19✔
4176
                cs, _ := strconv.Atoi(tokenAt(subject, 6))
9✔
4177
                // This is very crude and simple, but ok for now.
9✔
4178
                // This only matters when sending multiple chunks.
9✔
4179
                if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
18✔
4180
                        select {
9✔
4181
                        case acks <- struct{}{}:
9✔
4182
                        default:
×
4183
                        }
4184
                }
4185
        })
4186
        defer mset.unsubscribe(ackSub)
10✔
4187

10✔
4188
        // TODO(dlc) - Add in NATS-Chunked-Sequence header
10✔
4189
        var hdr []byte
10✔
4190
        for index := 1; ; index++ {
114✔
4191
                chunk := make([]byte, chunkSize)
104✔
4192
                n, err := r.Read(chunk)
104✔
4193
                chunk = chunk[:n]
104✔
4194
                if err != nil {
114✔
4195
                        if n > 0 {
10✔
4196
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
×
4197
                        }
×
4198
                        break
10✔
4199
                }
4200

4201
                // Wait on acks for flow control if past our window size.
4202
                // Wait up to 10ms for now if no acks received.
4203
                if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
94✔
4204
                        select {
×
4205
                        case <-acks:
×
4206
                                // ok to proceed.
4207
                        case <-inch:
×
4208
                                // Lost interest
×
4209
                                hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4210
                                goto done
×
4211
                        case <-time.After(2 * time.Second):
×
4212
                                hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4213
                                goto done
×
4214
                        }
4215
                }
4216
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
94✔
4217
                if hdr == nil {
104✔
4218
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
10✔
4219
                }
10✔
4220
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
94✔
4221
                atomic.AddInt32(&out, int32(len(chunk)))
94✔
4222
        }
4223

4224
        if err := <-sr.errCh; err != _EMPTY_ {
10✔
4225
                hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4226
        }
×
4227

4228
done:
4229
        // Send last EOF
4230
        // TODO(dlc) - place hash in header
4231
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4232
}
4233

4234
// For determining consumer request type.
4235
type ccReqType uint8
4236

4237
const (
4238
        ccNew = iota
4239
        ccLegacyEphemeral
4240
        ccLegacyDurable
4241
)
4242

4243
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4244
// filtered subjects can be at the tail end.
4245
// Assumes stream and consumer names are single tokens.
4246
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
10,565✔
4247
        if c == nil || !s.JetStreamEnabled() {
10,565✔
4248
                return
×
4249
        }
×
4250

4251
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
10,565✔
4252
        if err != nil {
10,566✔
4253
                s.Warnf(badAPIRequestT, msg)
1✔
4254
                return
1✔
4255
        }
1✔
4256

4257
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
10,564✔
4258

10,564✔
4259
        var req CreateConsumerRequest
10,564✔
4260
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10,565✔
4261
                resp.Error = NewJSInvalidJSONError(err)
1✔
4262
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4263
                return
1✔
4264
        }
1✔
4265

4266
        var js *jetStream
10,563✔
4267
        isClustered := s.JetStreamIsClustered()
10,563✔
4268

10,563✔
4269
        // Determine if we should proceed here when we are in clustered mode.
10,563✔
4270
        if isClustered {
20,193✔
4271
                if req.Config.Direct {
10,015✔
4272
                        // Check to see if we have this stream and are the stream leader.
385✔
4273
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
684✔
4274
                                return
299✔
4275
                        }
299✔
4276
                } else {
9,245✔
4277
                        var cc *jetStreamCluster
9,245✔
4278
                        js, cc = s.getJetStreamCluster()
9,245✔
4279
                        if js == nil || cc == nil {
9,245✔
4280
                                return
×
4281
                        }
×
4282
                        if js.isLeaderless() {
9,245✔
4283
                                resp.Error = NewJSClusterNotAvailError()
×
4284
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4285
                                return
×
4286
                        }
×
4287
                        // Make sure we are meta leader.
4288
                        if !s.JetStreamIsLeader() {
15,499✔
4289
                                return
6,254✔
4290
                        }
6,254✔
4291
                }
4292
        }
4293

4294
        var streamName, consumerName, filteredSubject string
4,010✔
4295
        var rt ccReqType
4,010✔
4296

4,010✔
4297
        if n := numTokens(subject); n < 5 {
4,010✔
4298
                s.Warnf(badAPIRequestT, msg)
×
4299
                return
×
4300
        } else if n == 5 {
4,761✔
4301
                // Legacy ephemeral.
751✔
4302
                rt = ccLegacyEphemeral
751✔
4303
                streamName = streamNameFromSubject(subject)
751✔
4304
        } else {
4,010✔
4305
                // New style and durable legacy.
3,259✔
4306
                if tokenAt(subject, 4) == "DURABLE" {
3,515✔
4307
                        rt = ccLegacyDurable
256✔
4308
                        if n != 7 {
256✔
4309
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4310
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4311
                                return
×
4312
                        }
×
4313
                        streamName = tokenAt(subject, 6)
256✔
4314
                        consumerName = tokenAt(subject, 7)
256✔
4315
                } else {
3,003✔
4316
                        streamName = streamNameFromSubject(subject)
3,003✔
4317
                        consumerName = consumerNameFromSubject(subject)
3,003✔
4318
                        // New has optional filtered subject as part of main subject..
3,003✔
4319
                        if n > 6 {
5,632✔
4320
                                tokens := strings.Split(subject, tsep)
2,629✔
4321
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,629✔
4322
                        }
2,629✔
4323
                }
4324
        }
4325

4326
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,013✔
4327
                if doErr {
4✔
4328
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4329
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4330
                }
1✔
4331
                return
3✔
4332
        }
4333

4334
        if streamName != req.Stream {
4,008✔
4335
                resp.Error = NewJSStreamMismatchError()
1✔
4336
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4337
                return
1✔
4338
        }
1✔
4339

4340
        if consumerName != _EMPTY_ {
7,263✔
4341
                // Check for path like separators in the name.
3,257✔
4342
                if strings.ContainsAny(consumerName, `\/`) {
3,261✔
4343
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4344
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4345
                        return
4✔
4346
                }
4✔
4347
        }
4348

4349
        // Should we expect a durable name
4350
        if rt == ccLegacyDurable {
4,257✔
4351
                if numTokens(subject) < 7 {
255✔
4352
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4353
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4354
                        return
×
4355
                }
×
4356
                // Now check on requirements for durable request.
4357
                if req.Config.Durable == _EMPTY_ {
256✔
4358
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4359
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4360
                        return
1✔
4361
                }
1✔
4362
                if consumerName != req.Config.Durable {
254✔
4363
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4364
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4365
                        return
×
4366
                }
×
4367
        }
4368
        // If new style and durable set make sure they match.
4369
        if rt == ccNew {
6,999✔
4370
                if req.Config.Durable != _EMPTY_ {
5,513✔
4371
                        if consumerName != req.Config.Durable {
2,515✔
4372
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4373
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4374
                                return
×
4375
                        }
×
4376
                }
4377
                // New style ephemeral so we need to honor the name.
4378
                req.Config.Name = consumerName
2,998✔
4379
        }
4380
        // Check for legacy ephemeral mis-configuration.
4381
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,004✔
4382
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4383
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4384
                return
3✔
4385
        }
3✔
4386

4387
        // in case of multiple filters provided, error if new API is used.
4388
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
3,999✔
4389
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4390
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4391
                return
1✔
4392
        }
1✔
4393

4394
        // Check for a filter subject.
4395
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
3,999✔
4396
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4397
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4398
                return
2✔
4399
        }
2✔
4400

4401
        if isClustered && !req.Config.Direct {
6,984✔
4402
                // If we are inline with client, we still may need to do a callout for consumer info
2,989✔
4403
                // during this call, so place in Go routine to not block client.
2,989✔
4404
                // Router and Gateway API calls already in separate context.
2,989✔
4405
                if c.kind != ROUTER && c.kind != GATEWAY {
5,978✔
4406
                        go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
2,989✔
4407
                } else {
2,989✔
4408
                        s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
×
4409
                }
×
4410
                return
2,989✔
4411
        }
4412

4413
        // If we are here we are single server mode.
4414
        if req.Config.Replicas > 1 {
1,006✔
4415
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4416
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4417
                return
×
4418
        }
×
4419

4420
        stream, err := acc.lookupStream(req.Stream)
1,006✔
4421
        if err != nil {
1,010✔
4422
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4423
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4424
                return
4✔
4425
        }
4✔
4426

4427
        if o := stream.lookupConsumer(consumerName); o != nil {
1,044✔
4428
                // If the consumer already exists then don't allow updating the PauseUntil, just set
42✔
4429
                // it back to whatever the current configured value is.
42✔
4430
                req.Config.PauseUntil = o.cfg.PauseUntil
42✔
4431
        }
42✔
4432

4433
        // Initialize/update asset version metadata.
4434
        setStaticConsumerMetadata(&req.Config)
1,002✔
4435

1,002✔
4436
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,002✔
4437

1,002✔
4438
        if err != nil {
1,052✔
4439
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
50✔
4440
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4441
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4442
                        err = errConsumerStoreFailed
×
4443
                }
×
4444
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
50✔
4445
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
50✔
4446
                return
50✔
4447
        }
4448
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
952✔
4449
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
952✔
4450

952✔
4451
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
956✔
4452
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4453
        }
4✔
4454
}
4455

4456
// Request for the list of all consumer names.
4457
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
26✔
4458
        if c == nil || !s.JetStreamEnabled() {
26✔
4459
                return
×
4460
        }
×
4461
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
26✔
4462
        if err != nil {
26✔
4463
                s.Warnf(badAPIRequestT, msg)
×
4464
                return
×
4465
        }
×
4466

4467
        var resp = JSApiConsumerNamesResponse{
26✔
4468
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
26✔
4469
                Consumers:   []string{},
26✔
4470
        }
26✔
4471

26✔
4472
        // Determine if we should proceed here when we are in clustered mode.
26✔
4473
        if s.JetStreamIsClustered() {
49✔
4474
                js, cc := s.getJetStreamCluster()
23✔
4475
                if js == nil || cc == nil {
23✔
4476
                        return
×
4477
                }
×
4478
                if js.isLeaderless() {
23✔
4479
                        resp.Error = NewJSClusterNotAvailError()
×
4480
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4481
                        return
×
4482
                }
×
4483
                // Make sure we are meta leader.
4484
                if !s.JetStreamIsLeader() {
39✔
4485
                        return
16✔
4486
                }
16✔
4487
        }
4488

4489
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
10✔
4490
                if doErr {
×
4491
                        resp.Error = NewJSNotEnabledForAccountError()
×
4492
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4493
                }
×
4494
                return
×
4495
        }
4496

4497
        var offset int
10✔
4498
        if isJSONObjectOrArray(msg) {
19✔
4499
                var req JSApiConsumersRequest
9✔
4500
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
9✔
4501
                        resp.Error = NewJSInvalidJSONError(err)
×
4502
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4503
                        return
×
4504
                }
×
4505
                offset = req.Offset
9✔
4506
        }
4507

4508
        streamName := streamNameFromSubject(subject)
10✔
4509
        var numConsumers int
10✔
4510

10✔
4511
        if s.JetStreamIsClustered() {
17✔
4512
                js, cc := s.getJetStreamCluster()
7✔
4513
                if js == nil || cc == nil {
7✔
4514
                        // TODO(dlc) - Debug or Warn?
×
4515
                        return
×
4516
                }
×
4517
                js.mu.RLock()
7✔
4518
                sas := cc.streams[acc.Name]
7✔
4519
                if sas == nil {
7✔
4520
                        js.mu.RUnlock()
×
4521
                        resp.Error = NewJSStreamNotFoundError()
×
4522
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4523
                        return
×
4524
                }
×
4525
                sa := sas[streamName]
7✔
4526
                if sa == nil || sa.err != nil {
7✔
4527
                        js.mu.RUnlock()
×
4528
                        resp.Error = NewJSStreamNotFoundError()
×
4529
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4530
                        return
×
4531
                }
×
4532
                for consumer := range sa.consumers {
14✔
4533
                        resp.Consumers = append(resp.Consumers, consumer)
7✔
4534
                }
7✔
4535
                if len(resp.Consumers) > 1 {
8✔
4536
                        slices.Sort(resp.Consumers)
1✔
4537
                }
1✔
4538
                numConsumers = len(resp.Consumers)
7✔
4539
                if offset > numConsumers {
7✔
4540
                        offset = numConsumers
×
4541
                }
×
4542
                resp.Consumers = resp.Consumers[offset:]
7✔
4543
                if len(resp.Consumers) > JSApiNamesLimit {
7✔
4544
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4545
                }
×
4546
                js.mu.RUnlock()
7✔
4547

4548
        } else {
3✔
4549
                mset, err := acc.lookupStream(streamName)
3✔
4550
                if err != nil {
3✔
4551
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4552
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4553
                        return
×
4554
                }
×
4555

4556
                obs := mset.getPublicConsumers()
3✔
4557
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
3✔
4558

4559
                numConsumers = len(obs)
3✔
4560
                if offset > numConsumers {
3✔
4561
                        offset = numConsumers
×
4562
                }
×
4563

4564
                for _, o := range obs[offset:] {
5✔
4565
                        resp.Consumers = append(resp.Consumers, o.String())
2✔
4566
                        if len(resp.Consumers) >= JSApiNamesLimit {
2✔
4567
                                break
×
4568
                        }
4569
                }
4570
        }
4571
        resp.Total = numConsumers
10✔
4572
        resp.Limit = JSApiNamesLimit
10✔
4573
        resp.Offset = offset
10✔
4574
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
10✔
4575
}
4576

4577
// Request for the list of all detailed consumer information.
4578
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
154✔
4579
        if c == nil || !s.JetStreamEnabled() {
188✔
4580
                return
34✔
4581
        }
34✔
4582

4583
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
120✔
4584
        if err != nil {
120✔
4585
                s.Warnf(badAPIRequestT, msg)
×
4586
                return
×
4587
        }
×
4588

4589
        var resp = JSApiConsumerListResponse{
120✔
4590
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
120✔
4591
                Consumers:   []*ConsumerInfo{},
120✔
4592
        }
120✔
4593

120✔
4594
        // Determine if we should proceed here when we are in clustered mode.
120✔
4595
        if s.JetStreamIsClustered() {
236✔
4596
                js, cc := s.getJetStreamCluster()
116✔
4597
                if js == nil || cc == nil {
116✔
4598
                        return
×
4599
                }
×
4600
                if js.isLeaderless() {
117✔
4601
                        resp.Error = NewJSClusterNotAvailError()
1✔
4602
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4603
                        return
1✔
4604
                }
1✔
4605
                // Make sure we are meta leader.
4606
                if !s.JetStreamIsLeader() {
218✔
4607
                        return
103✔
4608
                }
103✔
4609
        }
4610

4611
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
16✔
4612
                if doErr {
×
4613
                        resp.Error = NewJSNotEnabledForAccountError()
×
4614
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4615
                }
×
4616
                return
×
4617
        }
4618

4619
        var offset int
16✔
4620
        if isJSONObjectOrArray(msg) {
26✔
4621
                var req JSApiConsumersRequest
10✔
4622
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4623
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4624
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4625
                        return
1✔
4626
                }
1✔
4627
                offset = req.Offset
9✔
4628
        }
4629

4630
        streamName := streamNameFromSubject(subject)
15✔
4631

15✔
4632
        // Clustered mode will invoke a scatter and gather.
15✔
4633
        if s.JetStreamIsClustered() {
27✔
4634
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
12✔
4635
                msg = copyBytes(msg)
12✔
4636
                s.startGoRoutine(func() {
24✔
4637
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
12✔
4638
                })
12✔
4639
                return
12✔
4640
        }
4641

4642
        mset, err := acc.lookupStream(streamName)
3✔
4643
        if err != nil {
3✔
4644
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4645
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4646
                return
×
4647
        }
×
4648

4649
        obs := mset.getPublicConsumers()
3✔
4650
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
3✔
4651

4652
        ocnt := len(obs)
3✔
4653
        if offset > ocnt {
3✔
4654
                offset = ocnt
×
4655
        }
×
4656

4657
        for _, o := range obs[offset:] {
5✔
4658
                if cinfo := o.info(); cinfo != nil {
4✔
4659
                        resp.Consumers = append(resp.Consumers, cinfo)
2✔
4660
                }
2✔
4661
                if len(resp.Consumers) >= JSApiListLimit {
2✔
4662
                        break
×
4663
                }
4664
        }
4665
        resp.Total = ocnt
3✔
4666
        resp.Limit = JSApiListLimit
3✔
4667
        resp.Offset = offset
3✔
4668
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
3✔
4669
}
4670

4671
// Request for information about an consumer.
4672
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,957✔
4673
        if c == nil || !s.JetStreamEnabled() {
42,957✔
4674
                return
×
4675
        }
×
4676
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
42,957✔
4677
        if err != nil {
42,957✔
4678
                s.Warnf(badAPIRequestT, msg)
×
4679
                return
×
4680
        }
×
4681

4682
        streamName := streamNameFromSubject(subject)
42,957✔
4683
        consumerName := consumerNameFromSubject(subject)
42,957✔
4684

42,957✔
4685
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,957✔
4686

42,957✔
4687
        if !isEmptyRequest(msg) {
42,958✔
4688
                resp.Error = NewJSNotEmptyRequestError()
1✔
4689
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4690
                return
1✔
4691
        }
1✔
4692

4693
        // If we are in clustered mode we need to be the consumer leader to proceed.
4694
        if s.JetStreamIsClustered() {
84,516✔
4695
                // Check to make sure the consumer is assigned.
41,560✔
4696
                js, cc := s.getJetStreamCluster()
41,560✔
4697
                if js == nil || cc == nil {
41,560✔
4698
                        return
×
4699
                }
×
4700

4701
                js.mu.RLock()
41,560✔
4702
                meta := cc.meta
41,560✔
4703
                js.mu.RUnlock()
41,560✔
4704

41,560✔
4705
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
41,560✔
4706
                ourID := meta.ID()
41,560✔
4707
                groupLeaderless := meta.Leaderless()
41,560✔
4708
                groupCreated := meta.Created()
41,560✔
4709

41,560✔
4710
                js.mu.RLock()
41,560✔
4711
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
41,560✔
4712
                var rg *raftGroup
41,560✔
4713
                var offline, isMember bool
41,560✔
4714
                if ca != nil {
45,668✔
4715
                        if rg = ca.Group; rg != nil {
8,216✔
4716
                                offline = s.allPeersOffline(rg)
4,108✔
4717
                                isMember = rg.isMember(ourID)
4,108✔
4718
                        }
4,108✔
4719
                }
4720
                // Capture consumer leader here.
4721
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,560✔
4722
                // Also capture if we think there is no meta leader.
41,560✔
4723
                var isLeaderLess bool
41,560✔
4724
                if !isLeader {
69,470✔
4725
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
27,910✔
4726
                }
27,910✔
4727
                js.mu.RUnlock()
41,560✔
4728

41,560✔
4729
                if isLeader && ca == nil {
53,939✔
4730
                        // We can't find the consumer, so mimic what would be the errors below.
12,379✔
4731
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,379✔
4732
                                if doErr {
×
4733
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4734
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4735
                                }
×
4736
                                return
×
4737
                        }
4738
                        if sa == nil {
22,382✔
4739
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4740
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4741
                                return
10,003✔
4742
                        }
10,003✔
4743
                        // If we are here the consumer is not present.
4744
                        resp.Error = NewJSConsumerNotFoundError()
2,376✔
4745
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,376✔
4746
                        return
2,376✔
4747
                } else if ca == nil {
54,254✔
4748
                        if isLeaderLess {
25,075✔
4749
                                resp.Error = NewJSClusterNotAvailError()
2✔
4750
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4751
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4752
                        }
2✔
4753
                        return
25,073✔
4754
                } else if isLeader && offline {
4,109✔
4755
                        resp.Error = NewJSConsumerOfflineError()
1✔
4756
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
4757
                        return
1✔
4758
                }
1✔
4759

4760
                // Check to see if we are a member of the group and if the group has no leader.
4761
                if isMember && js.isGroupLeaderless(ca.Group) {
4,108✔
4762
                        resp.Error = NewJSClusterNotAvailError()
1✔
4763
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4764
                        return
1✔
4765
                }
1✔
4766

4767
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4768
                if !isConsumerLeader {
6,980✔
4769
                        if isLeaderLess {
2,874✔
4770
                                resp.Error = NewJSClusterNotAvailError()
×
4771
                                // Delaying an error response gives the leader a chance to respond before us
×
4772
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4773
                                return
×
4774
                        }
×
4775

4776
                        var node RaftNode
2,874✔
4777
                        var leaderNotPartOfGroup bool
2,874✔
4778

2,874✔
4779
                        // We have a consumer assignment.
2,874✔
4780
                        if isMember {
5,045✔
4781
                                js.mu.RLock()
2,171✔
4782
                                if rg != nil && rg.node != nil {
4,316✔
4783
                                        node = rg.node
2,145✔
4784
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,145✔
4785
                                                leaderNotPartOfGroup = true
×
4786
                                        }
×
4787
                                }
4788
                                js.mu.RUnlock()
2,171✔
4789
                        }
4790

4791
                        // Check if we should ignore all together.
4792
                        if node == nil {
3,603✔
4793
                                // We have been assigned but have not created a node yet. If we are a member return
729✔
4794
                                // our config and defaults for state and no cluster info.
729✔
4795
                                if isMember {
755✔
4796
                                        // Since we access consumerAssignment, need js lock.
26✔
4797
                                        js.mu.RLock()
26✔
4798
                                        resp.ConsumerInfo = &ConsumerInfo{
26✔
4799
                                                Stream:    ca.Stream,
26✔
4800
                                                Name:      ca.Name,
26✔
4801
                                                Created:   ca.Created,
26✔
4802
                                                Config:    setDynamicConsumerMetadata(ca.Config),
26✔
4803
                                                TimeStamp: time.Now().UTC(),
26✔
4804
                                        }
26✔
4805
                                        b := s.jsonResponse(resp)
26✔
4806
                                        js.mu.RUnlock()
26✔
4807
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
26✔
4808
                                }
26✔
4809
                                return
729✔
4810
                        }
4811
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
4812
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,278✔
4813
                                if leaderNotPartOfGroup {
2,133✔
4814
                                        resp.Error = NewJSConsumerOfflineError()
×
4815
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4816
                                }
×
4817
                                return
2,133✔
4818
                        }
4819
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
4820
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
4821
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
4822
                }
4823
        }
4824

4825
        if !acc.JetStreamEnabled() {
2,640✔
4826
                resp.Error = NewJSNotEnabledForAccountError()
×
4827
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4828
                return
×
4829
        }
×
4830

4831
        mset, err := acc.lookupStream(streamName)
2,640✔
4832
        if err != nil {
2,640✔
4833
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4834
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4835
                return
×
4836
        }
×
4837

4838
        obs := mset.lookupConsumer(consumerName)
2,640✔
4839
        if obs == nil {
2,806✔
4840
                resp.Error = NewJSConsumerNotFoundError()
166✔
4841
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
166✔
4842
                return
166✔
4843
        }
166✔
4844

4845
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
2,474✔
4846
                // This consumer returned nil which means it's closed. Respond with not found.
×
4847
                resp.Error = NewJSConsumerNotFoundError()
×
4848
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4849
                return
×
4850
        }
×
4851
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2,474✔
4852
}
4853

4854
// Request to delete an Consumer.
4855
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,366✔
4856
        if c == nil || !s.JetStreamEnabled() {
7,375✔
4857
                return
9✔
4858
        }
9✔
4859
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
7,357✔
4860
        if err != nil {
7,357✔
4861
                s.Warnf(badAPIRequestT, msg)
×
4862
                return
×
4863
        }
×
4864

4865
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,357✔
4866

7,357✔
4867
        // Determine if we should proceed here when we are in clustered mode.
7,357✔
4868
        if s.JetStreamIsClustered() {
14,292✔
4869
                js, cc := s.getJetStreamCluster()
6,935✔
4870
                if js == nil || cc == nil {
6,937✔
4871
                        return
2✔
4872
                }
2✔
4873
                if js.isLeaderless() {
6,934✔
4874
                        resp.Error = NewJSClusterNotAvailError()
1✔
4875
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4876
                        return
1✔
4877
                }
1✔
4878
                // Make sure we are meta leader.
4879
                if !s.JetStreamIsLeader() {
11,532✔
4880
                        return
4,600✔
4881
                }
4,600✔
4882
        }
4883

4884
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,823✔
4885
                if doErr {
135✔
4886
                        resp.Error = NewJSNotEnabledForAccountError()
66✔
4887
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
66✔
4888
                }
66✔
4889
                return
69✔
4890
        }
4891
        if !isEmptyRequest(msg) {
2,686✔
4892
                resp.Error = NewJSNotEmptyRequestError()
1✔
4893
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4894
                return
1✔
4895
        }
1✔
4896
        stream := streamNameFromSubject(subject)
2,684✔
4897
        consumer := consumerNameFromSubject(subject)
2,684✔
4898

2,684✔
4899
        if s.JetStreamIsClustered() {
5,016✔
4900
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,332✔
4901
                return
2,332✔
4902
        }
2,332✔
4903

4904
        mset, err := acc.lookupStream(stream)
352✔
4905
        if err != nil {
352✔
4906
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4907
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4908
                return
×
4909
        }
×
4910

4911
        obs := mset.lookupConsumer(consumer)
352✔
4912
        if obs == nil {
493✔
4913
                resp.Error = NewJSConsumerNotFoundError()
141✔
4914
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
141✔
4915
                return
141✔
4916
        }
141✔
4917
        if err := obs.delete(); err != nil {
211✔
4918
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
4919
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4920
                return
×
4921
        }
×
4922
        resp.Success = true
211✔
4923
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
211✔
4924
}
4925

4926
// Request to pause or unpause a Consumer.
4927
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
63✔
4928
        if c == nil || !s.JetStreamEnabled() {
63✔
4929
                return
×
4930
        }
×
4931
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
63✔
4932
        if err != nil {
63✔
4933
                s.Warnf(badAPIRequestT, msg)
×
4934
                return
×
4935
        }
×
4936

4937
        var req JSApiConsumerPauseRequest
63✔
4938
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
63✔
4939

63✔
4940
        if isJSONObjectOrArray(msg) {
118✔
4941
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
4942
                        resp.Error = NewJSInvalidJSONError(err)
×
4943
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4944
                        return
×
4945
                }
×
4946
        }
4947

4948
        // Determine if we should proceed here when we are in clustered mode.
4949
        isClustered := s.JetStreamIsClustered()
63✔
4950
        js, cc := s.getJetStreamCluster()
63✔
4951
        if isClustered {
117✔
4952
                if js == nil || cc == nil {
54✔
4953
                        return
×
4954
                }
×
4955
                if js.isLeaderless() {
54✔
4956
                        resp.Error = NewJSClusterNotAvailError()
×
4957
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4958
                        return
×
4959
                }
×
4960
                // Make sure we are meta leader.
4961
                if !s.JetStreamIsLeader() {
94✔
4962
                        return
40✔
4963
                }
40✔
4964
        }
4965

4966
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
4967
                if doErr {
×
4968
                        resp.Error = NewJSNotEnabledForAccountError()
×
4969
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4970
                }
×
4971
                return
×
4972
        }
4973

4974
        stream := streamNameFromSubject(subject)
23✔
4975
        consumer := consumerNameFromSubject(subject)
23✔
4976

23✔
4977
        if isClustered {
37✔
4978
                js.mu.RLock()
14✔
4979
                sa := js.streamAssignment(acc.Name, stream)
14✔
4980
                if sa == nil {
14✔
4981
                        js.mu.RUnlock()
×
4982
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4983
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4984
                        return
×
4985
                }
×
4986

4987
                ca, ok := sa.consumers[consumer]
14✔
4988
                if !ok || ca == nil {
14✔
4989
                        js.mu.RUnlock()
×
4990
                        resp.Error = NewJSConsumerNotFoundError()
×
4991
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4992
                        return
×
4993
                }
×
4994

4995
                nca := *ca
14✔
4996
                ncfg := *ca.Config
14✔
4997
                nca.Config = &ncfg
14✔
4998
                js.mu.RUnlock()
14✔
4999
                pauseUTC := req.PauseUntil.UTC()
14✔
5000
                if !pauseUTC.IsZero() {
24✔
5001
                        nca.Config.PauseUntil = &pauseUTC
10✔
5002
                } else {
14✔
5003
                        nca.Config.PauseUntil = nil
4✔
5004
                }
4✔
5005

5006
                // Update asset version metadata due to updating pause/resume.
5007
                // Only PauseUntil is updated above, so reuse config for both.
5008
                setStaticConsumerMetadata(nca.Config)
14✔
5009

14✔
5010
                eca := encodeAddConsumerAssignment(&nca)
14✔
5011
                cc.meta.Propose(eca)
14✔
5012

14✔
5013
                resp.PauseUntil = pauseUTC
14✔
5014
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5015
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5016
                }
10✔
5017
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5018
                return
14✔
5019
        }
5020

5021
        mset, err := acc.lookupStream(stream)
9✔
5022
        if err != nil {
9✔
5023
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5024
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5025
                return
×
5026
        }
×
5027

5028
        obs := mset.lookupConsumer(consumer)
9✔
5029
        if obs == nil {
9✔
5030
                resp.Error = NewJSConsumerNotFoundError()
×
5031
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5032
                return
×
5033
        }
×
5034

5035
        ncfg := obs.cfg
9✔
5036
        pauseUTC := req.PauseUntil.UTC()
9✔
5037
        if !pauseUTC.IsZero() {
14✔
5038
                ncfg.PauseUntil = &pauseUTC
5✔
5039
        } else {
9✔
5040
                ncfg.PauseUntil = nil
4✔
5041
        }
4✔
5042

5043
        // Update asset version metadata due to updating pause/resume.
5044
        setStaticConsumerMetadata(&ncfg)
9✔
5045

9✔
5046
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5047
                // The only type of error that should be returned here is from o.store,
×
5048
                // so use a store failed error type.
×
5049
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5050
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5051
                return
×
5052
        }
×
5053

5054
        resp.PauseUntil = pauseUTC
9✔
5055
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5056
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5057
        }
5✔
5058
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5059
}
5060

5061
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5062
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
46,180✔
5063
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
46,180✔
5064
                TypedEvent: TypedEvent{
46,180✔
5065
                        Type: JSAPIAuditType,
46,180✔
5066
                        ID:   nuid.Next(),
46,180✔
5067
                        Time: time.Now().UTC(),
46,180✔
5068
                },
46,180✔
5069
                Server:   s.Name(),
46,180✔
5070
                Client:   ci.forAdvisory(),
46,180✔
5071
                Subject:  subject,
46,180✔
5072
                Request:  request,
46,180✔
5073
                Response: response,
46,180✔
5074
                Domain:   s.getOpts().JetStreamDomain,
46,180✔
5075
        })
46,180✔
5076
}
46,180✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc