• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 18582693047

15 Oct 2025 08:11PM UTC coverage: 86.048% (+0.001%) from 86.047%
18582693047

push

github

web-flow
(2.12) [FIXED] Atomic batch: check correct header for unsupported error (#7436)

74683 of 86792 relevant lines covered (86.05%)

357330.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.15
/server/jetstream_api.go
1
// Copyright 2020-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31
        "unicode"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiTemplateCreate is the endpoint to create new stream templates.
51
        // Will return JSON response.
52
        // Deprecated: stream templates are deprecated and will be removed in a future version.
53
        JSApiTemplateCreate  = "$JS.API.STREAM.TEMPLATE.CREATE.*"
54
        JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"
55

56
        // JSApiTemplates is the endpoint to list all stream template names for this account.
57
        // Will return JSON response.
58
        // Deprecated: stream templates are deprecated and will be removed in a future version.
59
        JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES"
60

61
        // JSApiTemplateInfo is for obtaining general information about a named stream template.
62
        // Will return JSON response.
63
        // Deprecated: stream templates are deprecated and will be removed in a future version.
64
        JSApiTemplateInfo  = "$JS.API.STREAM.TEMPLATE.INFO.*"
65
        JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"
66

67
        // JSApiTemplateDelete is the endpoint to delete stream templates.
68
        // Will return JSON response.
69
        // Deprecated: stream templates are deprecated and will be removed in a future version.
70
        JSApiTemplateDelete  = "$JS.API.STREAM.TEMPLATE.DELETE.*"
71
        JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"
72

73
        // JSApiStreamCreate is the endpoint to create new streams.
74
        // Will return JSON response.
75
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
76
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
77

78
        // JSApiStreamUpdate is the endpoint to update existing streams.
79
        // Will return JSON response.
80
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
81
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
82

83
        // JSApiStreams is the endpoint to list all stream names for this account.
84
        // Will return JSON response.
85
        JSApiStreams = "$JS.API.STREAM.NAMES"
86
        // JSApiStreamList is the endpoint that will return all detailed stream information
87
        JSApiStreamList = "$JS.API.STREAM.LIST"
88

89
        // JSApiStreamInfo is for obtaining general information about a named stream.
90
        // Will return JSON response.
91
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
92
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
93

94
        // JSApiStreamDelete is the endpoint to delete streams.
95
        // Will return JSON response.
96
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
97
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
98

99
        // JSApiStreamPurge is the endpoint to purge streams.
100
        // Will return JSON response.
101
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
102
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
103

104
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
105
        // Will return a stream of chunks with a nil chunk as EOF to
106
        // the deliver subject. Caller should respond to each chunk
107
        // with a nil body response for ack flow.
108
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
109
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
110

111
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
112
        // Caller should respond to each chunk with a nil body response.
113
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
114
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
115

116
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
117
        // Will return JSON response.
118
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
119
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
120

121
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
122
        // Will return JSON response.
123
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
124
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
125

126
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
127
        // Will return the message similar to how a consumer receives the message, no JSON processing.
128
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
129
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
130
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
131

132
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
133
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
134
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
135
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
136

137
        // jsDirectGetPre
138
        jsDirectGetPre = "$JS.API.DIRECT.GET"
139

140
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
141
        // This was also the legacy endpoint for ephemeral consumers.
142
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
143
        // Will return JSON response.
144
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
145
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
146
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
147
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
148

149
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
150
        // You need to include the stream and consumer name in the subject.
151
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
152
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
153

154
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
155
        // Will return JSON response.
156
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
157
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
158

159
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
160
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
161
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
162

163
        // JSApiConsumerInfo is for obtaining general information about a consumer.
164
        // Will return JSON response.
165
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
166
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
167

168
        // JSApiConsumerDelete is the endpoint to delete consumers.
169
        // Will return JSON response.
170
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
171
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
172

173
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
174
        // Will return JSON response.
175
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
176
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
177

178
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
179
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
180

181
        // JSApiConsumerUnpin is the endpoint for unpinning the subscription of a given consumer.
182
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
183
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
184

185
        // jsRequestNextPre
186
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
187

188
        // For snapshots and restores. The ack will have additional tokens.
189
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
190
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
191

192
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
193
        // Will return JSON response.
194
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
195
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
196

197
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
198
        // Will return JSON response.
199
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
200
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
201

202
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
203
        // Will return JSON response.
204
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
205
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
206

207
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
208
        // Only works from system account.
209
        // Will return JSON response.
210
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
211

212
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
213
        // Only works from system account.
214
        // Will return JSON response.
215
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
216

217
        // JSApiAccountPurge is the endpoint to purge the js content of an account
218
        // Only works from system account.
219
        // Will return JSON response.
220
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
221
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
222

223
        // JSApiServerStreamMove is the endpoint to move streams off a server
224
        // Only works from system account.
225
        // Will return JSON response.
226
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
227
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
228

229
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
230
        // Only works from system account.
231
        // Will return JSON response.
232
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
233
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
234

235
        // The prefix for system level account API.
236
        jsAPIAccountPre = "$JS.API.ACCOUNT."
237

238
        // jsAckT is the template for the ack message stream coming back from a consumer
239
        // when they ACK/NAK, etc a message.
240
        jsAckT      = "$JS.ACK.%s.%s"
241
        jsAckPre    = "$JS.ACK."
242
        jsAckPreLen = len(jsAckPre)
243

244
        // jsFlowControlPre is the prefix for flow control subjects.
245
        jsFlowControlPre = "$JS.FC."
246
        // jsFlowControl is for FC responses.
247
        jsFlowControl = "$JS.FC.%s.%s.*"
248

249
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
250
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
251

252
        // JSMetricPrefix is a prefix for all JetStream metrics.
253
        JSMetricPrefix = "$JS.EVENT.METRIC"
254

255
        // JSMetricConsumerAckPre is a metric containing ack latency.
256
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
257

258
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
259
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
260

261
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been NAK'd.
262
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
263

264
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
265
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
266

267
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
268
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
269

270
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
271
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
272

273
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
274
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
275

276
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
277
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
278

279
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
280
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
281

282
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
283
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
284

285
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
286
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
287

288
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
289
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
290

291
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
292
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
293

294
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
295
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
296

297
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
298
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
299

300
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
301
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
302

303
        // JSAdvisoryDomainLeaderElected notification that a jetstream domain has elected a leader.
304
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
305

306
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
307
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
308

309
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
310
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
311

312
        // JSAdvisoryStreamBatchAbandonedPre notification that a stream's batch was abandoned.
313
        JSAdvisoryStreamBatchAbandonedPre = "$JS.EVENT.ADVISORY.STREAM.BATCH_ABANDONED"
314

315
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
316
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
317

318
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
319
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
320

321
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
322
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
323

324
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
325
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
326

327
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
328
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
329

330
        // JSAuditAdvisory is a notification about JetStream API access.
331
        // FIXME - Add in details about who..
332
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
333
)
334

335
// Headers used in $JS.API.> requests.
336
const (
337
        // JSRequiredApiLevel requires the API level of the responding server to have the specified minimum value.
338
        JSRequiredApiLevel = "Nats-Required-Api-Level"
339
)
340

341
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
342
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
343

344
func generateJSMappingTable(domain string) map[string]string {
765✔
345
        mappings := map[string]string{}
765✔
346
        // This set of mappings is very very very ugly.
765✔
347
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
765✔
348
        // For optics $KV and $OBJ where made to be independent subject spaces.
765✔
349
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
765✔
350
        // This is very unfortunate!!!
765✔
351
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
765✔
352
        // Especially since the actual API for say KV, does use stream create from JS.
765✔
353
        // To avoid overlaps KV and OBJ views append the prefix to their API.
765✔
354
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
765✔
355
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
765✔
356
        for srcMappingSuffix, to := range map[string]string{
765✔
357
                "INFO":       JSApiAccountInfo,
765✔
358
                "STREAM.>":   "$JS.API.STREAM.>",
765✔
359
                "CONSUMER.>": "$JS.API.CONSUMER.>",
765✔
360
                "DIRECT.>":   "$JS.API.DIRECT.>",
765✔
361
                "META.>":     "$JS.API.META.>",
765✔
362
                "SERVER.>":   "$JS.API.SERVER.>",
765✔
363
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
765✔
364
                "$KV.>":      "$KV.>",
765✔
365
                "$OBJ.>":     "$OBJ.>",
765✔
366
        } {
7,650✔
367
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
6,885✔
368
        }
6,885✔
369
        return mappings
765✔
370
}
371

372
// JSMaxDescriptionLen is the maximum description length for streams and consumers.
373
const JSMaxDescriptionLen = 4 * 1024
374

375
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
376
// It's calculated by summing length of all keys and values.
377
const JSMaxMetadataLen = 128 * 1024
378

379
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
380
// Picked 255 as it seems to be a widely used file name limit
381
const JSMaxNameLen = 255
382

383
// JSDefaultRequestQueueLimit is the default number of entries that we will
384
// put on the global request queue before we react.
385
const JSDefaultRequestQueueLimit = 10_000
386

387
// Responses for API calls.
388

389
// ApiResponse is a standard response from the JetStream JSON API
390
type ApiResponse struct {
391
        Type  string    `json:"type"`
392
        Error *ApiError `json:"error,omitempty"`
393
}
394

395
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
396

397
// When passing back to the clients generalize store failures.
398
var (
399
        errStreamStoreFailed   = errors.New("error creating store for stream")
400
        errConsumerStoreFailed = errors.New("error creating store for consumer")
401
)
402

403
// ToError checks if the response has a error and if it does converts it to an error avoiding
404
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
405
func (r *ApiResponse) ToError() error {
2,930✔
406
        if r.Error == nil {
4,704✔
407
                return nil
1,774✔
408
        }
1,774✔
409

410
        return r.Error
1,156✔
411
}
412

413
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
414

415
// ApiPaged includes variables used to create paged responses from the JSON API
416
type ApiPaged struct {
417
        Total  int `json:"total"`
418
        Offset int `json:"offset"`
419
        Limit  int `json:"limit"`
420
}
421

422
// ApiPagedRequest includes parameters allowing specific pages to be requested from APIs responding with ApiPaged
423
type ApiPagedRequest struct {
424
        Offset int `json:"offset"`
425
}
426

427
// JSApiAccountInfoResponse reports back information on jetstream for this account.
428
type JSApiAccountInfoResponse struct {
429
        ApiResponse
430
        *JetStreamAccountStats
431
}
432

433
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
434

435
// JSApiStreamCreateResponse stream creation.
436
type JSApiStreamCreateResponse struct {
437
        ApiResponse
438
        *StreamInfo
439
        DidCreate bool `json:"did_create,omitempty"`
440
}
441

442
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
443

444
// JSApiStreamDeleteResponse stream removal.
445
type JSApiStreamDeleteResponse struct {
446
        ApiResponse
447
        Success bool `json:"success,omitempty"`
448
}
449

450
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
451

452
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
453
const JSMaxSubjectDetails = 100_000
454

455
// JSApiStreamInfoRequest is the request payload for a stream info request.
// It embeds ApiPagedRequest (page offset) and adds two options, both
// omitted from the JSON when unset: "deleted_details" and "subjects_filter".
// NOTE(review): presumably the offset pages through the per-subject details
// selected by SubjectsFilter — confirm against the request handler.
type JSApiStreamInfoRequest struct {
456
        ApiPagedRequest
457
        DeletedDetails bool   `json:"deleted_details,omitempty"`
458
        SubjectsFilter string `json:"subjects_filter,omitempty"`
459
}
460

461
// JSApiStreamInfoResponse is the response to a stream info request. It
// combines the standard ApiResponse envelope, the ApiPaged paging counters
// and the embedded *StreamInfo.
type JSApiStreamInfoResponse struct {
462
        ApiResponse
463
        ApiPaged
464
        *StreamInfo
465
}
466

467
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
468

469
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
470
// TODO(dlc) - with header or request support could request chunked response.
471
const JSApiNamesLimit = 1024
472
const JSApiListLimit = 256
473

474
// JSApiStreamNamesRequest is the request payload for listing stream names.
// It embeds ApiPagedRequest (page offset) and has a Subject filter that is
// omitted from the JSON when empty.
type JSApiStreamNamesRequest struct {
475
        ApiPagedRequest
476
        // These are filters that can be applied to the list.
477
        Subject string `json:"subject,omitempty"`
478
}
479

480
// JSApiStreamNamesResponse list of streams.
481
// A nil request is valid and means all streams.
482
type JSApiStreamNamesResponse struct {
483
        ApiResponse
484
        ApiPaged
485
        Streams []string `json:"streams"`
486
}
487

488
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
489

490
// JSApiStreamListRequest is the request payload for listing detailed stream
// information. It embeds ApiPagedRequest (page offset) and has a Subject
// filter that is omitted from the JSON when empty.
type JSApiStreamListRequest struct {
491
        ApiPagedRequest
492
        // These are filters that can be applied to the list.
493
        Subject string `json:"subject,omitempty"`
494
}
495

496
// JSApiStreamListResponse list of detailed stream information.
497
// A nil request is valid and means all streams.
498
type JSApiStreamListResponse struct {
499
        ApiResponse
500
        ApiPaged
501
        Streams []*StreamInfo     `json:"streams"`
502
        Missing []string          `json:"missing,omitempty"`
503
        Offline map[string]string `json:"offline,omitempty"`
504
}
505

506
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
507

508
// JSApiStreamPurgeRequest is optional request information to the purge API.
509
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
510
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
511
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
512
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
513
type JSApiStreamPurgeRequest struct {
514
        // Purge up to but not including sequence.
515
        Sequence uint64 `json:"seq,omitempty"`
516
        // Subject to match against messages for the purge command.
517
        Subject string `json:"filter,omitempty"`
518
        // Number of messages to keep.
519
        Keep uint64 `json:"keep,omitempty"`
520
}
521

522
// JSApiStreamPurgeResponse is the response to a stream purge request.
// Purged has no omitempty tag, so "purged" is always present in the JSON.
// NOTE(review): presumably Purged is the number of messages removed — confirm.
type JSApiStreamPurgeResponse struct {
523
        ApiResponse
524
        Success bool   `json:"success,omitempty"`
525
        Purged  uint64 `json:"purged"`
526
}
527

528
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
529

530
// JSApiConsumerUnpinRequest is the request payload for a consumer unpin
// request. Group is serialized as "group".
// NOTE(review): presumably Group names the priority group to unpin — confirm.
type JSApiConsumerUnpinRequest struct {
531
        Group string `json:"group"`
532
}
533

534
// JSApiConsumerUnpinResponse is the response to a consumer unpin request.
// It carries only the standard ApiResponse envelope.
type JSApiConsumerUnpinResponse struct {
535
        ApiResponse
536
}
537

538
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
539

540
// JSApiStreamUpdateResponse for updating a stream.
541
type JSApiStreamUpdateResponse struct {
542
        ApiResponse
543
        *StreamInfo
544
}
545

546
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
547

548
// JSApiMsgDeleteRequest delete message request.
549
type JSApiMsgDeleteRequest struct {
550
        Seq     uint64 `json:"seq"`
551
        NoErase bool   `json:"no_erase,omitempty"`
552
}
553

554
// JSApiMsgDeleteResponse is the response to a message delete request:
// the standard ApiResponse envelope plus a Success flag.
type JSApiMsgDeleteResponse struct {
555
        ApiResponse
556
        Success bool `json:"success,omitempty"`
557
}
558

559
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
560

561
// JSApiStreamSnapshotRequest is the request payload for a stream snapshot
// request. DeliverSubject is always serialized; the remaining options are
// omitted from the JSON when unset.
type JSApiStreamSnapshotRequest struct {
562
        // Subject to deliver the chunks to for the snapshot.
563
        DeliverSubject string `json:"deliver_subject"`
564
        // Do not include consumers in the snapshot.
565
        NoConsumers bool `json:"no_consumers,omitempty"`
566
        // Optional chunk size preference.
567
        // Best to just let server select.
568
        ChunkSize int `json:"chunk_size,omitempty"`
569
        // Check all message's checksums prior to snapshot.
570
        CheckMsgs bool `json:"jsck,omitempty"`
571
}
572

573
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
574
type JSApiStreamSnapshotResponse struct {
575
        ApiResponse
576
        // Configuration of the given stream.
577
        Config *StreamConfig `json:"config,omitempty"`
578
        // Current State for the given stream.
579
        State *StreamState `json:"state,omitempty"`
580
}
581

582
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
583

584
// JSApiStreamRestoreRequest is the required restore request.
585
type JSApiStreamRestoreRequest struct {
586
        // Configuration of the given stream.
587
        Config StreamConfig `json:"config"`
588
        // Current State for the given stream.
589
        State StreamState `json:"state"`
590
}
591

592
// JSApiStreamRestoreResponse is the direct response to the restore request.
593
type JSApiStreamRestoreResponse struct {
594
        ApiResponse
595
        // Subject to deliver the chunks to for the snapshot restore.
596
        DeliverSubject string `json:"deliver_subject"`
597
}
598

599
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
600

601
// JSApiStreamRemovePeerRequest is the required remove peer request.
602
type JSApiStreamRemovePeerRequest struct {
603
        // Server name of the peer to be removed.
604
        Peer string `json:"peer"`
605
}
606

607
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
608
type JSApiStreamRemovePeerResponse struct {
609
        ApiResponse
610
        Success bool `json:"success,omitempty"`
611
}
612

613
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
614

615
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
616
type JSApiStreamLeaderStepDownResponse struct {
617
        ApiResponse
618
        Success bool `json:"success,omitempty"`
619
}
620

621
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
622

623
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
624
type JSApiConsumerLeaderStepDownResponse struct {
625
        ApiResponse
626
        Success bool `json:"success,omitempty"`
627
}
628

629
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
630

631
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
632
type JSApiLeaderStepdownRequest struct {
633
        Placement *Placement `json:"placement,omitempty"`
634
}
635

636
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
637
type JSApiLeaderStepDownResponse struct {
638
        ApiResponse
639
        Success bool `json:"success,omitempty"`
640
}
641

642
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
643

644
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
645
type JSApiMetaServerRemoveRequest struct {
646
        // Server name of the peer to be removed.
647
        Server string `json:"peer"`
648
        // Peer ID of the peer to be removed. If specified this is used
649
        // instead of the server name.
650
        Peer string `json:"peer_id,omitempty"`
651
}
652

653
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
654
type JSApiMetaServerRemoveResponse struct {
655
        ApiResponse
656
        Success bool `json:"success,omitempty"`
657
}
658

659
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
660

661
// JSApiMetaServerStreamMoveRequest asks for a stream on one server to be
// moved to another. The response to this request comes back as a
// JSApiStreamUpdateResponse / JSApiStreamUpdateResponseType.
type JSApiMetaServerStreamMoveRequest struct {
	// Server is the server name of the peer to be evacuated.
	Server string `json:"server,omitempty"`
	// Cluster is the cluster the server is in.
	Cluster string `json:"cluster,omitempty"`
	// Domain is the domain the server is in.
	Domain string `json:"domain,omitempty"`
	// Tags are ephemeral placement tags for the move.
	Tags []string `json:"tags,omitempty"`
}
673

674
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
675

676
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
677
type JSApiAccountPurgeResponse struct {
678
        ApiResponse
679
        Initiated bool `json:"initiated,omitempty"`
680
}
681

682
// JSApiMsgGetRequest is the payload of a request to get one or more
// messages. Every field is optional and omitted from the JSON encoding when
// it holds its zero value.
type JSApiMsgGetRequest struct {
	// Seq is the sequence to get (JSON key "seq").
	Seq uint64 `json:"seq,omitempty"`
	// LastFor is encoded under the JSON key "last_by_subj".
	LastFor string `json:"last_by_subj,omitempty"`
	// NextFor is encoded under the JSON key "next_by_subj".
	NextFor string `json:"next_by_subj,omitempty"`

	// Batch support. Used to request more than one msg at a time.
	// Can be used with a simple starting seq, but also NextFor with wildcards.
	Batch int `json:"batch,omitempty"`
	// MaxBytes makes sure we limit how much data we blast out. If not set we
	// will inherit the slow consumer default max setting of the server.
	// Default is MAX_PENDING_SIZE.
	MaxBytes int `json:"max_bytes,omitempty"`
	// StartTime returns messages as of this start time.
	StartTime *time.Time `json:"start_time,omitempty"`

	// MultiLastFor is the multiple response support. Will get the last msgs
	// matching the subjects. These can include wildcards.
	MultiLastFor []string `json:"multi_last,omitempty"`
	// UpToSeq only returns messages up to this sequence. If not set, will be
	// the last sequence for the stream.
	UpToSeq uint64 `json:"up_to_seq,omitempty"`
	// UpToTime only returns messages up to this time.
	UpToTime *time.Time `json:"up_to_time,omitempty"`
	// NoHeaders only returns the message payload, excluding headers if present.
	NoHeaders bool `json:"no_hdr,omitempty"`
}
706

707
type JSApiMsgGetResponse struct {
708
        ApiResponse
709
        Message *StoredMsg `json:"message,omitempty"`
710
}
711

712
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
713

714
// JSWaitQueueDefaultMax is the default maximum number of outstanding
// requests allowed for pull consumers.
const JSWaitQueueDefaultMax = 512
716

717
type JSApiConsumerCreateResponse struct {
718
        ApiResponse
719
        *ConsumerInfo
720
}
721

722
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
723

724
type JSApiConsumerDeleteResponse struct {
725
        ApiResponse
726
        Success bool `json:"success,omitempty"`
727
}
728

729
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
730

731
// JSApiConsumerPauseRequest is the payload of a consumer pause request.
type JSApiConsumerPauseRequest struct {
	// PauseUntil is the time until which the consumer should be paused.
	// NOTE(review): encoding/json's omitempty does not omit struct values, so
	// a zero time.Time is still encoded here.
	PauseUntil time.Time `json:"pause_until,omitempty"`
}
734

735
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
736

737
type JSApiConsumerPauseResponse struct {
738
        ApiResponse
739
        Paused         bool          `json:"paused"`
740
        PauseUntil     time.Time     `json:"pause_until"`
741
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
742
}
743

744
type JSApiConsumerInfoResponse struct {
745
        ApiResponse
746
        *ConsumerInfo
747
}
748

749
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
750

751
type JSApiConsumersRequest struct {
752
        ApiPagedRequest
753
}
754

755
type JSApiConsumerNamesResponse struct {
756
        ApiResponse
757
        ApiPaged
758
        Consumers []string `json:"consumers"`
759
}
760

761
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
762

763
type JSApiConsumerListResponse struct {
764
        ApiResponse
765
        ApiPaged
766
        Consumers []*ConsumerInfo   `json:"consumers"`
767
        Missing   []string          `json:"missing,omitempty"`
768
        Offline   map[string]string `json:"offline,omitempty"`
769
}
770

771
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
772

773
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
774
type JSApiConsumerGetNextRequest struct {
775
        Expires   time.Duration `json:"expires,omitempty"`
776
        Batch     int           `json:"batch,omitempty"`
777
        MaxBytes  int           `json:"max_bytes,omitempty"`
778
        NoWait    bool          `json:"no_wait,omitempty"`
779
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
780
        PriorityGroup
781
}
782

783
// JSApiStreamTemplateCreateResponse for creating templates.
784
// Deprecated: stream templates are deprecated and will be removed in a future version.
785
type JSApiStreamTemplateCreateResponse struct {
786
        ApiResponse
787
        *StreamTemplateInfo
788
}
789

790
// Deprecated: stream templates are deprecated and will be removed in a future version.
791
const JSApiStreamTemplateCreateResponseType = "io.nats.jetstream.api.v1.stream_template_create_response"
792

793
// Deprecated: stream templates are deprecated and will be removed in a future version.
794
type JSApiStreamTemplateDeleteResponse struct {
795
        ApiResponse
796
        Success bool `json:"success,omitempty"`
797
}
798

799
// Deprecated: stream templates are deprecated and will be removed in a future version.
800
const JSApiStreamTemplateDeleteResponseType = "io.nats.jetstream.api.v1.stream_template_delete_response"
801

802
// JSApiStreamTemplateInfoResponse for information about stream templates.
803
// Deprecated: stream templates are deprecated and will be removed in a future version.
804
type JSApiStreamTemplateInfoResponse struct {
805
        ApiResponse
806
        *StreamTemplateInfo
807
}
808

809
// Deprecated: stream templates are deprecated and will be removed in a future version.
810
const JSApiStreamTemplateInfoResponseType = "io.nats.jetstream.api.v1.stream_template_info_response"
811

812
// Deprecated: stream templates are deprecated and will be removed in a future version.
813
type JSApiStreamTemplatesRequest struct {
814
        ApiPagedRequest
815
}
816

817
// JSApiStreamTemplateNamesResponse list of templates
818
// Deprecated: stream templates are deprecated and will be removed in a future version.
819
type JSApiStreamTemplateNamesResponse struct {
820
        ApiResponse
821
        ApiPaged
822
        Templates []string `json:"streams"`
823
}
824

825
// Deprecated: stream templates are deprecated and will be removed in a future version.
826
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
827

828
// Structure that holds state for a JetStream API request that is processed
829
// in a separate long-lived go routine. This is to avoid blocking connections.
830
type jsAPIRoutedReq struct {
831
        jsub    *subscription
832
        sub     *subscription
833
        acc     *Account
834
        subject string
835
        reply   string
836
        msg     []byte
837
        pa      pubArg
838
}
839

840
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
133,611✔
841
        // Ignore system level directives meta stepdown and peer remove requests here.
133,611✔
842
        if subject == JSApiLeaderStepDown ||
133,611✔
843
                subject == JSApiRemoveServer ||
133,611✔
844
                strings.HasPrefix(subject, jsAPIAccountPre) {
134,078✔
845
                return
467✔
846
        }
467✔
847
        // No lock needed, those are immutable.
848
        s, rr := js.srv, js.apiSubs.Match(subject)
133,144✔
849

133,144✔
850
        hdr, msg := c.msgParts(rmsg)
133,144✔
851
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
133,151✔
852
                // Check if this is the system account. We will let these through for the account info only.
7✔
853
                sacc := s.SystemAccount()
7✔
854
                if sacc != acc {
7✔
855
                        return
×
856
                }
×
857
                if subject != JSApiAccountInfo {
11✔
858
                        // Only respond from the initial server entry to the NATS system.
4✔
859
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
860
                                var resp = ApiResponse{
2✔
861
                                        Type:  JSApiSystemResponseType,
2✔
862
                                        Error: NewJSNotEnabledForAccountError(),
2✔
863
                                }
2✔
864
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
865
                        }
2✔
866
                        return
4✔
867
                }
868
        }
869

870
        // Short circuit for no interest.
871
        if len(rr.psubs)+len(rr.qsubs) == 0 {
153,124✔
872
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
19,984✔
873
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
874
                        var resp = ApiResponse{
×
875
                                Type:  JSApiSystemResponseType,
×
876
                                Error: NewJSBadRequestError(),
×
877
                        }
×
878
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
879
                }
×
880
                return
19,984✔
881
        }
882

883
        // We should only have psubs and only 1 per result.
884
        if len(rr.psubs) != 1 {
113,156✔
885
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
886
                if c.kind == CLIENT || c.kind == LEAF {
×
887
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
888
                        var resp = ApiResponse{
×
889
                                Type:  JSApiSystemResponseType,
×
890
                                Error: NewJSBadRequestError(),
×
891
                        }
×
892
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
893
                }
×
894
                return
×
895
        }
896
        jsub := rr.psubs[0]
113,156✔
897

113,156✔
898
        // We need to make sure not to block. We will send the request to a long-lived
113,156✔
899
        // pool of go routines.
113,156✔
900

113,156✔
901
        // Increment inflight. Do this before queueing.
113,156✔
902
        atomic.AddInt64(&js.apiInflight, 1)
113,156✔
903

113,156✔
904
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
113,156✔
905
        // header from the msg body. No other references are needed.
113,156✔
906
        // Check pending and warn if getting backed up.
113,156✔
907
        pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
113,156✔
908
        limit := atomic.LoadInt64(&js.queueLimit)
113,156✔
909
        if pending >= int(limit) {
113,429✔
910
                s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
273✔
911
                drained := int64(s.jsAPIRoutedReqs.drain())
273✔
912
                atomic.AddInt64(&js.apiInflight, -drained)
273✔
913

273✔
914
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
273✔
915
                        TypedEvent: TypedEvent{
273✔
916
                                Type: JSAPILimitReachedAdvisoryType,
273✔
917
                                ID:   nuid.Next(),
273✔
918
                                Time: time.Now().UTC(),
273✔
919
                        },
273✔
920
                        Server:  s.Name(),
273✔
921
                        Domain:  js.config.Domain,
273✔
922
                        Dropped: drained,
273✔
923
                })
273✔
924
        }
273✔
925
}
926

927
func (s *Server) processJSAPIRoutedRequests() {
16,292✔
928
        defer s.grWG.Done()
16,292✔
929

16,292✔
930
        s.mu.RLock()
16,292✔
931
        queue := s.jsAPIRoutedReqs
16,292✔
932
        client := &client{srv: s, kind: JETSTREAM}
16,292✔
933
        s.mu.RUnlock()
16,292✔
934

16,292✔
935
        js := s.getJetStream()
16,292✔
936

16,292✔
937
        for {
108,467✔
938
                select {
92,175✔
939
                case <-queue.ch:
75,883✔
940
                        // Only pop one item at a time here, otherwise if the system is recovering
75,883✔
941
                        // from queue buildup, then one worker will pull off all the tasks and the
75,883✔
942
                        // others will be starved of work.
75,883✔
943
                        for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
188,767✔
944
                                client.pa = r.pa
112,884✔
945
                                start := time.Now()
112,884✔
946
                                r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
112,884✔
947
                                if dur := time.Since(start); dur >= readLoopReportThreshold {
112,891✔
948
                                        s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
7✔
949
                                }
7✔
950
                                atomic.AddInt64(&js.apiInflight, -1)
112,884✔
951
                        }
952
                case <-s.quitCh:
16,284✔
953
                        return
16,284✔
954
                }
955
        }
956
}
957

958
func (s *Server) setJetStreamExportSubs() error {
4,073✔
959
        js := s.getJetStream()
4,073✔
960
        if js == nil {
4,073✔
961
                return NewJSNotEnabledError()
×
962
        }
×
963

964
        // Start the go routine that will process API requests received by the
965
        // subscription below when they are coming from routes, etc..
966
        const maxProcs = 16
4,073✔
967
        mp := runtime.GOMAXPROCS(0)
4,073✔
968
        // Cap at 16 max for now on larger core setups.
4,073✔
969
        if mp > maxProcs {
4,073✔
970
                mp = maxProcs
×
971
        }
×
972
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
4,073✔
973
        for i := 0; i < mp; i++ {
20,365✔
974
                s.startGoRoutine(s.processJSAPIRoutedRequests)
16,292✔
975
        }
16,292✔
976

977
        // This is the catch all now for all JetStream API calls.
978
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
4,073✔
979
                return err
×
980
        }
×
981

982
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
4,073✔
983
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
984
                return err
×
985
        }
×
986

987
        // API handles themselves.
988
        pairs := []struct {
4,073✔
989
                subject string
4,073✔
990
                handler msgHandler
4,073✔
991
        }{
4,073✔
992
                {JSApiAccountInfo, s.jsAccountInfoRequest},
4,073✔
993
                {JSApiTemplateCreate, s.jsTemplateCreateRequest},
4,073✔
994
                {JSApiTemplates, s.jsTemplateNamesRequest},
4,073✔
995
                {JSApiTemplateInfo, s.jsTemplateInfoRequest},
4,073✔
996
                {JSApiTemplateDelete, s.jsTemplateDeleteRequest},
4,073✔
997
                {JSApiStreamCreate, s.jsStreamCreateRequest},
4,073✔
998
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
4,073✔
999
                {JSApiStreams, s.jsStreamNamesRequest},
4,073✔
1000
                {JSApiStreamList, s.jsStreamListRequest},
4,073✔
1001
                {JSApiStreamInfo, s.jsStreamInfoRequest},
4,073✔
1002
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
4,073✔
1003
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
4,073✔
1004
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
4,073✔
1005
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
4,073✔
1006
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
4,073✔
1007
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
4,073✔
1008
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
4,073✔
1009
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
4,073✔
1010
                {JSApiMsgGet, s.jsMsgGetRequest},
4,073✔
1011
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
4,073✔
1012
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
4,073✔
1013
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
4,073✔
1014
                {JSApiConsumers, s.jsConsumerNamesRequest},
4,073✔
1015
                {JSApiConsumerList, s.jsConsumerListRequest},
4,073✔
1016
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
4,073✔
1017
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
4,073✔
1018
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
4,073✔
1019
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
4,073✔
1020
        }
4,073✔
1021

4,073✔
1022
        js.mu.Lock()
4,073✔
1023
        defer js.mu.Unlock()
4,073✔
1024

4,073✔
1025
        for _, p := range pairs {
118,117✔
1026
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
114,044✔
1027
                if err := js.apiSubs.Insert(sub); err != nil {
114,044✔
1028
                        return err
×
1029
                }
×
1030
        }
1031

1032
        return nil
4,073✔
1033
}
1034

1035
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
51,936✔
1036
        acc.trackAPI()
51,936✔
1037
        if reply != _EMPTY_ {
81,954✔
1038
                s.sendInternalAccountMsg(nil, reply, response)
30,018✔
1039
        }
30,018✔
1040
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
51,936✔
1041
}
1042

1043
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
18,423✔
1044
        acc.trackAPIErr()
18,423✔
1045
        if reply != _EMPTY_ {
32,970✔
1046
                s.sendInternalAccountMsg(nil, reply, response)
14,547✔
1047
        }
14,547✔
1048
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
18,423✔
1049
}
1050

1051
// errRespDelay is a half second delay for error responses.
// NOTE(review): presumably the default duration handed to the delayed error
// response helpers; the callers are not visible here.
const errRespDelay = 500 * time.Millisecond
1052

1053
type delayedAPIResponse struct {
1054
        ci       *ClientInfo
1055
        acc      *Account
1056
        subject  string
1057
        reply    string
1058
        request  string
1059
        hdr      []byte
1060
        response string
1061
        rg       *raftGroup
1062
        deadline time.Time
1063
        noJs     bool
1064
        next     *delayedAPIResponse
1065
}
1066

1067
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1068
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
112✔
1069
        // Check if list empty.
112✔
1070
        if *head == nil {
210✔
1071
                *head, *tail = r, r
98✔
1072
                return
98✔
1073
        }
98✔
1074
        // Check if it should be added at the end, which is if after or equal to the tail.
1075
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1076
                (*tail).next, *tail = r, r
12✔
1077
                return
12✔
1078
        }
12✔
1079
        // Find its spot in the list.
1080
        var prev *delayedAPIResponse
2✔
1081
        for c := *head; c != nil; c = c.next {
6✔
1082
                // We insert only if we are stricly before the current `c`.
4✔
1083
                if r.deadline.Before(c.deadline) {
6✔
1084
                        r.next = c
2✔
1085
                        if prev != nil {
3✔
1086
                                prev.next = r
1✔
1087
                        } else {
2✔
1088
                                *head = r
1✔
1089
                        }
1✔
1090
                        return
2✔
1091
                }
1092
                prev = c
2✔
1093
        }
1094
}
1095

1096
func (s *Server) delayedAPIResponder() {
7,085✔
1097
        defer s.grWG.Done()
7,085✔
1098
        var (
7,085✔
1099
                head, tail *delayedAPIResponse // Linked list.
7,085✔
1100
                r          *delayedAPIResponse // Updated by calling next().
7,085✔
1101
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
7,085✔
1102
                tm         = time.NewTimer(time.Hour)
7,085✔
1103
        )
7,085✔
1104
        next := func() {
7,286✔
1105
                r, rqch = nil, nil
201✔
1106
                // Check that JetStream is still on. Do not exit the go routine
201✔
1107
                // since JS can be enabled/disabled. The go routine will exit
201✔
1108
                // only if server is shutdown.
201✔
1109
                js := s.getJetStream()
201✔
1110
                if js == nil {
202✔
1111
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1112
                        head, tail = nil, nil
1✔
1113
                        s.delayedAPIResponses.drain()
1✔
1114
                        // Fall back into next "if" that resets timer.
1✔
1115
                }
1✔
1116
                // If there are no delayed messages then delay the timer for
1117
                // a while.
1118
                if head == nil {
299✔
1119
                        tm.Reset(time.Hour)
98✔
1120
                        return
98✔
1121
                }
98✔
1122
                // Get the first expected message and then reset the timer.
1123
                r = head
103✔
1124
                js.mu.RLock()
103✔
1125
                if r.rg != nil && r.rg.node != nil {
104✔
1126
                        // If there's an attached Raft group to the delayed response
1✔
1127
                        // then pull out the quit channel, so that we don't bother
1✔
1128
                        // sending responses for entities which are now no longer
1✔
1129
                        // running.
1✔
1130
                        rqch = r.rg.node.QuitC()
1✔
1131
                }
1✔
1132
                js.mu.RUnlock()
103✔
1133
                tm.Reset(time.Until(r.deadline))
103✔
1134
        }
1135
        pop := func() {
7,187✔
1136
                if head == nil {
102✔
1137
                        return
×
1138
                }
×
1139
                head = head.next
102✔
1140
                if head == nil {
199✔
1141
                        tail = nil
97✔
1142
                }
97✔
1143
        }
1144
        for {
14,384✔
1145
                select {
7,299✔
1146
                case <-s.delayedAPIResponses.ch:
112✔
1147
                        v, ok := s.delayedAPIResponses.popOne()
112✔
1148
                        if !ok {
112✔
1149
                                continue
×
1150
                        }
1151
                        // Add it to the list, and if ends up being the head, set things up.
1152
                        addDelayedResponse(&head, &tail, v)
112✔
1153
                        if v == head {
211✔
1154
                                next()
99✔
1155
                        }
99✔
1156
                case <-s.quitCh:
7,084✔
1157
                        return
7,084✔
1158
                case <-rqch:
1✔
1159
                        // If we were the head, drop and setup things for next.
1✔
1160
                        if r != nil && r == head {
2✔
1161
                                pop()
1✔
1162
                        }
1✔
1163
                        next()
1✔
1164
                case <-tm.C:
101✔
1165
                        if r != nil {
202✔
1166
                                // If it's not a JS API error, send it as a raw response without additional API/audit tracking.
101✔
1167
                                if r.noJs {
137✔
1168
                                        s.sendInternalAccountMsgWithReply(r.acc, r.subject, _EMPTY_, r.hdr, r.response, false)
36✔
1169
                                } else {
101✔
1170
                                        s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
65✔
1171
                                }
65✔
1172
                                pop()
101✔
1173
                        }
1174
                        next()
101✔
1175
                }
1176
        }
1177
}
1178

1179
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
76✔
1180
        s.delayedAPIResponses.push(&delayedAPIResponse{
76✔
1181
                ci, acc, subject, reply, request, nil, response, rg, time.Now().Add(duration), false, nil,
76✔
1182
        })
76✔
1183
}
76✔
1184

1185
func (s *Server) sendDelayedErrResponse(acc *Account, subject string, hdr []byte, response string, duration time.Duration) {
36✔
1186
        s.delayedAPIResponses.push(&delayedAPIResponse{
36✔
1187
                nil, acc, subject, _EMPTY_, _EMPTY_, hdr, response, nil, time.Now().Add(duration), true, nil,
36✔
1188
        })
36✔
1189
}
36✔
1190

1191
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
103,691✔
1192
        hdr, msg = c.msgParts(raw)
103,691✔
1193
        var ci ClientInfo
103,691✔
1194

103,691✔
1195
        if len(hdr) > 0 {
207,304✔
1196
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
103,613✔
1197
                        return nil, nil, nil, nil, err
×
1198
                }
×
1199
        }
1200

1201
        if ci.Service != _EMPTY_ {
103,751✔
1202
                acc, _ = s.LookupAccount(ci.Service)
60✔
1203
        } else if ci.Account != _EMPTY_ {
207,244✔
1204
                acc, _ = s.LookupAccount(ci.Account)
103,553✔
1205
        } else {
103,631✔
1206
                // Direct $SYS access.
78✔
1207
                acc = c.acc
78✔
1208
                if acc == nil {
83✔
1209
                        acc = s.SystemAccount()
5✔
1210
                }
5✔
1211
        }
1212
        if acc == nil {
103,701✔
1213
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1214
        }
10✔
1215
        return &ci, acc, hdr, msg, nil
103,681✔
1216
}
1217

1218
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
16,565✔
1219
        decoder := json.NewDecoder(bytes.NewReader(msg))
16,565✔
1220
        decoder.DisallowUnknownFields()
16,565✔
1221

16,565✔
1222
        for {
49,689✔
1223
                if err := decoder.Decode(v); err != nil {
49,689✔
1224
                        if err == io.EOF {
33,124✔
1225
                                return nil
16,559✔
1226
                        }
16,559✔
1227

1228
                        var syntaxErr *json.SyntaxError
6✔
1229
                        if errors.As(err, &syntaxErr) {
6✔
1230
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1231
                        }
×
1232

1233
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1234

6✔
1235
                        if s.JetStreamConfig().Strict {
12✔
1236
                                return err
6✔
1237
                        }
6✔
1238

1239
                        return json.Unmarshal(msg, v)
×
1240
                }
1241
        }
1242
}
1243

1244
func (a *Account) trackAPI() {
51,936✔
1245
        a.mu.RLock()
51,936✔
1246
        jsa := a.js
51,936✔
1247
        a.mu.RUnlock()
51,936✔
1248
        if jsa != nil {
103,791✔
1249
                jsa.usageMu.Lock()
51,855✔
1250
                jsa.usageApi++
51,855✔
1251
                jsa.apiTotal++
51,855✔
1252
                jsa.sendClusterUsageUpdate()
51,855✔
1253
                atomic.AddInt64(&jsa.js.apiTotal, 1)
51,855✔
1254
                jsa.usageMu.Unlock()
51,855✔
1255
        }
51,855✔
1256
}
1257

1258
func (a *Account) trackAPIErr() {
18,423✔
1259
        a.mu.RLock()
18,423✔
1260
        jsa := a.js
18,423✔
1261
        a.mu.RUnlock()
18,423✔
1262
        if jsa != nil {
36,624✔
1263
                jsa.usageMu.Lock()
18,201✔
1264
                jsa.usageApi++
18,201✔
1265
                jsa.apiTotal++
18,201✔
1266
                jsa.usageErr++
18,201✔
1267
                jsa.apiErrors++
18,201✔
1268
                jsa.sendClusterUsageUpdate()
18,201✔
1269
                atomic.AddInt64(&jsa.js.apiTotal, 1)
18,201✔
1270
                atomic.AddInt64(&jsa.js.apiErrors, 1)
18,201✔
1271
                jsa.usageMu.Unlock()
18,201✔
1272
        }
18,201✔
1273
}
1274

1275
// badAPIRequestT is the log format used when an API request cannot be
// processed; its single argument is the request message.
const badAPIRequestT = "Malformed JetStream API Request: %q"
1276

1277
// Helper function to check on JetStream being enabled but also on status of leafnodes
1278
// If the local account is not enabled but does have leafnode connectivity we will not
1279
// want to error immediately and let the other side decide.
1280
func (a *Account) checkJetStream() (enabled, shouldError bool) {
43,517✔
1281
        a.mu.RLock()
43,517✔
1282
        defer a.mu.RUnlock()
43,517✔
1283
        return a.js != nil, a.nleafs+a.nrleafs == 0
43,517✔
1284
}
43,517✔
1285

1286
// Request for current usage and limits for this account.
1287
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
496✔
1288
        if c == nil || !s.JetStreamEnabled() {
496✔
1289
                return
×
1290
        }
×
1291

1292
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
496✔
1293
        if err != nil {
497✔
1294
                s.Warnf(badAPIRequestT, msg)
1✔
1295
                return
1✔
1296
        }
1✔
1297

1298
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
495✔
1299
        if errorOnRequiredApiLevel(hdr) {
496✔
1300
                resp.Error = NewJSRequiredApiLevelError()
1✔
1301
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1302
                return
1✔
1303
        }
1✔
1304

1305
        // Determine if we should proceed here when we are in clustered mode.
1306
        if s.JetStreamIsClustered() {
914✔
1307
                js, cc := s.getJetStreamCluster()
420✔
1308
                if js == nil || cc == nil {
420✔
1309
                        return
×
1310
                }
×
1311
                if js.isLeaderless() {
421✔
1312
                        resp.Error = NewJSClusterNotAvailError()
1✔
1313
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1314
                        return
1✔
1315
                }
1✔
1316
                // Make sure we are meta leader.
1317
                if !s.JetStreamIsLeader() {
708✔
1318
                        return
289✔
1319
                }
289✔
1320
        }
1321

1322
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
210✔
1323
                if !doErr {
7✔
1324
                        return
1✔
1325
                }
1✔
1326
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1327
        } else {
198✔
1328
                stats := acc.JetStreamUsage()
198✔
1329
                resp.JetStreamAccountStats = &stats
198✔
1330
        }
198✔
1331
        b, err := json.Marshal(resp)
203✔
1332
        if err != nil {
203✔
1333
                return
×
1334
        }
×
1335

1336
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
203✔
1337
}
1338

1339
// Helpers for token extraction.
1340
func templateNameFromSubject(subject string) string {
6✔
1341
        return tokenAt(subject, 6)
6✔
1342
}
6✔
1343

1344
func streamNameFromSubject(subject string) string {
80,465✔
1345
        return tokenAt(subject, 5)
80,465✔
1346
}
80,465✔
1347

1348
func consumerNameFromSubject(subject string) string {
48,294✔
1349
        return tokenAt(subject, 6)
48,294✔
1350
}
48,294✔
1351

1352
// Request to create a new template.
1353
// Deprecated: stream templates are deprecated and will be removed in a future version.
1354
func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7✔
1355
        if c == nil {
7✔
1356
                return
×
1357
        }
×
1358
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7✔
1359
        if err != nil {
7✔
1360
                s.Warnf(badAPIRequestT, msg)
×
1361
                return
×
1362
        }
×
1363

1364
        var resp = JSApiStreamTemplateCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateCreateResponseType}}
7✔
1365
        if errorOnRequiredApiLevel(hdr) {
8✔
1366
                resp.Error = NewJSRequiredApiLevelError()
1✔
1367
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1368
                return
1✔
1369
        }
1✔
1370
        if !acc.JetStreamEnabled() {
6✔
1371
                resp.Error = NewJSNotEnabledForAccountError()
×
1372
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1373
                return
×
1374
        }
×
1375

1376
        // Not supported for now.
1377
        if s.JetStreamIsClustered() {
9✔
1378
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1379
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1380
                return
3✔
1381
        }
3✔
1382

1383
        var cfg StreamTemplateConfig
3✔
1384
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
3✔
1385
                resp.Error = NewJSInvalidJSONError(err)
×
1386
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1387
                return
×
1388
        }
×
1389
        templateName := templateNameFromSubject(subject)
3✔
1390
        if templateName != cfg.Name {
4✔
1391
                resp.Error = NewJSTemplateNameNotMatchSubjectError()
1✔
1392
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1393
                return
1✔
1394
        }
1✔
1395

1396
        t, err := acc.addStreamTemplate(&cfg)
2✔
1397
        if err != nil {
2✔
1398
                resp.Error = NewJSStreamTemplateCreateError(err, Unless(err))
×
1399
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1400
                return
×
1401
        }
×
1402
        t.mu.Lock()
2✔
1403
        tcfg := t.StreamTemplateConfig.deepCopy()
2✔
1404
        streams := t.streams
2✔
1405
        if streams == nil {
4✔
1406
                streams = []string{}
2✔
1407
        }
2✔
1408
        t.mu.Unlock()
2✔
1409
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: tcfg, Streams: streams}
2✔
1410
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1411
}
1412

1413
// Request for the list of all template names.
1414
// Deprecated: stream templates are deprecated and will be removed in a future version.
1415
func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
1416
        if c == nil {
6✔
1417
                return
×
1418
        }
×
1419
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6✔
1420
        if err != nil {
6✔
1421
                s.Warnf(badAPIRequestT, msg)
×
1422
                return
×
1423
        }
×
1424

1425
        var resp = JSApiStreamTemplateNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateNamesResponseType}}
6✔
1426
        if errorOnRequiredApiLevel(hdr) {
7✔
1427
                resp.Error = NewJSRequiredApiLevelError()
1✔
1428
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1429
                return
1✔
1430
        }
1✔
1431
        if !acc.JetStreamEnabled() {
5✔
1432
                resp.Error = NewJSNotEnabledForAccountError()
×
1433
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1434
                return
×
1435
        }
×
1436

1437
        // Not supported for now.
1438
        if s.JetStreamIsClustered() {
8✔
1439
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1440
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1441
                return
3✔
1442
        }
3✔
1443

1444
        var offset int
2✔
1445
        if isJSONObjectOrArray(msg) {
2✔
1446
                var req JSApiStreamTemplatesRequest
×
1447
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
×
1448
                        resp.Error = NewJSInvalidJSONError(err)
×
1449
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1450
                        return
×
1451
                }
×
1452
                offset = req.Offset
×
1453
        }
1454

1455
        ts := acc.templates()
2✔
1456
        slices.SortFunc(ts, func(i, j *streamTemplate) int {
3✔
1457
                return cmp.Compare(i.StreamTemplateConfig.Name, j.StreamTemplateConfig.Name)
1✔
1458
        })
1✔
1459

1460
        tcnt := len(ts)
2✔
1461
        if offset > tcnt {
2✔
1462
                offset = tcnt
×
1463
        }
×
1464

1465
        for _, t := range ts[offset:] {
5✔
1466
                t.mu.Lock()
3✔
1467
                name := t.Name
3✔
1468
                t.mu.Unlock()
3✔
1469
                resp.Templates = append(resp.Templates, name)
3✔
1470
                if len(resp.Templates) >= JSApiNamesLimit {
3✔
1471
                        break
×
1472
                }
1473
        }
1474
        resp.Total = tcnt
2✔
1475
        resp.Limit = JSApiNamesLimit
2✔
1476
        resp.Offset = offset
2✔
1477
        if resp.Templates == nil {
2✔
1478
                resp.Templates = []string{}
×
1479
        }
×
1480
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1481
}
1482

1483
// Request for information about a stream template.
1484
// Deprecated: stream templates are deprecated and will be removed in a future version.
1485
func (s *Server) jsTemplateInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2✔
1486
        if c == nil {
2✔
1487
                return
×
1488
        }
×
1489
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2✔
1490
        if err != nil {
2✔
1491
                s.Warnf(badAPIRequestT, msg)
×
1492
                return
×
1493
        }
×
1494

1495
        var resp = JSApiStreamTemplateInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateInfoResponseType}}
2✔
1496
        if errorOnRequiredApiLevel(hdr) {
3✔
1497
                resp.Error = NewJSRequiredApiLevelError()
1✔
1498
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1499
                return
1✔
1500
        }
1✔
1501
        if !acc.JetStreamEnabled() {
1✔
1502
                resp.Error = NewJSNotEnabledForAccountError()
×
1503
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1504
                return
×
1505
        }
×
1506
        if !isEmptyRequest(msg) {
1✔
1507
                resp.Error = NewJSNotEmptyRequestError()
×
1508
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1509
                return
×
1510
        }
×
1511
        name := templateNameFromSubject(subject)
1✔
1512
        t, err := acc.lookupStreamTemplate(name)
1✔
1513
        if err != nil {
1✔
1514
                resp.Error = NewJSStreamTemplateNotFoundError()
×
1515
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1516
                return
×
1517
        }
×
1518
        t.mu.Lock()
1✔
1519
        cfg := t.StreamTemplateConfig.deepCopy()
1✔
1520
        streams := t.streams
1✔
1521
        if streams == nil {
1✔
1522
                streams = []string{}
×
1523
        }
×
1524
        t.mu.Unlock()
1✔
1525

1✔
1526
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: cfg, Streams: streams}
1✔
1527
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1528
}
1529

1530
// Request to delete a stream template.
1531
// Deprecated: stream templates are deprecated and will be removed in a future version.
1532
func (s *Server) jsTemplateDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
3✔
1533
        if c == nil {
3✔
1534
                return
×
1535
        }
×
1536
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
3✔
1537
        if err != nil {
3✔
1538
                s.Warnf(badAPIRequestT, msg)
×
1539
                return
×
1540
        }
×
1541

1542
        var resp = JSApiStreamTemplateDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateDeleteResponseType}}
3✔
1543
        if errorOnRequiredApiLevel(hdr) {
4✔
1544
                resp.Error = NewJSRequiredApiLevelError()
1✔
1545
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1546
                return
1✔
1547
        }
1✔
1548
        if !acc.JetStreamEnabled() {
2✔
1549
                resp.Error = NewJSNotEnabledForAccountError()
×
1550
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1551
                return
×
1552
        }
×
1553
        if !isEmptyRequest(msg) {
2✔
1554
                resp.Error = NewJSNotEmptyRequestError()
×
1555
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1556
                return
×
1557
        }
×
1558
        name := templateNameFromSubject(subject)
2✔
1559
        err = acc.deleteStreamTemplate(name)
2✔
1560
        if err != nil {
3✔
1561
                resp.Error = NewJSStreamTemplateDeleteError(err, Unless(err))
1✔
1562
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1563
                return
1✔
1564
        }
1✔
1565
        resp.Success = true
1✔
1566
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1567
}
1568

1569
func (s *Server) jsonResponse(v any) string {
70,916✔
1570
        b, err := json.Marshal(v)
70,916✔
1571
        if err != nil {
70,916✔
1572
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1573
                return ""
×
1574
        }
×
1575
        return string(b)
70,916✔
1576
}
1577

1578
// Read lock must be held
1579
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,562✔
1580
        reservation := int64(0)
3,562✔
1581
        if tier == _EMPTY_ {
7,102✔
1582
                for _, sa := range jsa.streams {
22,149✔
1583
                        if sa.cfg.MaxBytes > 0 {
18,636✔
1584
                                if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name {
27✔
1585
                                        reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
×
1586
                                }
×
1587
                        }
1588
                }
1589
        } else {
22✔
1590
                for _, sa := range jsa.streams {
37✔
1591
                        if sa.cfg.Replicas == cfg.Replicas {
29✔
1592
                                if sa.cfg.MaxBytes > 0 {
19✔
1593
                                        if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name {
10✔
1594
                                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
5✔
1595
                                        }
5✔
1596
                                }
1597
                        }
1598
                }
1599
        }
1600
        return reservation
3,562✔
1601
}
1602

1603
// Request to create a stream.
1604
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,851✔
1605
        if c == nil || !s.JetStreamEnabled() {
8,122✔
1606
                return
271✔
1607
        }
271✔
1608
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,580✔
1609
        if err != nil {
7,583✔
1610
                s.Warnf(badAPIRequestT, msg)
3✔
1611
                return
3✔
1612
        }
3✔
1613

1614
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
7,577✔
1615
        if errorOnRequiredApiLevel(hdr) {
7,578✔
1616
                resp.Error = NewJSRequiredApiLevelError()
1✔
1617
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1618
                return
1✔
1619
        }
1✔
1620

1621
        // Determine if we should proceed here when we are in clustered mode.
1622
        if s.JetStreamIsClustered() {
13,900✔
1623
                js, cc := s.getJetStreamCluster()
6,324✔
1624
                if js == nil || cc == nil {
6,324✔
1625
                        return
×
1626
                }
×
1627
                if js.isLeaderless() {
6,325✔
1628
                        resp.Error = NewJSClusterNotAvailError()
1✔
1629
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1630
                        return
1✔
1631
                }
1✔
1632
                // Make sure we are meta leader.
1633
                if !s.JetStreamIsLeader() {
10,725✔
1634
                        return
4,402✔
1635
                }
4,402✔
1636
        }
1637

1638
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
3,181✔
1639
                if doErr {
8✔
1640
                        resp.Error = NewJSNotEnabledForAccountError()
×
1641
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1642
                }
×
1643
                return
8✔
1644
        }
1645

1646
        var cfg StreamConfigRequest
3,165✔
1647
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
3,166✔
1648
                resp.Error = NewJSInvalidJSONError(err)
1✔
1649
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1650
                return
1✔
1651
        }
1✔
1652

1653
        // Initialize asset version metadata.
1654
        setStaticStreamMetadata(&cfg.StreamConfig)
3,164✔
1655

3,164✔
1656
        streamName := streamNameFromSubject(subject)
3,164✔
1657
        if streamName != cfg.Name {
3,165✔
1658
                resp.Error = NewJSStreamMismatchError()
1✔
1659
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1660
                return
1✔
1661
        }
1✔
1662

1663
        // Check for path like separators in the name.
1664
        if strings.ContainsAny(streamName, `\/`) {
3,165✔
1665
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1666
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1667
                return
2✔
1668
        }
2✔
1669

1670
        // Can't create a stream with a sealed state.
1671
        if cfg.Sealed {
3,163✔
1672
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1673
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1674
                return
2✔
1675
        }
2✔
1676

1677
        // If we are told to do mirror direct but are not mirroring, error.
1678
        if cfg.MirrorDirect && cfg.Mirror == nil {
3,159✔
1679
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1680
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1681
                return
×
1682
        }
×
1683

1684
        // Hand off to cluster for processing.
1685
        if s.JetStreamIsClustered() {
5,078✔
1686
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
1,919✔
1687
                return
1,919✔
1688
        }
1,919✔
1689

1690
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,242✔
1691
                resp.Error = err
2✔
1692
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1693
                return
2✔
1694
        }
2✔
1695

1696
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,238✔
1697
        if err != nil {
1,288✔
1698
                if IsNatsErr(err, JSStreamStoreFailedF) {
50✔
1699
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1700
                        err = errStreamStoreFailed
×
1701
                }
×
1702
                resp.Error = NewJSStreamCreateError(err, Unless(err))
50✔
1703
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
50✔
1704
                return
50✔
1705
        }
1706
        msetCfg := mset.config()
1,188✔
1707
        resp.StreamInfo = &StreamInfo{
1,188✔
1708
                Created:   mset.createdTime(),
1,188✔
1709
                State:     mset.state(),
1,188✔
1710
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,188✔
1711
                TimeStamp: time.Now().UTC(),
1,188✔
1712
                Mirror:    mset.mirrorInfo(),
1,188✔
1713
                Sources:   mset.sourcesInfo(),
1,188✔
1714
        }
1,188✔
1715
        resp.DidCreate = true
1,188✔
1716
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,188✔
1717
}
1718

1719
// Request to update a stream.
1720
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
826✔
1721
        if c == nil || !s.JetStreamEnabled() {
826✔
1722
                return
×
1723
        }
×
1724

1725
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
826✔
1726
        if err != nil {
826✔
1727
                s.Warnf(badAPIRequestT, msg)
×
1728
                return
×
1729
        }
×
1730

1731
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
826✔
1732
        if errorOnRequiredApiLevel(hdr) {
827✔
1733
                resp.Error = NewJSRequiredApiLevelError()
1✔
1734
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1735
                return
1✔
1736
        }
1✔
1737

1738
        // Determine if we should proceed here when we are in clustered mode.
1739
        if s.JetStreamIsClustered() {
1,559✔
1740
                js, cc := s.getJetStreamCluster()
734✔
1741
                if js == nil || cc == nil {
734✔
1742
                        return
×
1743
                }
×
1744
                if js.isLeaderless() {
736✔
1745
                        resp.Error = NewJSClusterNotAvailError()
2✔
1746
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1747
                        return
2✔
1748
                }
2✔
1749
                // Make sure we are meta leader.
1750
                if !s.JetStreamIsLeader() {
1,290✔
1751
                        return
558✔
1752
                }
558✔
1753
        }
1754

1755
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
265✔
1756
                if doErr {
×
1757
                        resp.Error = NewJSNotEnabledForAccountError()
×
1758
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1759
                }
×
1760
                return
×
1761
        }
1762
        var ncfg StreamConfigRequest
265✔
1763
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
266✔
1764
                resp.Error = NewJSInvalidJSONError(err)
1✔
1765
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1766
                return
1✔
1767
        }
1✔
1768

1769
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
264✔
1770
        if apiErr != nil {
282✔
1771
                resp.Error = apiErr
18✔
1772
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
18✔
1773
                return
18✔
1774
        }
18✔
1775

1776
        streamName := streamNameFromSubject(subject)
246✔
1777
        if streamName != cfg.Name {
247✔
1778
                resp.Error = NewJSStreamMismatchError()
1✔
1779
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1780
                return
1✔
1781
        }
1✔
1782

1783
        // Handle clustered version here.
1784
        if s.JetStreamIsClustered() {
414✔
1785
                s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
169✔
1786
                return
169✔
1787
        }
169✔
1788

1789
        mset, err := acc.lookupStream(streamName)
76✔
1790
        if err != nil {
80✔
1791
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
1792
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
1793
                return
4✔
1794
        }
4✔
1795
        if mset.offlineReason != _EMPTY_ {
72✔
1796
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
×
1797
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
1798
                return
×
1799
        }
×
1800

1801
        // Update asset version metadata.
1802
        setStaticStreamMetadata(&cfg)
72✔
1803

72✔
1804
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
84✔
1805
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
12✔
1806
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
12✔
1807
                return
12✔
1808
        }
12✔
1809

1810
        msetCfg := mset.config()
60✔
1811
        resp.StreamInfo = &StreamInfo{
60✔
1812
                Created:   mset.createdTime(),
60✔
1813
                State:     mset.state(),
60✔
1814
                Config:    *setDynamicStreamMetadata(&msetCfg),
60✔
1815
                Domain:    s.getOpts().JetStreamDomain,
60✔
1816
                Mirror:    mset.mirrorInfo(),
60✔
1817
                Sources:   mset.sourcesInfo(),
60✔
1818
                TimeStamp: time.Now().UTC(),
60✔
1819
        }
60✔
1820
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
60✔
1821
}
1822

1823
// Request for the list of all stream names.
1824
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,288✔
1825
        if c == nil || !s.JetStreamEnabled() {
1,288✔
1826
                return
×
1827
        }
×
1828
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
1,288✔
1829
        if err != nil {
1,288✔
1830
                s.Warnf(badAPIRequestT, msg)
×
1831
                return
×
1832
        }
×
1833

1834
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,288✔
1835
        if errorOnRequiredApiLevel(hdr) {
1,289✔
1836
                resp.Error = NewJSRequiredApiLevelError()
1✔
1837
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1838
                return
1✔
1839
        }
1✔
1840

1841
        // Determine if we should proceed here when we are in clustered mode.
1842
        if s.JetStreamIsClustered() {
2,319✔
1843
                js, cc := s.getJetStreamCluster()
1,032✔
1844
                if js == nil || cc == nil {
1,032✔
1845
                        return
×
1846
                }
×
1847
                if js.isLeaderless() {
1,032✔
1848
                        resp.Error = NewJSClusterNotAvailError()
×
1849
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1850
                        return
×
1851
                }
×
1852
                // Make sure we are meta leader.
1853
                if !s.JetStreamIsLeader() {
1,787✔
1854
                        return
755✔
1855
                }
755✔
1856
        }
1857

1858
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
533✔
1859
                if doErr {
1✔
1860
                        resp.Error = NewJSNotEnabledForAccountError()
×
1861
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1862
                }
×
1863
                return
1✔
1864
        }
1865

1866
        var offset int
531✔
1867
        var filter string
531✔
1868

531✔
1869
        if isJSONObjectOrArray(msg) {
900✔
1870
                var req JSApiStreamNamesRequest
369✔
1871
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
369✔
1872
                        resp.Error = NewJSInvalidJSONError(err)
×
1873
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1874
                        return
×
1875
                }
×
1876
                offset = req.Offset
369✔
1877
                if req.Subject != _EMPTY_ {
714✔
1878
                        filter = req.Subject
345✔
1879
                }
345✔
1880
        }
1881

1882
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1883
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1884
        var numStreams int
531✔
1885
        if s.JetStreamIsClustered() {
808✔
1886
                js, cc := s.getJetStreamCluster()
277✔
1887
                if js == nil || cc == nil {
277✔
1888
                        // TODO(dlc) - Debug or Warn?
×
1889
                        return
×
1890
                }
×
1891
                js.mu.RLock()
277✔
1892
                for stream, sa := range cc.streams[acc.Name] {
602✔
1893
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
325✔
1894
                                continue
×
1895
                        }
1896
                        if filter != _EMPTY_ {
607✔
1897
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
282✔
1898
                                if len(sa.Config.Subjects) == 0 {
284✔
1899
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1900
                                                resp.Streams = append(resp.Streams, stream)
×
1901
                                        }
×
1902
                                } else {
280✔
1903
                                        for _, subj := range sa.Config.Subjects {
560✔
1904
                                                if SubjectsCollide(filter, subj) {
505✔
1905
                                                        resp.Streams = append(resp.Streams, stream)
225✔
1906
                                                        break
225✔
1907
                                                }
1908
                                        }
1909
                                }
1910
                        } else {
43✔
1911
                                resp.Streams = append(resp.Streams, stream)
43✔
1912
                        }
43✔
1913
                }
1914
                js.mu.RUnlock()
277✔
1915
                if len(resp.Streams) > 1 {
279✔
1916
                        slices.Sort(resp.Streams)
2✔
1917
                }
2✔
1918
                numStreams = len(resp.Streams)
277✔
1919
                if offset > numStreams {
277✔
1920
                        offset = numStreams
×
1921
                }
×
1922
                if offset > 0 {
277✔
1923
                        resp.Streams = resp.Streams[offset:]
×
1924
                }
×
1925
                if len(resp.Streams) > JSApiNamesLimit {
277✔
1926
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1927
                }
×
1928
        } else {
254✔
1929
                msets := acc.filteredStreams(filter)
254✔
1930
                // Since we page results order matters.
254✔
1931
                if len(msets) > 1 {
260✔
1932
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
16✔
1933
                }
1934

1935
                numStreams = len(msets)
254✔
1936
                if offset > numStreams {
254✔
1937
                        offset = numStreams
×
1938
                }
×
1939

1940
                for _, mset := range msets[offset:] {
518✔
1941
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
264✔
1942
                        if len(resp.Streams) >= JSApiNamesLimit {
264✔
1943
                                break
×
1944
                        }
1945
                }
1946
        }
1947
        resp.Total = numStreams
531✔
1948
        resp.Limit = JSApiNamesLimit
531✔
1949
        resp.Offset = offset
531✔
1950

531✔
1951
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
531✔
1952
}
1953

1954
// Request for the list of all detailed stream info.
1955
// TODO(dlc) - combine with above long term
1956
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
67✔
1957
        if c == nil || !s.JetStreamEnabled() {
67✔
1958
                return
×
1959
        }
×
1960
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
67✔
1961
        if err != nil {
67✔
1962
                s.Warnf(badAPIRequestT, msg)
×
1963
                return
×
1964
        }
×
1965

1966
        var resp = JSApiStreamListResponse{
67✔
1967
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
67✔
1968
                Streams:     []*StreamInfo{},
67✔
1969
        }
67✔
1970
        if errorOnRequiredApiLevel(hdr) {
68✔
1971
                resp.Error = NewJSRequiredApiLevelError()
1✔
1972
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1973
                return
1✔
1974
        }
1✔
1975

1976
        // Determine if we should proceed here when we are in clustered mode.
1977
        if s.JetStreamIsClustered() {
127✔
1978
                js, cc := s.getJetStreamCluster()
61✔
1979
                if js == nil || cc == nil {
61✔
1980
                        return
×
1981
                }
×
1982
                if js.isLeaderless() {
62✔
1983
                        resp.Error = NewJSClusterNotAvailError()
1✔
1984
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1985
                        return
1✔
1986
                }
1✔
1987
                // Make sure we are meta leader.
1988
                if !s.JetStreamIsLeader() {
103✔
1989
                        return
43✔
1990
                }
43✔
1991
        }
1992

1993
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
22✔
1994
                if doErr {
×
1995
                        resp.Error = NewJSNotEnabledForAccountError()
×
1996
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1997
                }
×
1998
                return
×
1999
        }
2000

2001
        var offset int
22✔
2002
        var filter string
22✔
2003

22✔
2004
        if isJSONObjectOrArray(msg) {
33✔
2005
                var req JSApiStreamListRequest
11✔
2006
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2007
                        resp.Error = NewJSInvalidJSONError(err)
1✔
2008
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2009
                        return
1✔
2010
                }
1✔
2011
                offset = req.Offset
10✔
2012
                if req.Subject != _EMPTY_ {
12✔
2013
                        filter = req.Subject
2✔
2014
                }
2✔
2015
        }
2016

2017
        // Clustered mode will invoke a scatter and gather.
2018
        if s.JetStreamIsClustered() {
38✔
2019
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
17✔
2020
                msg = copyBytes(msg)
17✔
2021
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
34✔
2022
                return
17✔
2023
        }
2024

2025
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
2026
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
2027
        var msets []*stream
4✔
2028
        if filter == _EMPTY_ {
7✔
2029
                msets = acc.streams()
3✔
2030
        } else {
4✔
2031
                msets = acc.filteredStreams(filter)
1✔
2032
        }
1✔
2033

2034
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
5✔
2035

2036
        scnt := len(msets)
4✔
2037
        if offset > scnt {
4✔
2038
                offset = scnt
×
2039
        }
×
2040

2041
        var missingNames []string
4✔
2042
        for _, mset := range msets[offset:] {
9✔
2043
                if mset.offlineReason != _EMPTY_ {
6✔
2044
                        if resp.Offline == nil {
2✔
2045
                                resp.Offline = make(map[string]string, 1)
1✔
2046
                        }
1✔
2047
                        resp.Offline[mset.getCfgName()] = mset.offlineReason
1✔
2048
                        missingNames = append(missingNames, mset.getCfgName())
1✔
2049
                        continue
1✔
2050
                }
2051

2052
                config := mset.config()
4✔
2053
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
2054
                        Created:   mset.createdTime(),
4✔
2055
                        State:     mset.state(),
4✔
2056
                        Config:    config,
4✔
2057
                        Domain:    s.getOpts().JetStreamDomain,
4✔
2058
                        Mirror:    mset.mirrorInfo(),
4✔
2059
                        Sources:   mset.sourcesInfo(),
4✔
2060
                        TimeStamp: time.Now().UTC(),
4✔
2061
                })
4✔
2062
                if len(resp.Streams) >= JSApiListLimit {
4✔
2063
                        break
×
2064
                }
2065
        }
2066
        resp.Total = scnt
4✔
2067
        resp.Limit = JSApiListLimit
4✔
2068
        resp.Offset = offset
4✔
2069
        resp.Missing = missingNames
4✔
2070
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
2071
}
2072

2073
// Request for information about a stream.
2074
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
27,203✔
2075
        if c == nil || !s.JetStreamEnabled() {
27,210✔
2076
                return
7✔
2077
        }
7✔
2078
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
27,196✔
2079
        if err != nil {
27,201✔
2080
                s.Warnf(badAPIRequestT, msg)
5✔
2081
                return
5✔
2082
        }
5✔
2083

2084
        streamName := streamNameFromSubject(subject)
27,191✔
2085

27,191✔
2086
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
27,191✔
2087

27,191✔
2088
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
27,191✔
2089
        // Make sure the response type is for a create call.
27,191✔
2090
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
27,191✔
2091
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
2092
        }
×
2093
        if errorOnRequiredApiLevel(hdr) {
27,192✔
2094
                resp.Error = NewJSRequiredApiLevelError()
1✔
2095
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2096
                return
1✔
2097
        }
1✔
2098

2099
        var clusterWideConsCount int
27,190✔
2100

27,190✔
2101
        js, cc := s.getJetStreamCluster()
27,190✔
2102
        if js == nil {
27,190✔
2103
                return
×
2104
        }
×
2105
        // If we are in clustered mode we need to be the stream leader to proceed.
2106
        if cc != nil {
38,933✔
2107
                // Check to make sure the stream is assigned.
11,743✔
2108
                js.mu.RLock()
11,743✔
2109
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
11,743✔
2110
                var offline bool
11,743✔
2111
                if sa != nil {
22,269✔
2112
                        clusterWideConsCount = len(sa.consumers)
10,526✔
2113
                        offline = s.allPeersOffline(sa.Group)
10,526✔
2114
                        if sa.unsupported != nil && sa.Group != nil && cc.meta != nil && sa.Group.isMember(cc.meta.ID()) {
10,556✔
2115
                                // If we're a member for this stream, and it's not supported, report it as offline.
30✔
2116
                                resp.Error = NewJSStreamOfflineReasonError(errors.New(sa.unsupported.reason))
30✔
2117
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
30✔
2118
                                js.mu.RUnlock()
30✔
2119
                                return
30✔
2120
                        }
30✔
2121
                }
2122
                js.mu.RUnlock()
11,713✔
2123

11,713✔
2124
                if isLeader && sa == nil {
11,975✔
2125
                        // We can't find the stream, so mimic what would be the errors below.
262✔
2126
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
262✔
2127
                                if doErr {
×
2128
                                        resp.Error = NewJSNotEnabledForAccountError()
×
2129
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2130
                                }
×
2131
                                return
×
2132
                        }
2133
                        // No stream present.
2134
                        resp.Error = NewJSStreamNotFoundError()
262✔
2135
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
262✔
2136
                        return
262✔
2137
                } else if sa == nil {
12,406✔
2138
                        if js.isLeaderless() {
955✔
2139
                                resp.Error = NewJSClusterNotAvailError()
×
2140
                                // Delaying an error response gives the leader a chance to respond before us
×
2141
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
2142
                        }
×
2143
                        return
955✔
2144
                } else if isLeader && offline {
10,498✔
2145
                        resp.Error = NewJSStreamOfflineError()
2✔
2146
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
2147
                        return
2✔
2148
                }
2✔
2149

2150
                // Check to see if we are a member of the group and if the group has no leader.
2151
                isLeaderless := js.isGroupLeaderless(sa.Group)
10,494✔
2152

10,494✔
2153
                // We have the stream assigned and a leader, so only the stream leader should answer.
10,494✔
2154
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
19,072✔
2155
                        if js.isLeaderless() {
8,578✔
2156
                                resp.Error = NewJSClusterNotAvailError()
×
2157
                                // Delaying an error response gives the leader a chance to respond before us
×
2158
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
2159
                                return
×
2160
                        }
×
2161

2162
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
2163
                        // while the new members work through the election and catchup process.
2164
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
2165
                        js.mu.RLock()
8,578✔
2166
                        rg := sa.Group
8,578✔
2167
                        var ourID string
8,578✔
2168
                        if cc.meta != nil {
17,156✔
2169
                                ourID = cc.meta.ID()
8,578✔
2170
                        }
8,578✔
2171
                        // We have seen cases where rg is nil at this point,
2172
                        // so check explicitly and bail if that is the case.
2173
                        bail := rg == nil || !rg.isMember(ourID)
8,578✔
2174
                        if !bail {
11,293✔
2175
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
2,715✔
2176
                                // Also, we have seen cases where rg.node is nil at this point,
2,715✔
2177
                                // so check explicitly and bail if that is the case.
2,715✔
2178
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
2,715✔
2179
                        }
2,715✔
2180
                        js.mu.RUnlock()
8,578✔
2181
                        if bail {
17,142✔
2182
                                return
8,564✔
2183
                        }
8,564✔
2184
                }
2185
        }
2186

2187
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
17,383✔
2188
                if doErr {
7✔
2189
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
2190
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2191
                }
1✔
2192
                return
6✔
2193
        }
2194

2195
        var details bool
17,371✔
2196
        var subjects string
17,371✔
2197
        var offset int
17,371✔
2198
        if isJSONObjectOrArray(msg) {
17,404✔
2199
                var req JSApiStreamInfoRequest
33✔
2200
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
2201
                        resp.Error = NewJSInvalidJSONError(err)
1✔
2202
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2203
                        return
1✔
2204
                }
1✔
2205
                details, subjects = req.DeletedDetails, req.SubjectsFilter
32✔
2206
                offset = req.Offset
32✔
2207
        }
2208

2209
        mset, err := acc.lookupStream(streamName)
17,370✔
2210
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
17,370✔
2211
        if err != nil {
18,022✔
2212
                if cc != nil {
652✔
2213
                        // This could be inflight, pause for a short bit and try again.
×
2214
                        // This will not be inline, so ok.
×
2215
                        time.Sleep(10 * time.Millisecond)
×
2216
                        mset, err = acc.lookupStream(streamName)
×
2217
                }
×
2218
                // Check again.
2219
                if err != nil {
1,304✔
2220
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
652✔
2221
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
652✔
2222
                        return
652✔
2223
                }
652✔
2224
        }
2225

2226
        if mset.offlineReason != _EMPTY_ {
16,719✔
2227
                resp.Error = NewJSStreamOfflineReasonError(errors.New(mset.offlineReason))
1✔
2228
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2229
                return
1✔
2230
        }
1✔
2231

2232
        config := mset.config()
16,717✔
2233
        resp.StreamInfo = &StreamInfo{
16,717✔
2234
                Created:    mset.createdTime(),
16,717✔
2235
                State:      mset.stateWithDetail(details),
16,717✔
2236
                Config:     *setDynamicStreamMetadata(&config),
16,717✔
2237
                Domain:     s.getOpts().JetStreamDomain,
16,717✔
2238
                Cluster:    js.clusterInfo(mset.raftGroup()),
16,717✔
2239
                Mirror:     mset.mirrorInfo(),
16,717✔
2240
                Sources:    mset.sourcesInfo(),
16,717✔
2241
                Alternates: js.streamAlternates(ci, config.Name),
16,717✔
2242
                TimeStamp:  time.Now().UTC(),
16,717✔
2243
        }
16,717✔
2244
        if clusterWideConsCount > 0 {
17,195✔
2245
                resp.StreamInfo.State.Consumers = clusterWideConsCount
478✔
2246
        }
478✔
2247

2248
        // Check if they have asked for subject details.
2249
        if subjects != _EMPTY_ {
16,747✔
2250
                st := mset.store.SubjectsTotals(subjects)
30✔
2251
                if lst := len(st); lst > 0 {
56✔
2252
                        // Common for both cases.
26✔
2253
                        resp.Offset = offset
26✔
2254
                        resp.Limit = JSMaxSubjectDetails
26✔
2255
                        resp.Total = lst
26✔
2256

26✔
2257
                        if offset == 0 && lst <= JSMaxSubjectDetails {
52✔
2258
                                resp.StreamInfo.State.Subjects = st
26✔
2259
                        } else {
26✔
2260
                                // Here we have to filter list due to offset or maximum constraints.
×
2261
                                subjs := make([]string, 0, len(st))
×
2262
                                for subj := range st {
×
2263
                                        subjs = append(subjs, subj)
×
2264
                                }
×
2265
                                // Sort it
2266
                                slices.Sort(subjs)
×
2267

×
2268
                                if offset > len(subjs) {
×
2269
                                        offset = len(subjs)
×
2270
                                }
×
2271

2272
                                end := offset + JSMaxSubjectDetails
×
2273
                                if end > len(subjs) {
×
2274
                                        end = len(subjs)
×
2275
                                }
×
2276
                                actualSize := end - offset
×
2277
                                var sd map[string]uint64
×
2278

×
2279
                                if actualSize > 0 {
×
2280
                                        sd = make(map[string]uint64, actualSize)
×
2281
                                        for _, ss := range subjs[offset:end] {
×
2282
                                                sd[ss] = st[ss]
×
2283
                                        }
×
2284
                                }
2285
                                resp.StreamInfo.State.Subjects = sd
×
2286
                        }
2287
                }
2288
        }
2289
        // Check for out of band catchups.
2290
        if mset.hasCatchupPeers() {
16,717✔
2291
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2292
        }
×
2293

2294
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
16,717✔
2295
}
2296

2297
// Request to have a stream leader stepdown.
2298
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
256✔
2299
        if c == nil || !s.JetStreamEnabled() {
256✔
2300
                return
×
2301
        }
×
2302
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
256✔
2303
        if err != nil {
256✔
2304
                s.Warnf(badAPIRequestT, msg)
×
2305
                return
×
2306
        }
×
2307

2308
        // Have extra token for this one.
2309
        name := tokenAt(subject, 6)
256✔
2310

256✔
2311
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
256✔
2312
        if errorOnRequiredApiLevel(hdr) {
257✔
2313
                resp.Error = NewJSRequiredApiLevelError()
1✔
2314
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2315
                return
1✔
2316
        }
1✔
2317

2318
        // If we are not in clustered mode this is a failed request.
2319
        if !s.JetStreamIsClustered() {
255✔
2320
                resp.Error = NewJSClusterRequiredError()
×
2321
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2322
                return
×
2323
        }
×
2324

2325
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2326
        js, cc := s.getJetStreamCluster()
255✔
2327
        if js == nil || cc == nil {
255✔
2328
                return
×
2329
        }
×
2330
        if js.isLeaderless() {
255✔
2331
                resp.Error = NewJSClusterNotAvailError()
×
2332
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2333
                return
×
2334
        }
×
2335

2336
        js.mu.RLock()
255✔
2337
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
255✔
2338
        js.mu.RUnlock()
255✔
2339

255✔
2340
        if isLeader && sa == nil {
255✔
2341
                resp.Error = NewJSStreamNotFoundError()
×
2342
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2343
                return
×
2344
        } else if sa == nil {
255✔
2345
                return
×
2346
        }
×
2347

2348
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
255✔
2349
                if doErr {
×
2350
                        resp.Error = NewJSNotEnabledForAccountError()
×
2351
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2352
                }
×
2353
                return
×
2354
        }
2355

2356
        // Check to see if we are a member of the group and if the group has no leader.
2357
        if js.isGroupLeaderless(sa.Group) {
255✔
2358
                resp.Error = NewJSClusterNotAvailError()
×
2359
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2360
                return
×
2361
        }
×
2362

2363
        // We have the stream assigned and a leader, so only the stream leader should answer.
2364
        if !acc.JetStreamIsStreamLeader(name) {
454✔
2365
                return
199✔
2366
        }
199✔
2367

2368
        mset, err := acc.lookupStream(name)
56✔
2369
        if err != nil {
56✔
2370
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2371
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2372
                return
×
2373
        }
×
2374

2375
        if mset == nil {
56✔
2376
                resp.Success = true
×
2377
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2378
                return
×
2379
        }
×
2380

2381
        node := mset.raftNode()
56✔
2382
        if node == nil {
56✔
2383
                resp.Success = true
×
2384
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2385
                return
×
2386
        }
×
2387

2388
        var preferredLeader string
56✔
2389
        if isJSONObjectOrArray(msg) {
69✔
2390
                var req JSApiLeaderStepdownRequest
13✔
2391
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2392
                        resp.Error = NewJSInvalidJSONError(err)
×
2393
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2394
                        return
×
2395
                }
×
2396
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
18✔
2397
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2398
                        return
5✔
2399
                }
5✔
2400
        }
2401

2402
        // Call actual stepdown.
2403
        err = node.StepDown(preferredLeader)
51✔
2404
        if err != nil {
51✔
2405
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2406
        } else {
51✔
2407
                resp.Success = true
51✔
2408
        }
51✔
2409
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
51✔
2410
}
2411

2412
// Request to have a consumer leader stepdown.
2413
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
225✔
2414
        if c == nil || !s.JetStreamEnabled() {
225✔
2415
                return
×
2416
        }
×
2417
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
225✔
2418
        if err != nil {
225✔
2419
                s.Warnf(badAPIRequestT, msg)
×
2420
                return
×
2421
        }
×
2422

2423
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
225✔
2424
        if errorOnRequiredApiLevel(hdr) {
226✔
2425
                resp.Error = NewJSRequiredApiLevelError()
1✔
2426
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2427
                return
1✔
2428
        }
1✔
2429

2430
        // If we are not in clustered mode this is a failed request.
2431
        if !s.JetStreamIsClustered() {
224✔
2432
                resp.Error = NewJSClusterRequiredError()
×
2433
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2434
                return
×
2435
        }
×
2436

2437
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2438
        js, cc := s.getJetStreamCluster()
224✔
2439
        if js == nil || cc == nil {
224✔
2440
                return
×
2441
        }
×
2442
        if js.isLeaderless() {
224✔
2443
                resp.Error = NewJSClusterNotAvailError()
×
2444
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2445
                return
×
2446
        }
×
2447

2448
        // Have extra token for this one.
2449
        stream := tokenAt(subject, 6)
224✔
2450
        consumer := tokenAt(subject, 7)
224✔
2451

224✔
2452
        js.mu.RLock()
224✔
2453
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
224✔
2454
        js.mu.RUnlock()
224✔
2455

224✔
2456
        if isLeader && sa == nil {
224✔
2457
                resp.Error = NewJSStreamNotFoundError()
×
2458
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2459
                return
×
2460
        } else if sa == nil {
224✔
2461
                return
×
2462
        }
×
2463
        var ca *consumerAssignment
224✔
2464
        if sa.consumers != nil {
448✔
2465
                ca = sa.consumers[consumer]
224✔
2466
        }
224✔
2467
        if ca == nil {
224✔
2468
                resp.Error = NewJSConsumerNotFoundError()
×
2469
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2470
                return
×
2471
        }
×
2472
        // Check to see if we are a member of the group and if the group has no leader.
2473
        if js.isGroupLeaderless(ca.Group) {
224✔
2474
                resp.Error = NewJSClusterNotAvailError()
×
2475
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2476
                return
×
2477
        }
×
2478

2479
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
386✔
2480
                return
162✔
2481
        }
162✔
2482

2483
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
62✔
2484
                if doErr {
×
2485
                        resp.Error = NewJSNotEnabledForAccountError()
×
2486
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2487
                }
×
2488
                return
×
2489
        }
2490

2491
        mset, err := acc.lookupStream(stream)
62✔
2492
        if err != nil {
62✔
2493
                resp.Error = NewJSStreamNotFoundError()
×
2494
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2495
                return
×
2496
        }
×
2497
        o := mset.lookupConsumer(consumer)
62✔
2498
        if o == nil {
62✔
2499
                resp.Error = NewJSConsumerNotFoundError()
×
2500
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2501
                return
×
2502
        }
×
2503

2504
        n := o.raftNode()
62✔
2505
        if n == nil {
62✔
2506
                resp.Success = true
×
2507
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2508
                return
×
2509
        }
×
2510

2511
        var preferredLeader string
62✔
2512
        if isJSONObjectOrArray(msg) {
75✔
2513
                var req JSApiLeaderStepdownRequest
13✔
2514
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2515
                        resp.Error = NewJSInvalidJSONError(err)
×
2516
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2517
                        return
×
2518
                }
×
2519
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
18✔
2520
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2521
                        return
5✔
2522
                }
5✔
2523
        }
2524

2525
        // Call actual stepdown.
2526
        err = n.StepDown(preferredLeader)
57✔
2527
        if err != nil {
57✔
2528
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2529
        } else {
57✔
2530
                resp.Success = true
57✔
2531
        }
57✔
2532
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
57✔
2533
}
2534

2535
// Request to remove a peer from a clustered stream.
2536
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
70✔
2537
        if c == nil || !s.JetStreamEnabled() {
70✔
2538
                return
×
2539
        }
×
2540
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
70✔
2541
        if err != nil {
70✔
2542
                s.Warnf(badAPIRequestT, msg)
×
2543
                return
×
2544
        }
×
2545

2546
        // Have extra token for this one.
2547
        name := tokenAt(subject, 6)
70✔
2548

70✔
2549
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
70✔
2550
        if errorOnRequiredApiLevel(hdr) {
71✔
2551
                resp.Error = NewJSRequiredApiLevelError()
1✔
2552
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2553
                return
1✔
2554
        }
1✔
2555

2556
        // If we are not in clustered mode this is a failed request.
2557
        if !s.JetStreamIsClustered() {
69✔
2558
                resp.Error = NewJSClusterRequiredError()
×
2559
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2560
                return
×
2561
        }
×
2562

2563
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2564
        js, cc := s.getJetStreamCluster()
69✔
2565
        if js == nil || cc == nil {
69✔
2566
                return
×
2567
        }
×
2568
        if js.isLeaderless() {
69✔
2569
                resp.Error = NewJSClusterNotAvailError()
×
2570
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2571
                return
×
2572
        }
×
2573

2574
        js.mu.RLock()
69✔
2575
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
69✔
2576
        js.mu.RUnlock()
69✔
2577

69✔
2578
        // Make sure we are meta leader.
69✔
2579
        if !isLeader {
127✔
2580
                return
58✔
2581
        }
58✔
2582

2583
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2584
                if doErr {
×
2585
                        resp.Error = NewJSNotEnabledForAccountError()
×
2586
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2587
                }
×
2588
                return
×
2589
        }
2590
        if isEmptyRequest(msg) {
11✔
2591
                resp.Error = NewJSBadRequestError()
×
2592
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2593
                return
×
2594
        }
×
2595

2596
        var req JSApiStreamRemovePeerRequest
11✔
2597
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2598
                resp.Error = NewJSInvalidJSONError(err)
×
2599
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2600
                return
×
2601
        }
×
2602
        if req.Peer == _EMPTY_ {
11✔
2603
                resp.Error = NewJSBadRequestError()
×
2604
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2605
                return
×
2606
        }
×
2607

2608
        if sa == nil {
11✔
2609
                // No stream present.
×
2610
                resp.Error = NewJSStreamNotFoundError()
×
2611
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2612
                return
×
2613
        }
×
2614

2615
        // Check to see if we are a member of the group and if the group has no leader.
2616
        // Peers here is a server name, convert to node name.
2617
        nodeName := getHash(req.Peer)
11✔
2618

11✔
2619
        js.mu.RLock()
11✔
2620
        rg := sa.Group
11✔
2621
        isMember := rg.isMember(nodeName)
11✔
2622
        js.mu.RUnlock()
11✔
2623

11✔
2624
        // Make sure we are a member.
11✔
2625
        if !isMember {
12✔
2626
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2627
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2628
                return
1✔
2629
        }
1✔
2630

2631
        // If we are here we have a valid peer member set for removal.
2632
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2633
                resp.Error = NewJSPeerRemapError()
2✔
2634
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2635
                return
2✔
2636
        }
2✔
2637

2638
        resp.Success = true
8✔
2639
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2640
}
2641

2642
// Request to have the metaleader remove a peer from the system.
2643
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
9✔
2644
        if c == nil || !s.JetStreamEnabled() {
9✔
2645
                return
×
2646
        }
×
2647

2648
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
9✔
2649
        if err != nil {
9✔
2650
                s.Warnf(badAPIRequestT, msg)
×
2651
                return
×
2652
        }
×
2653
        if acc != s.SystemAccount() {
9✔
2654
                return
×
2655
        }
×
2656

2657
        js, cc := s.getJetStreamCluster()
9✔
2658
        if js == nil || cc == nil {
9✔
2659
                return
×
2660
        }
×
2661

2662
        js.mu.RLock()
9✔
2663
        isLeader := cc.isLeader()
9✔
2664
        meta := cc.meta
9✔
2665
        js.mu.RUnlock()
9✔
2666

9✔
2667
        // Extra checks here but only leader is listening.
9✔
2668
        if !isLeader {
9✔
2669
                return
×
2670
        }
×
2671

2672
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
9✔
2673
        if errorOnRequiredApiLevel(hdr) {
9✔
2674
                resp.Error = NewJSRequiredApiLevelError()
×
2675
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2676
                return
×
2677
        }
×
2678

2679
        if isEmptyRequest(msg) {
9✔
2680
                resp.Error = NewJSBadRequestError()
×
2681
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2682
                return
×
2683
        }
×
2684

2685
        var req JSApiMetaServerRemoveRequest
9✔
2686
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
9✔
2687
                resp.Error = NewJSInvalidJSONError(err)
×
2688
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2689
                return
×
2690
        }
×
2691

2692
        var found string
9✔
2693
        js.mu.RLock()
9✔
2694
        for _, p := range meta.Peers() {
37✔
2695
                // If Peer is specified, it takes precedence
28✔
2696
                if req.Peer != _EMPTY_ {
34✔
2697
                        if p.ID == req.Peer {
7✔
2698
                                found = req.Peer
1✔
2699
                                break
1✔
2700
                        }
2701
                        continue
5✔
2702
                }
2703
                si, ok := s.nodeToInfo.Load(p.ID)
22✔
2704
                if ok && si.(nodeInfo).name == req.Server {
27✔
2705
                        found = p.ID
5✔
2706
                        break
5✔
2707
                }
2708
        }
2709
        js.mu.RUnlock()
9✔
2710

9✔
2711
        if found == _EMPTY_ {
12✔
2712
                resp.Error = NewJSClusterServerNotMemberError()
3✔
2713
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
2714
                return
3✔
2715
        }
3✔
2716

2717
        // So we have a valid peer.
2718
        js.mu.Lock()
6✔
2719
        meta.ProposeRemovePeer(found)
6✔
2720
        js.mu.Unlock()
6✔
2721

6✔
2722
        resp.Success = true
6✔
2723
        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
2724
}
2725

2726
func (s *Server) peerSetToNames(ps []string) []string {
169✔
2727
        names := make([]string, len(ps))
169✔
2728
        for i := 0; i < len(ps); i++ {
634✔
2729
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
465✔
2730
                        names[i] = ps[i]
×
2731
                } else {
465✔
2732
                        names[i] = si.(nodeInfo).name
465✔
2733
                }
465✔
2734
        }
2735
        return names
169✔
2736
}
2737

2738
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2739
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2740
        js.mu.RLock()
29✔
2741
        defer js.mu.RUnlock()
29✔
2742
        if cc := js.cluster; cc != nil {
58✔
2743
                for _, p := range cc.meta.Peers() {
143✔
2744
                        si, ok := s.nodeToInfo.Load(p.ID)
114✔
2745
                        if ok && si.(nodeInfo).name == serverName {
143✔
2746
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2747
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2748
                                                return p.ID
29✔
2749
                                        }
29✔
2750
                                }
2751
                        }
2752
                }
2753
        }
2754
        return _EMPTY_
×
2755
}
2756

2757
// Request to have the metaleader move a stream on a peer to another
2758
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2759
        if c == nil || !s.JetStreamEnabled() {
33✔
2760
                return
×
2761
        }
×
2762

2763
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
33✔
2764
        if err != nil {
33✔
2765
                s.Warnf(badAPIRequestT, msg)
×
2766
                return
×
2767
        }
×
2768

2769
        js, cc := s.getJetStreamCluster()
33✔
2770
        if js == nil || cc == nil {
33✔
2771
                return
×
2772
        }
×
2773

2774
        // Extra checks here but only leader is listening.
2775
        js.mu.RLock()
33✔
2776
        isLeader := cc.isLeader()
33✔
2777
        js.mu.RUnlock()
33✔
2778

33✔
2779
        if !isLeader {
33✔
2780
                return
×
2781
        }
×
2782

2783
        accName := tokenAt(subject, 6)
33✔
2784
        streamName := tokenAt(subject, 7)
33✔
2785

33✔
2786
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2787
                return
×
2788
        }
×
2789

2790
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2791
        if errorOnRequiredApiLevel(hdr) {
33✔
2792
                resp.Error = NewJSRequiredApiLevelError()
×
2793
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2794
                return
×
2795
        }
×
2796

2797
        var req JSApiMetaServerStreamMoveRequest
33✔
2798
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2799
                resp.Error = NewJSInvalidJSONError(err)
×
2800
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2801
                return
×
2802
        }
×
2803

2804
        srcPeer := _EMPTY_
33✔
2805
        if req.Server != _EMPTY_ {
62✔
2806
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2807
        }
29✔
2808

2809
        targetAcc, ok := s.accounts.Load(accName)
33✔
2810
        if !ok {
33✔
2811
                resp.Error = NewJSNoAccountError()
×
2812
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2813
                return
×
2814
        }
×
2815

2816
        var streamFound bool
33✔
2817
        cfg := StreamConfig{}
33✔
2818
        currPeers := []string{}
33✔
2819
        currCluster := _EMPTY_
33✔
2820
        js.mu.Lock()
33✔
2821
        streams, ok := cc.streams[accName]
33✔
2822
        if ok {
66✔
2823
                sa, ok := streams[streamName]
33✔
2824
                if ok {
66✔
2825
                        cfg = *sa.Config.clone()
33✔
2826
                        streamFound = true
33✔
2827
                        currPeers = sa.Group.Peers
33✔
2828
                        currCluster = sa.Group.Cluster
33✔
2829
                }
33✔
2830
        }
2831
        js.mu.Unlock()
33✔
2832

33✔
2833
        if !streamFound {
33✔
2834
                resp.Error = NewJSStreamNotFoundError()
×
2835
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2836
                return
×
2837
        }
×
2838

2839
        // if server was picked, make sure src peer exists and move it to first position.
2840
        // removal will drop peers from the left
2841
        if req.Server != _EMPTY_ {
62✔
2842
                if srcPeer == _EMPTY_ {
29✔
2843
                        resp.Error = NewJSClusterServerNotMemberError()
×
2844
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2845
                        return
×
2846
                }
×
2847
                var peerFound bool
29✔
2848
                for i := 0; i < len(currPeers); i++ {
83✔
2849
                        if currPeers[i] == srcPeer {
83✔
2850
                                copy(currPeers[1:], currPeers[:i])
29✔
2851
                                currPeers[0] = srcPeer
29✔
2852
                                peerFound = true
29✔
2853
                                break
29✔
2854
                        }
2855
                }
2856
                if !peerFound {
29✔
2857
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2858
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2859
                        return
×
2860
                }
×
2861
        }
2862

2863
        // make sure client is scoped to requested account
2864
        ciNew := *(ci)
33✔
2865
        ciNew.Account = accName
33✔
2866

33✔
2867
        // backup placement such that peers can be looked up with modified tag list
33✔
2868
        var origPlacement *Placement
33✔
2869
        if cfg.Placement != nil {
33✔
2870
                tmp := *cfg.Placement
×
2871
                origPlacement = &tmp
×
2872
        }
×
2873

2874
        if len(req.Tags) > 0 {
60✔
2875
                if cfg.Placement == nil {
54✔
2876
                        cfg.Placement = &Placement{}
27✔
2877
                }
27✔
2878
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2879
        }
2880

2881
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2882
        if len(peers) <= cfg.Replicas {
35✔
2883
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2884
                peers = nil
2✔
2885

2✔
2886
                clusters := map[string]struct{}{}
2✔
2887
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2888
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2889
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2890
                        }
8✔
2891
                        return true
16✔
2892
                })
2893
                errs := &selectPeerError{}
2✔
2894
                errs.accumulate(e)
2✔
2895
                for cluster := range clusters {
4✔
2896
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2897
                        if len(newPeers) >= cfg.Replicas {
4✔
2898
                                peers = append([]string{}, currPeers...)
2✔
2899
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2900
                                break
2✔
2901
                        }
2902
                        errs.accumulate(e)
×
2903
                }
2904
                if peers == nil {
2✔
2905
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2906
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2907
                        return
×
2908
                }
×
2909
        }
2910

2911
        cfg.Placement = origPlacement
33✔
2912

33✔
2913
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2914
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2915

33✔
2916
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2917
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2918
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2919
}
2920

2921
// Request to have the metaleader move a stream on a peer to another
2922
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2923
        if c == nil || !s.JetStreamEnabled() {
4✔
2924
                return
×
2925
        }
×
2926

2927
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
4✔
2928
        if err != nil {
4✔
2929
                s.Warnf(badAPIRequestT, msg)
×
2930
                return
×
2931
        }
×
2932

2933
        js, cc := s.getJetStreamCluster()
4✔
2934
        if js == nil || cc == nil {
4✔
2935
                return
×
2936
        }
×
2937

2938
        // Extra checks here but only leader is listening.
2939
        js.mu.RLock()
4✔
2940
        isLeader := cc.isLeader()
4✔
2941
        js.mu.RUnlock()
4✔
2942

4✔
2943
        if !isLeader {
4✔
2944
                return
×
2945
        }
×
2946

2947
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2948
        if errorOnRequiredApiLevel(hdr) {
4✔
2949
                resp.Error = NewJSRequiredApiLevelError()
×
2950
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2951
                return
×
2952
        }
×
2953

2954
        accName := tokenAt(subject, 6)
4✔
2955
        streamName := tokenAt(subject, 7)
4✔
2956

4✔
2957
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2958
                return
×
2959
        }
×
2960

2961
        targetAcc, ok := s.accounts.Load(accName)
4✔
2962
        if !ok {
4✔
2963
                resp.Error = NewJSNoAccountError()
×
2964
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2965
                return
×
2966
        }
×
2967

2968
        streamFound := false
4✔
2969
        cfg := StreamConfig{}
4✔
2970
        currPeers := []string{}
4✔
2971
        js.mu.Lock()
4✔
2972
        streams, ok := cc.streams[accName]
4✔
2973
        if ok {
8✔
2974
                sa, ok := streams[streamName]
4✔
2975
                if ok {
8✔
2976
                        cfg = *sa.Config.clone()
4✔
2977
                        streamFound = true
4✔
2978
                        currPeers = sa.Group.Peers
4✔
2979
                }
4✔
2980
        }
2981
        js.mu.Unlock()
4✔
2982

4✔
2983
        if !streamFound {
4✔
2984
                resp.Error = NewJSStreamNotFoundError()
×
2985
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2986
                return
×
2987
        }
×
2988

2989
        if len(currPeers) <= cfg.Replicas {
4✔
2990
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2991
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2992
                return
×
2993
        }
×
2994

2995
        // make sure client is scoped to requested account
2996
        ciNew := *(ci)
4✔
2997
        ciNew.Account = accName
4✔
2998

4✔
2999
        peers := currPeers[:cfg.Replicas]
4✔
3000

4✔
3001
        // Remove placement in case tags don't match
4✔
3002
        // This can happen if the move was initiated by modifying the tags.
4✔
3003
        // This is an account operation.
4✔
3004
        // This can NOT happen when the move was initiated by the system account.
4✔
3005
        // There move honors the original tag list.
4✔
3006
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
3007
        FOR_TAGCHECK:
1✔
3008
                for _, peer := range peers {
2✔
3009
                        si, ok := s.nodeToInfo.Load(peer)
1✔
3010
                        if !ok {
1✔
3011
                                // can't verify tags, do the safe thing and error
×
3012
                                resp.Error = NewJSStreamGeneralError(
×
3013
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
3014
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3015
                                return
×
3016
                        }
×
3017
                        nodeTags := si.(nodeInfo).tags
1✔
3018
                        for _, tag := range cfg.Placement.Tags {
2✔
3019
                                if !nodeTags.Contains(tag) {
2✔
3020
                                        // clear placement as tags don't match
1✔
3021
                                        cfg.Placement = nil
1✔
3022
                                        break FOR_TAGCHECK
1✔
3023
                                }
3024
                        }
3025

3026
                }
3027
        }
3028

3029
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
3030
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
3031

4✔
3032
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
3033
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
3034
}
3035

3036
// Request to have an account purged
3037
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
3038
        if c == nil || !s.JetStreamEnabled() {
6✔
3039
                return
×
3040
        }
×
3041

3042
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
6✔
3043
        if err != nil {
6✔
3044
                s.Warnf(badAPIRequestT, msg)
×
3045
                return
×
3046
        }
×
3047
        if acc != s.SystemAccount() {
6✔
3048
                return
×
3049
        }
×
3050

3051
        js := s.getJetStream()
6✔
3052
        if js == nil {
6✔
3053
                return
×
3054
        }
×
3055

3056
        accName := tokenAt(subject, 5)
6✔
3057

6✔
3058
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
3059
        if errorOnRequiredApiLevel(hdr) {
6✔
3060
                resp.Error = NewJSRequiredApiLevelError()
×
3061
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3062
                return
×
3063
        }
×
3064

3065
        if !s.JetStreamIsClustered() {
8✔
3066
                var streams []*stream
2✔
3067
                var ac *Account
2✔
3068
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
3069
                        streams = ac.streams()
1✔
3070
                }
1✔
3071

3072
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
3073
                        accName, len(streams), ac != nil)
2✔
3074

2✔
3075
                for _, mset := range streams {
3✔
3076
                        err := mset.delete()
1✔
3077
                        if err != nil {
1✔
3078
                                resp.Error = NewJSStreamDeleteError(err)
×
3079
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3080
                                return
×
3081
                        }
×
3082
                }
3083
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
3084
                        resp.Error = NewJSStreamGeneralError(err)
×
3085
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3086
                        return
×
3087
                }
×
3088
                resp.Initiated = true
2✔
3089
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3090
                return
2✔
3091
        }
3092

3093
        _, cc := s.getJetStreamCluster()
4✔
3094

4✔
3095
        js.mu.RLock()
4✔
3096
        isLeader := cc.isLeader()
4✔
3097
        meta := cc.meta
4✔
3098
        js.mu.RUnlock()
4✔
3099

4✔
3100
        if !isLeader {
4✔
3101
                return
×
3102
        }
×
3103

3104
        if js.isMetaRecovering() {
4✔
3105
                // While in recovery mode, the data structures are not fully initialized
×
3106
                resp.Error = NewJSClusterNotAvailError()
×
3107
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3108
                return
×
3109
        }
×
3110

3111
        js.mu.RLock()
4✔
3112
        ns, nc := 0, 0
4✔
3113
        streams, hasAccount := cc.streams[accName]
4✔
3114
        for _, osa := range streams {
12✔
3115
                for _, oca := range osa.consumers {
20✔
3116
                        oca.deleted = true
12✔
3117
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
12✔
3118
                        meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
3119
                        nc++
12✔
3120
                }
12✔
3121
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
8✔
3122
                meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
3123
                ns++
8✔
3124
        }
3125
        js.mu.RUnlock()
4✔
3126

4✔
3127
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
3128

4✔
3129
        resp.Initiated = true
4✔
3130
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3131
}
3132

3133
// Request to have the meta leader stepdown.
3134
// These will only be received by the meta leader, so less checking needed.
3135
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
24✔
3136
        if c == nil || !s.JetStreamEnabled() {
24✔
3137
                return
×
3138
        }
×
3139

3140
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
24✔
3141
        if err != nil {
24✔
3142
                s.Warnf(badAPIRequestT, msg)
×
3143
                return
×
3144
        }
×
3145

3146
        // This should only be coming from the System Account.
3147
        if acc != s.SystemAccount() {
25✔
3148
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
3149
                return
1✔
3150
        }
1✔
3151

3152
        js, cc := s.getJetStreamCluster()
23✔
3153
        if js == nil || cc == nil {
23✔
3154
                return
×
3155
        }
×
3156

3157
        // Extra checks here but only leader is listening.
3158
        js.mu.RLock()
23✔
3159
        isLeader := cc.isLeader()
23✔
3160
        meta := cc.meta
23✔
3161
        js.mu.RUnlock()
23✔
3162

23✔
3163
        if !isLeader {
23✔
3164
                return
×
3165
        }
×
3166

3167
        var preferredLeader string
23✔
3168
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
23✔
3169
        if errorOnRequiredApiLevel(hdr) {
23✔
3170
                resp.Error = NewJSRequiredApiLevelError()
×
3171
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3172
                return
×
3173
        }
×
3174

3175
        if isJSONObjectOrArray(msg) {
38✔
3176
                var req JSApiLeaderStepdownRequest
15✔
3177
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
15✔
3178
                        resp.Error = NewJSInvalidJSONError(err)
×
3179
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3180
                        return
×
3181
                }
×
3182
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(meta, req.Placement); resp.Error != nil {
21✔
3183
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
3184
                        return
6✔
3185
                }
6✔
3186
        }
3187

3188
        // Call actual stepdown.
3189
        err = meta.StepDown(preferredLeader)
17✔
3190
        if err != nil {
17✔
3191
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
3192
        } else {
17✔
3193
                resp.Success = true
17✔
3194
        }
17✔
3195
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
17✔
3196
}
3197

3198
// isJSONObjectOrArray reports whether the first byte of req that is not JSON
// whitespace (space, tab, newline, carriage return) is '{' or '['.
// Technically, valid JSON can also be a plain string or number, but for our use
// case only objects and arrays matter. This does not validate the JSON in its
// entirety; it merely hints whether the caller should attempt to parse the
// request as JSON at all. Empty and whitespace-only input yields false.
func isJSONObjectOrArray(req []byte) bool {
	for _, b := range req {
		switch b {
		case ' ', '\t', '\n', '\r':
			// Leading JSON whitespace, keep scanning.
			continue
		case '{', '[':
			return true
		default:
			return false
		}
	}
	// Nothing but whitespace, or nothing at all.
	return false
}
}
3216

3217
// isEmptyRequest reports whether req carries no request payload: it is either
// zero-length, the literal "{}", or JSON that decodes to an object with no
// keys. Input that is not valid JSON, or that decodes to anything other than
// an object, is not considered empty.
func isEmptyRequest(req []byte) bool {
	// Cheap checks first so the common cases avoid a JSON decode.
	switch {
	case len(req) == 0, bytes.Equal(req, []byte("{}")):
		return true
	}
	// No simple match, but it could still be an empty object, e.g. with
	// surrounding or interior whitespace.
	var decoded any
	if json.Unmarshal(req, &decoded) != nil {
		return false
	}
	obj, isObject := decoded.(map[string]any)
	return isObject && len(obj) == 0
}
}
3235

3236
// getStepDownPreferredPlacement attempts to work out what the best placement is
3237
// for a stepdown request. The preferred server name always takes precedence, but
3238
// if not specified, the placement will be used to filter by cluster. The caller
3239
// should check for return API errors and return those to the requestor if needed.
3240
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
41✔
3241
        if placement == nil {
43✔
3242
                return _EMPTY_, nil
2✔
3243
        }
2✔
3244
        var preferredLeader string
39✔
3245
        if placement.Preferred != _EMPTY_ {
58✔
3246
                for _, p := range group.Peers() {
74✔
3247
                        si, ok := s.nodeToInfo.Load(p.ID)
55✔
3248
                        if !ok || si == nil {
55✔
3249
                                continue
×
3250
                        }
3251
                        if si.(nodeInfo).name == placement.Preferred {
70✔
3252
                                preferredLeader = p.ID
15✔
3253
                                break
15✔
3254
                        }
3255
                }
3256
                if preferredLeader == group.ID() {
23✔
3257
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3258
                }
4✔
3259
                if preferredLeader == _EMPTY_ {
19✔
3260
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3261
                }
4✔
3262
        } else {
20✔
3263
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3264
                ourID := group.ID()
20✔
3265
                for _, p := range group.Peers() {
116✔
3266
                        if p == nil {
96✔
3267
                                continue // ... shouldn't happen.
×
3268
                        }
3269
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3270
                        if !ok || si == nil {
96✔
3271
                                continue
×
3272
                        }
3273
                        ni := si.(nodeInfo)
96✔
3274
                        if ni.offline || p.ID == ourID {
116✔
3275
                                continue
20✔
3276
                        }
3277
                        possiblePeers[p] = ni
76✔
3278
                }
3279
                // If cluster is specified, filter out anything not matching the cluster name.
3280
                if placement.Cluster != _EMPTY_ {
31✔
3281
                        for p, si := range possiblePeers {
51✔
3282
                                if si.cluster != placement.Cluster {
66✔
3283
                                        delete(possiblePeers, p)
26✔
3284
                                }
26✔
3285
                        }
3286
                }
3287
                // If tags are specified, filter out anything not matching all supplied tags.
3288
                if len(placement.Tags) > 0 {
32✔
3289
                        for p, si := range possiblePeers {
55✔
3290
                                matchesAll := true
43✔
3291
                                for _, tag := range placement.Tags {
93✔
3292
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3293
                                                break
32✔
3294
                                        }
3295
                                }
3296
                                if !matchesAll {
75✔
3297
                                        delete(possiblePeers, p)
32✔
3298
                                }
32✔
3299
                        }
3300
                }
3301
                // If there are no possible peers, return an error.
3302
                if len(possiblePeers) == 0 {
28✔
3303
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3304
                }
8✔
3305
                // Take advantage of random map iteration order to select the preferred.
3306
                for p := range possiblePeers {
24✔
3307
                        preferredLeader = p.ID
12✔
3308
                        break
12✔
3309
                }
3310
        }
3311
        return preferredLeader, nil
23✔
3312
}
3313

3314
// Request to delete a stream.
3315
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
531✔
3316
        if c == nil || !s.JetStreamEnabled() {
531✔
3317
                return
×
3318
        }
×
3319
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
531✔
3320
        if err != nil {
531✔
3321
                s.Warnf(badAPIRequestT, msg)
×
3322
                return
×
3323
        }
×
3324

3325
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
531✔
3326
        if errorOnRequiredApiLevel(hdr) {
532✔
3327
                resp.Error = NewJSRequiredApiLevelError()
1✔
3328
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3329
                return
1✔
3330
        }
1✔
3331

3332
        // Determine if we should proceed here when we are in clustered mode.
3333
        if s.JetStreamIsClustered() {
1,016✔
3334
                js, cc := s.getJetStreamCluster()
486✔
3335
                if js == nil || cc == nil {
486✔
3336
                        return
×
3337
                }
×
3338
                if js.isLeaderless() {
487✔
3339
                        resp.Error = NewJSClusterNotAvailError()
1✔
3340
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3341
                        return
1✔
3342
                }
1✔
3343
                // Make sure we are meta leader.
3344
                if !s.JetStreamIsLeader() {
865✔
3345
                        return
380✔
3346
                }
380✔
3347
        }
3348

3349
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
149✔
3350
                if doErr {
×
3351
                        resp.Error = NewJSNotEnabledForAccountError()
×
3352
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3353
                }
×
3354
                return
×
3355
        }
3356

3357
        if !isEmptyRequest(msg) {
150✔
3358
                resp.Error = NewJSNotEmptyRequestError()
1✔
3359
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3360
                return
1✔
3361
        }
1✔
3362
        stream := streamNameFromSubject(subject)
148✔
3363

148✔
3364
        // Clustered.
148✔
3365
        if s.JetStreamIsClustered() {
253✔
3366
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
105✔
3367
                return
105✔
3368
        }
105✔
3369

3370
        mset, err := acc.lookupStream(stream)
43✔
3371
        if err != nil {
48✔
3372
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3373
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3374
                return
5✔
3375
        }
5✔
3376

3377
        if err := mset.delete(); err != nil {
38✔
3378
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3379
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3380
                return
×
3381
        }
×
3382
        resp.Success = true
38✔
3383
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
38✔
3384
}
3385

3386
// Request to delete a message.
3387
// This expects a stream sequence number as the msg body.
3388
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
803✔
3389
        if c == nil || !s.JetStreamEnabled() {
804✔
3390
                return
1✔
3391
        }
1✔
3392
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
802✔
3393
        if err != nil {
802✔
3394
                s.Warnf(badAPIRequestT, msg)
×
3395
                return
×
3396
        }
×
3397

3398
        stream := tokenAt(subject, 6)
802✔
3399

802✔
3400
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
802✔
3401
        if errorOnRequiredApiLevel(hdr) {
803✔
3402
                resp.Error = NewJSRequiredApiLevelError()
1✔
3403
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3404
                return
1✔
3405
        }
1✔
3406

3407
        // If we are in clustered mode we need to be the stream leader to proceed.
3408
        if s.JetStreamIsClustered() {
1,183✔
3409
                // Check to make sure the stream is assigned.
382✔
3410
                js, cc := s.getJetStreamCluster()
382✔
3411
                if js == nil || cc == nil {
382✔
3412
                        return
×
3413
                }
×
3414
                if js.isLeaderless() {
383✔
3415
                        resp.Error = NewJSClusterNotAvailError()
1✔
3416
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3417
                        return
1✔
3418
                }
1✔
3419

3420
                js.mu.RLock()
381✔
3421
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
381✔
3422
                js.mu.RUnlock()
381✔
3423

381✔
3424
                if isLeader && sa == nil {
381✔
3425
                        // We can't find the stream, so mimic what would be the errors below.
×
3426
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3427
                                if doErr {
×
3428
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3429
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3430
                                }
×
3431
                                return
×
3432
                        }
3433
                        // No stream present.
3434
                        resp.Error = NewJSStreamNotFoundError()
×
3435
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3436
                        return
×
3437
                } else if sa == nil {
381✔
3438
                        return
×
3439
                }
×
3440

3441
                // Check to see if we are a member of the group and if the group has no leader.
3442
                if js.isGroupLeaderless(sa.Group) {
381✔
3443
                        resp.Error = NewJSClusterNotAvailError()
×
3444
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3445
                        return
×
3446
                }
×
3447

3448
                // We have the stream assigned and a leader, so only the stream leader should answer.
3449
                if !acc.JetStreamIsStreamLeader(stream) {
634✔
3450
                        return
253✔
3451
                }
253✔
3452
        }
3453

3454
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
686✔
3455
                if doErr {
276✔
3456
                        resp.Error = NewJSNotEnabledForAccountError()
137✔
3457
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
137✔
3458
                }
137✔
3459
                return
139✔
3460
        }
3461
        if isEmptyRequest(msg) {
408✔
3462
                resp.Error = NewJSBadRequestError()
×
3463
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3464
                return
×
3465
        }
×
3466
        var req JSApiMsgDeleteRequest
408✔
3467
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
408✔
3468
                resp.Error = NewJSInvalidJSONError(err)
×
3469
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3470
                return
×
3471
        }
×
3472

3473
        mset, err := acc.lookupStream(stream)
408✔
3474
        if err != nil {
410✔
3475
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3476
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3477
                return
2✔
3478
        }
2✔
3479
        if mset.cfg.Sealed {
408✔
3480
                resp.Error = NewJSStreamSealedError()
2✔
3481
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3482
                return
2✔
3483
        }
2✔
3484
        if mset.cfg.DenyDelete {
405✔
3485
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3486
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3487
                return
1✔
3488
        }
1✔
3489

3490
        if s.JetStreamIsClustered() {
529✔
3491
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
126✔
3492
                return
126✔
3493
        }
126✔
3494

3495
        var removed bool
277✔
3496
        if req.NoErase {
553✔
3497
                removed, err = mset.removeMsg(req.Seq)
276✔
3498
        } else {
277✔
3499
                removed, err = mset.eraseMsg(req.Seq)
1✔
3500
        }
1✔
3501
        if err != nil {
277✔
3502
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3503
        } else if !removed {
277✔
3504
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3505
        } else {
277✔
3506
                resp.Success = true
277✔
3507
        }
277✔
3508
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
277✔
3509
}
3510

3511
// Request to get a raw stream message.
3512
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2,150✔
3513
        if c == nil || !s.JetStreamEnabled() {
2,150✔
3514
                return
×
3515
        }
×
3516
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2,150✔
3517
        if err != nil {
2,150✔
3518
                s.Warnf(badAPIRequestT, msg)
×
3519
                return
×
3520
        }
×
3521

3522
        stream := tokenAt(subject, 6)
2,150✔
3523

2,150✔
3524
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
2,150✔
3525
        if errorOnRequiredApiLevel(hdr) {
2,151✔
3526
                resp.Error = NewJSRequiredApiLevelError()
1✔
3527
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3528
                return
1✔
3529
        }
1✔
3530

3531
        // If we are in clustered mode we need to be the stream leader to proceed.
3532
        if s.JetStreamIsClustered() {
3,561✔
3533
                // Check to make sure the stream is assigned.
1,412✔
3534
                js, cc := s.getJetStreamCluster()
1,412✔
3535
                if js == nil || cc == nil {
1,412✔
3536
                        return
×
3537
                }
×
3538
                if js.isLeaderless() {
1,412✔
3539
                        resp.Error = NewJSClusterNotAvailError()
×
3540
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3541
                        return
×
3542
                }
×
3543

3544
                js.mu.RLock()
1,412✔
3545
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
1,412✔
3546
                js.mu.RUnlock()
1,412✔
3547

1,412✔
3548
                if isLeader && sa == nil {
1,412✔
3549
                        // We can't find the stream, so mimic what would be the errors below.
×
3550
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3551
                                if doErr {
×
3552
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3553
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3554
                                }
×
3555
                                return
×
3556
                        }
3557
                        // No stream present.
3558
                        resp.Error = NewJSStreamNotFoundError()
×
3559
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3560
                        return
×
3561
                } else if sa == nil {
1,412✔
3562
                        return
×
3563
                }
×
3564

3565
                // Check to see if we are a member of the group and if the group has no leader.
3566
                if js.isGroupLeaderless(sa.Group) {
1,412✔
3567
                        resp.Error = NewJSClusterNotAvailError()
×
3568
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3569
                        return
×
3570
                }
×
3571

3572
                // We have the stream assigned and a leader, so only the stream leader should answer.
3573
                if !acc.JetStreamIsStreamLeader(stream) {
2,361✔
3574
                        return
949✔
3575
                }
949✔
3576
        }
3577

3578
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
1,203✔
3579
                if doErr {
3✔
3580
                        resp.Error = NewJSNotEnabledForAccountError()
×
3581
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3582
                }
×
3583
                return
3✔
3584
        }
3585
        if isEmptyRequest(msg) {
1,197✔
3586
                resp.Error = NewJSBadRequestError()
×
3587
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3588
                return
×
3589
        }
×
3590
        var req JSApiMsgGetRequest
1,197✔
3591
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
1,197✔
3592
                resp.Error = NewJSInvalidJSONError(err)
×
3593
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3594
                return
×
3595
        }
×
3596

3597
        // This version does not support batch.
3598
        if req.Batch > 0 || req.MaxBytes > 0 {
1,198✔
3599
                resp.Error = NewJSBadRequestError()
1✔
3600
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3601
                return
1✔
3602
        }
1✔
3603

3604
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3605
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3606
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
1,196✔
3607
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
1,196✔
3608
                (req.Seq > 0 && req.StartTime != nil) ||
1,196✔
3609
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
1,196✔
3610
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
1,200✔
3611
                resp.Error = NewJSBadRequestError()
4✔
3612
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3613
                return
4✔
3614
        }
4✔
3615

3616
        mset, err := acc.lookupStream(stream)
1,192✔
3617
        if err != nil {
1,192✔
3618
                resp.Error = NewJSStreamNotFoundError()
×
3619
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3620
                return
×
3621
        }
×
3622
        if mset.offlineReason != _EMPTY_ {
1,192✔
3623
                // Just let the request time out.
×
3624
                return
×
3625
        }
×
3626

3627
        var svp StoreMsg
1,192✔
3628
        var sm *StoreMsg
1,192✔
3629

1,192✔
3630
        // Ensure this read request is isolated and doesn't interleave with writes.
1,192✔
3631
        mset.mu.RLock()
1,192✔
3632
        defer mset.mu.RUnlock()
1,192✔
3633

1,192✔
3634
        // If AsOfTime is set, perform this first to get the sequence.
1,192✔
3635
        var seq uint64
1,192✔
3636
        if req.StartTime != nil {
1,198✔
3637
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3638
        } else {
1,192✔
3639
                seq = req.Seq
1,186✔
3640
        }
1,186✔
3641

3642
        if seq > 0 && req.NextFor == _EMPTY_ {
1,633✔
3643
                sm, err = mset.store.LoadMsg(seq, &svp)
441✔
3644
        } else if req.NextFor != _EMPTY_ {
1,294✔
3645
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3646
        } else {
751✔
3647
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
649✔
3648
        }
649✔
3649
        if err != nil {
1,682✔
3650
                resp.Error = NewJSNoMessageFoundError()
490✔
3651
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
490✔
3652
                return
490✔
3653
        }
490✔
3654
        resp.Message = &StoredMsg{
702✔
3655
                Subject:  sm.subj,
702✔
3656
                Sequence: sm.seq,
702✔
3657
                Data:     sm.msg,
702✔
3658
                Time:     time.Unix(0, sm.ts).UTC(),
702✔
3659
        }
702✔
3660
        if !req.NoHeaders {
1,403✔
3661
                resp.Message.Header = sm.hdr
701✔
3662
        }
701✔
3663

3664
        // Don't send response through API layer for this call.
3665
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
702✔
3666
}
3667

3668
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
3669
        if c == nil || !s.JetStreamEnabled() {
28✔
3670
                return
×
3671
        }
×
3672

3673
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
28✔
3674
        if err != nil {
28✔
3675
                s.Warnf(badAPIRequestT, msg)
×
3676
                return
×
3677
        }
×
3678

3679
        stream := streamNameFromSubject(subject)
28✔
3680
        consumer := consumerNameFromSubject(subject)
28✔
3681

28✔
3682
        var req JSApiConsumerUnpinRequest
28✔
3683
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
28✔
3684
        if errorOnRequiredApiLevel(hdr) {
29✔
3685
                resp.Error = NewJSRequiredApiLevelError()
1✔
3686
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3687
                return
1✔
3688
        }
1✔
3689

3690
        if err := json.Unmarshal(msg, &req); err != nil {
27✔
3691
                resp.Error = NewJSInvalidJSONError(err)
×
3692
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3693
                return
×
3694
        }
×
3695

3696
        if req.Group == _EMPTY_ {
31✔
3697
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
4✔
3698
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3699
                return
4✔
3700
        }
4✔
3701

3702
        if !validGroupName.MatchString(req.Group) {
27✔
3703
                resp.Error = NewJSConsumerInvalidGroupNameError()
4✔
3704
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3705
                return
4✔
3706
        }
4✔
3707
        if s.JetStreamIsClustered() {
31✔
3708
                // Check to make sure the stream is assigned.
12✔
3709
                js, cc := s.getJetStreamCluster()
12✔
3710
                if js == nil || cc == nil {
12✔
3711
                        return
×
3712
                }
×
3713

3714
                // First check if the stream and consumer is there.
3715
                js.mu.RLock()
12✔
3716
                sa := js.streamAssignment(acc.Name, stream)
12✔
3717
                if sa == nil {
15✔
3718
                        js.mu.RUnlock()
3✔
3719
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3720
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3721
                        return
3✔
3722
                }
3✔
3723
                if sa.unsupported != nil {
9✔
3724
                        js.mu.RUnlock()
×
3725
                        // Just let the request time out.
×
3726
                        return
×
3727
                }
×
3728

3729
                ca, ok := sa.consumers[consumer]
9✔
3730
                if !ok || ca == nil {
12✔
3731
                        js.mu.RUnlock()
3✔
3732
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3733
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3734
                        return
3✔
3735
                }
3✔
3736
                if ca.unsupported != nil {
6✔
3737
                        js.mu.RUnlock()
×
3738
                        // Just let the request time out.
×
3739
                        return
×
3740
                }
×
3741
                js.mu.RUnlock()
6✔
3742

6✔
3743
                // Then check if we are the leader.
6✔
3744
                mset, err := acc.lookupStream(stream)
6✔
3745
                if err != nil {
6✔
3746
                        return
×
3747
                }
×
3748

3749
                o := mset.lookupConsumer(consumer)
6✔
3750
                if o == nil {
6✔
3751
                        return
×
3752
                }
×
3753
                if !o.isLeader() {
10✔
3754
                        return
4✔
3755
                }
4✔
3756
        }
3757

3758
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3759
                if doErr {
×
3760
                        resp.Error = NewJSNotEnabledForAccountError()
×
3761
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3762
                }
×
3763
                return
×
3764
        }
3765

3766
        mset, err := acc.lookupStream(stream)
9✔
3767
        if err != nil {
10✔
3768
                resp.Error = NewJSStreamNotFoundError()
1✔
3769
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3770
                return
1✔
3771
        }
1✔
3772
        if mset.offlineReason != _EMPTY_ {
8✔
3773
                // Just let the request time out.
×
3774
                return
×
3775
        }
×
3776
        o := mset.lookupConsumer(consumer)
8✔
3777
        if o == nil {
9✔
3778
                resp.Error = NewJSConsumerNotFoundError()
1✔
3779
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3780
                return
1✔
3781
        }
1✔
3782
        if o.offlineReason != _EMPTY_ {
7✔
3783
                // Just let the request time out.
×
3784
                return
×
3785
        }
×
3786

3787
        var foundPriority bool
7✔
3788
        for _, group := range o.config().PriorityGroups {
14✔
3789
                if group == req.Group {
12✔
3790
                        foundPriority = true
5✔
3791
                        break
5✔
3792
                }
3793
        }
3794
        if !foundPriority {
9✔
3795
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3796
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3797
                return
2✔
3798
        }
2✔
3799

3800
        o.mu.Lock()
5✔
3801
        o.currentPinId = _EMPTY_
5✔
3802
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3803
        o.mu.Unlock()
5✔
3804
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3805
}
3806

3807
// Request to purge a stream.
3808
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
136✔
3809
        if c == nil || !s.JetStreamEnabled() {
136✔
3810
                return
×
3811
        }
×
3812
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
136✔
3813
        if err != nil {
136✔
3814
                s.Warnf(badAPIRequestT, msg)
×
3815
                return
×
3816
        }
×
3817

3818
        stream := streamNameFromSubject(subject)
136✔
3819

136✔
3820
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
136✔
3821
        if errorOnRequiredApiLevel(hdr) {
137✔
3822
                resp.Error = NewJSRequiredApiLevelError()
1✔
3823
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3824
                return
1✔
3825
        }
1✔
3826

3827
        // If we are in clustered mode we need to be the stream leader to proceed.
3828
        if s.JetStreamIsClustered() {
236✔
3829
                // Check to make sure the stream is assigned.
101✔
3830
                js, cc := s.getJetStreamCluster()
101✔
3831
                if js == nil || cc == nil {
101✔
3832
                        return
×
3833
                }
×
3834

3835
                js.mu.RLock()
101✔
3836
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
101✔
3837
                js.mu.RUnlock()
101✔
3838

101✔
3839
                if isLeader && sa == nil {
101✔
3840
                        // We can't find the stream, so mimic what would be the errors below.
×
3841
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3842
                                if doErr {
×
3843
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3844
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3845
                                }
×
3846
                                return
×
3847
                        }
3848
                        // No stream present.
3849
                        resp.Error = NewJSStreamNotFoundError()
×
3850
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3851
                        return
×
3852
                } else if sa == nil {
101✔
3853
                        if js.isLeaderless() {
×
3854
                                resp.Error = NewJSClusterNotAvailError()
×
3855
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3856
                        }
×
3857
                        return
×
3858
                }
3859

3860
                // Check to see if we are a member of the group and if the group has no leader.
3861
                if js.isGroupLeaderless(sa.Group) {
102✔
3862
                        resp.Error = NewJSClusterNotAvailError()
1✔
3863
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3864
                        return
1✔
3865
                }
1✔
3866

3867
                // We have the stream assigned and a leader, so only the stream leader should answer.
3868
                if !acc.JetStreamIsStreamLeader(stream) {
168✔
3869
                        if js.isLeaderless() {
68✔
3870
                                resp.Error = NewJSClusterNotAvailError()
×
3871
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3872
                        }
×
3873
                        return
68✔
3874
                }
3875
        }
3876

3877
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
66✔
3878
                if doErr {
×
3879
                        resp.Error = NewJSNotEnabledForAccountError()
×
3880
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3881
                }
×
3882
                return
×
3883
        }
3884

3885
        var purgeRequest *JSApiStreamPurgeRequest
66✔
3886
        if isJSONObjectOrArray(msg) {
100✔
3887
                var req JSApiStreamPurgeRequest
34✔
3888
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
3889
                        resp.Error = NewJSInvalidJSONError(err)
×
3890
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3891
                        return
×
3892
                }
×
3893
                if req.Sequence > 0 && req.Keep > 0 {
34✔
3894
                        resp.Error = NewJSBadRequestError()
×
3895
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3896
                        return
×
3897
                }
×
3898
                purgeRequest = &req
34✔
3899
        }
3900

3901
        mset, err := acc.lookupStream(stream)
66✔
3902
        if err != nil {
66✔
3903
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3904
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3905
                return
×
3906
        }
×
3907
        if mset.cfg.Sealed {
68✔
3908
                resp.Error = NewJSStreamSealedError()
2✔
3909
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3910
                return
2✔
3911
        }
2✔
3912
        if mset.cfg.DenyPurge {
65✔
3913
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3914
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3915
                return
1✔
3916
        }
1✔
3917

3918
        if s.JetStreamIsClustered() {
93✔
3919
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
30✔
3920
                return
30✔
3921
        }
30✔
3922

3923
        purged, err := mset.purge(purgeRequest)
33✔
3924
        if err != nil {
33✔
3925
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3926
        } else {
33✔
3927
                resp.Purged = purged
33✔
3928
                resp.Success = true
33✔
3929
        }
33✔
3930
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
33✔
3931
}
3932

3933
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,251✔
3934
        var replicas int
1,251✔
3935
        if cfg != nil {
2,502✔
3936
                replicas = cfg.Replicas
1,251✔
3937
        }
1,251✔
3938
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,251✔
3939
        if apiErr != nil {
1,251✔
3940
                return apiErr
×
3941
        }
×
3942
        jsa.js.mu.RLock()
1,251✔
3943
        defer jsa.js.mu.RUnlock()
1,251✔
3944
        jsa.mu.RLock()
1,251✔
3945
        defer jsa.mu.RUnlock()
1,251✔
3946
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,254✔
3947
                return NewJSMaximumStreamsLimitError()
3✔
3948
        }
3✔
3949
        reserved := jsa.tieredReservation(tier, cfg)
1,248✔
3950
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,249✔
3951
                return NewJSStreamLimitsError(err, Unless(err))
1✔
3952
        }
1✔
3953
        return nil
1,247✔
3954
}
3955

3956
// Request to restore a stream.
3957
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
50✔
3958
        if c == nil || !s.JetStreamIsLeader() {
74✔
3959
                return
24✔
3960
        }
24✔
3961
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
26✔
3962
        if err != nil {
26✔
3963
                s.Warnf(badAPIRequestT, msg)
×
3964
                return
×
3965
        }
×
3966

3967
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
26✔
3968
        if errorOnRequiredApiLevel(hdr) {
27✔
3969
                resp.Error = NewJSRequiredApiLevelError()
1✔
3970
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3971
                return
1✔
3972
        }
1✔
3973
        if !acc.JetStreamEnabled() {
25✔
3974
                resp.Error = NewJSNotEnabledForAccountError()
×
3975
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3976
                return
×
3977
        }
×
3978
        if isEmptyRequest(msg) {
26✔
3979
                resp.Error = NewJSBadRequestError()
1✔
3980
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3981
                return
1✔
3982
        }
1✔
3983

3984
        var req JSApiStreamRestoreRequest
24✔
3985
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3986
                resp.Error = NewJSInvalidJSONError(err)
×
3987
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3988
                return
×
3989
        }
×
3990

3991
        stream := streamNameFromSubject(subject)
24✔
3992

24✔
3993
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3994
                req.Config.Name = stream
×
3995
        }
×
3996

3997
        // check stream config at the start of the restore process, not at the end
3998
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3999
        if apiErr != nil {
26✔
4000
                resp.Error = apiErr
2✔
4001
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4002
                return
2✔
4003
        }
2✔
4004

4005
        if s.JetStreamIsClustered() {
33✔
4006
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
4007
                return
11✔
4008
        }
11✔
4009

4010
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
4011
                resp.Error = err
2✔
4012
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4013
                return
2✔
4014
        }
2✔
4015

4016
        if _, err := acc.lookupStream(stream); err == nil {
10✔
4017
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
4018
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4019
                return
1✔
4020
        }
1✔
4021

4022
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
4023
                if doErr {
×
4024
                        resp.Error = NewJSNotEnabledForAccountError()
×
4025
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4026
                }
×
4027
                return
×
4028
        }
4029

4030
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
4031
}
4032

4033
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
4034
        js := s.getJetStream()
16✔
4035

16✔
4036
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
4037

16✔
4038
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
4039
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
29✔
4040
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
13✔
4041
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
4042
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4043
                        return nil
×
4044
                }
×
4045
        }
4046

4047
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
4048
        if err != nil {
16✔
4049
                resp.Error = NewJSTempStorageFailedError()
×
4050
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
4051
                return nil
×
4052
        }
×
4053

4054
        streamName := cfg.Name
16✔
4055
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
4056

16✔
4057
        start := time.Now().UTC()
16✔
4058
        domain := s.getOpts().JetStreamDomain
16✔
4059
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
4060
                TypedEvent: TypedEvent{
16✔
4061
                        Type: JSRestoreCreateAdvisoryType,
16✔
4062
                        ID:   nuid.Next(),
16✔
4063
                        Time: start,
16✔
4064
                },
16✔
4065
                Stream: streamName,
16✔
4066
                Client: ci.forAdvisory(),
16✔
4067
                Domain: domain,
16✔
4068
        })
16✔
4069

16✔
4070
        // Create our internal subscription to accept the snapshot.
16✔
4071
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
4072

16✔
4073
        type result struct {
16✔
4074
                err   error
16✔
4075
                reply string
16✔
4076
        }
16✔
4077

16✔
4078
        // For signaling to upper layers.
16✔
4079
        resultCh := make(chan result, 1)
16✔
4080
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
4081

16✔
4082
        var total int
16✔
4083

16✔
4084
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
4085
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
128✔
4086
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
112✔
4087
                if reply == _EMPTY_ {
113✔
4088
                        sub.client.processUnsub(sub.sid)
1✔
4089
                        resultCh <- result{
1✔
4090
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
4091
                                reply,
1✔
4092
                        }
1✔
4093
                        return
1✔
4094
                }
1✔
4095
                // Account client messages have \r\n on end. This is an error.
4096
                if len(msg) < LEN_CR_LF {
111✔
4097
                        sub.client.processUnsub(sub.sid)
×
4098
                        resultCh <- result{
×
4099
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
4100
                                reply,
×
4101
                        }
×
4102
                        return
×
4103
                }
×
4104
                // Adjust.
4105
                msg = msg[:len(msg)-LEN_CR_LF]
111✔
4106

111✔
4107
                // This means we are complete with our transfer from the client.
111✔
4108
                if len(msg) == 0 {
126✔
4109
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
4110
                        resultCh <- result{err, reply}
15✔
4111
                        return
15✔
4112
                }
15✔
4113

4114
                // We track total and check on server limits.
4115
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
4116
                total += len(msg)
96✔
4117
                if js.wouldExceedLimits(FileStorage, total) {
96✔
4118
                        s.resourcesExceededError(FileStorage)
×
4119
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
4120
                        return
×
4121
                }
×
4122

4123
                // Append chunk to temp file. Mark as issue if we encounter an error.
4124
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
96✔
4125
                        resultCh <- result{err, reply}
×
4126
                        if reply != _EMPTY_ {
×
4127
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
4128
                        }
×
4129
                        return
×
4130
                }
4131

4132
                activeQ.push(len(msg))
96✔
4133

96✔
4134
                s.sendInternalAccountMsg(acc, reply, nil)
96✔
4135
        }
4136

4137
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
4138
        if err != nil {
16✔
4139
                tfile.Close()
×
4140
                os.Remove(tfile.Name())
×
4141
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
4142
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
4143
                return nil
×
4144
        }
×
4145

4146
        // Mark the subject so the end user knows where to send the snapshot chunks.
4147
        resp.DeliverSubject = restoreSubj
16✔
4148
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
4149

16✔
4150
        doneCh := make(chan error, 1)
16✔
4151

16✔
4152
        // Monitor the progress from another Go routine.
16✔
4153
        s.startGoRoutine(func() {
32✔
4154
                defer s.grWG.Done()
16✔
4155
                defer func() {
32✔
4156
                        tfile.Close()
16✔
4157
                        os.Remove(tfile.Name())
16✔
4158
                        sub.client.processUnsub(sub.sid)
16✔
4159
                        activeQ.unregister()
16✔
4160
                }()
16✔
4161

4162
                const activityInterval = 5 * time.Second
16✔
4163
                notActive := time.NewTimer(activityInterval)
16✔
4164
                defer notActive.Stop()
16✔
4165

16✔
4166
                total := 0
16✔
4167
                for {
128✔
4168
                        select {
112✔
4169
                        case result := <-resultCh:
16✔
4170
                                err := result.err
16✔
4171
                                var mset *stream
16✔
4172

16✔
4173
                                // If we staged properly go ahead and do restore now.
16✔
4174
                                if err == nil {
31✔
4175
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
4176
                                        tfile.Seek(0, 0)
15✔
4177
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
4178
                                } else {
16✔
4179
                                        errStr := err.Error()
1✔
4180
                                        tmp := []rune(errStr)
1✔
4181
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
4182
                                        s.Warnf(errStr)
1✔
4183
                                }
1✔
4184

4185
                                end := time.Now().UTC()
16✔
4186

16✔
4187
                                // TODO(rip) - Should this have the error code in it??
16✔
4188
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
4189
                                        TypedEvent: TypedEvent{
16✔
4190
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
4191
                                                ID:   nuid.Next(),
16✔
4192
                                                Time: end,
16✔
4193
                                        },
16✔
4194
                                        Stream: streamName,
16✔
4195
                                        Start:  start,
16✔
4196
                                        End:    end,
16✔
4197
                                        Bytes:  int64(total),
16✔
4198
                                        Client: ci.forAdvisory(),
16✔
4199
                                        Domain: domain,
16✔
4200
                                })
16✔
4201

16✔
4202
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
4203

16✔
4204
                                if err != nil {
20✔
4205
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
4206
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
4207
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
4208
                                } else {
16✔
4209
                                        msetCfg := mset.config()
12✔
4210
                                        resp.StreamInfo = &StreamInfo{
12✔
4211
                                                Created:   mset.createdTime(),
12✔
4212
                                                State:     mset.state(),
12✔
4213
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
4214
                                                TimeStamp: time.Now().UTC(),
12✔
4215
                                        }
12✔
4216
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
4217
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
4218
                                }
12✔
4219

4220
                                // On the last EOF, send back the stream info or error status.
4221
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
4222
                                // Signal to the upper layers.
16✔
4223
                                doneCh <- err
16✔
4224
                                return
16✔
4225
                        case <-activeQ.ch:
96✔
4226
                                if n, ok := activeQ.popOne(); ok {
192✔
4227
                                        total += n
96✔
4228
                                        notActive.Reset(activityInterval)
96✔
4229
                                }
96✔
4230
                        case <-notActive.C:
×
4231
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
4232
                                doneCh <- err
×
4233
                                return
×
4234
                        }
4235
                }
4236
        })
4237

4238
        return doneCh
16✔
4239
}
4240

4241
// Process a snapshot request.
4242
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
29✔
4243
        if c == nil || !s.JetStreamEnabled() {
29✔
4244
                return
×
4245
        }
×
4246
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
29✔
4247
        if err != nil {
29✔
4248
                s.Warnf(badAPIRequestT, msg)
×
4249
                return
×
4250
        }
×
4251

4252
        smsg := string(msg)
29✔
4253
        stream := streamNameFromSubject(subject)
29✔
4254

29✔
4255
        // If we are in clustered mode we need to be the stream leader to proceed.
29✔
4256
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
44✔
4257
                return
15✔
4258
        }
15✔
4259

4260
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
14✔
4261
        if errorOnRequiredApiLevel(hdr) {
15✔
4262
                resp.Error = NewJSRequiredApiLevelError()
1✔
4263
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4264
                return
1✔
4265
        }
1✔
4266
        if !acc.JetStreamEnabled() {
13✔
4267
                resp.Error = NewJSNotEnabledForAccountError()
×
4268
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4269
                return
×
4270
        }
×
4271
        if isEmptyRequest(msg) {
14✔
4272
                resp.Error = NewJSBadRequestError()
1✔
4273
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4274
                return
1✔
4275
        }
1✔
4276

4277
        mset, err := acc.lookupStream(stream)
12✔
4278
        if err != nil {
13✔
4279
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4280
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4281
                return
1✔
4282
        }
1✔
4283

4284
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4285
                if doErr {
×
4286
                        resp.Error = NewJSNotEnabledForAccountError()
×
4287
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4288
                }
×
4289
                return
×
4290
        }
4291

4292
        var req JSApiStreamSnapshotRequest
11✔
4293
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4294
                resp.Error = NewJSInvalidJSONError(err)
×
4295
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4296
                return
×
4297
        }
×
4298
        if !IsValidSubject(req.DeliverSubject) {
12✔
4299
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4300
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4301
                return
1✔
4302
        }
1✔
4303

4304
        // We will do the snapshot in a go routine as well since check msgs may
4305
        // stall this go routine.
4306
        go func() {
20✔
4307
                if req.CheckMsgs {
12✔
4308
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4309
                } else {
10✔
4310
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4311
                }
8✔
4312

4313
                start := time.Now().UTC()
10✔
4314

10✔
4315
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4316
                if err != nil {
10✔
4317
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4318
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4319
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4320
                        return
×
4321
                }
×
4322

4323
                config := mset.config()
10✔
4324
                resp.State = &sr.State
10✔
4325
                resp.Config = &config
10✔
4326

10✔
4327
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4328

10✔
4329
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4330
                        TypedEvent: TypedEvent{
10✔
4331
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4332
                                ID:   nuid.Next(),
10✔
4333
                                Time: time.Now().UTC(),
10✔
4334
                        },
10✔
4335
                        Stream: mset.name(),
10✔
4336
                        State:  sr.State,
10✔
4337
                        Client: ci.forAdvisory(),
10✔
4338
                        Domain: s.getOpts().JetStreamDomain,
10✔
4339
                })
10✔
4340

10✔
4341
                // Now do the real streaming.
10✔
4342
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4343

10✔
4344
                end := time.Now().UTC()
10✔
4345

10✔
4346
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4347
                        TypedEvent: TypedEvent{
10✔
4348
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4349
                                ID:   nuid.Next(),
10✔
4350
                                Time: end,
10✔
4351
                        },
10✔
4352
                        Stream: mset.name(),
10✔
4353
                        Start:  start,
10✔
4354
                        End:    end,
10✔
4355
                        Client: ci.forAdvisory(),
10✔
4356
                        Domain: s.getOpts().JetStreamDomain,
10✔
4357
                })
10✔
4358

10✔
4359
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4360
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4361
                        mset.jsa.account.Name,
10✔
4362
                        mset.name(),
10✔
4363
                        end.Sub(start))
10✔
4364
        }()
4365
}
4366

4367
// Snapshot streaming defaults.
const (
	// defaultSnapshotChunkSize is the default size in bytes (128KB) of a snapshot chunk.
	defaultSnapshotChunkSize = 128 * 1024
	// defaultSnapshotWindowSize is the default snapshot flow-control window in bytes.
	defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MB
)
4370

4371
// streamSnapshot will stream out our snapshot to the reply subject.
4372
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4373
        chunkSize := req.ChunkSize
10✔
4374
        if chunkSize == 0 {
12✔
4375
                chunkSize = defaultSnapshotChunkSize
2✔
4376
        }
2✔
4377
        // Setup for the chunk stream.
4378
        reply := req.DeliverSubject
10✔
4379
        r := sr.Reader
10✔
4380
        defer r.Close()
10✔
4381

10✔
4382
        // Check interest for the snapshot deliver subject.
10✔
4383
        inch := make(chan bool, 1)
10✔
4384
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4385
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4386
        hasInterest := <-inch
10✔
4387
        if !hasInterest {
15✔
4388
                // Allow 2 seconds or so for interest to show up.
5✔
4389
                select {
5✔
4390
                case <-inch:
4✔
4391
                case <-time.After(2 * time.Second):
1✔
4392
                }
4393
        }
4394

4395
        // Create our ack flow handler.
4396
        // This is very simple for now.
4397
        ackSize := defaultSnapshotWindowSize / chunkSize
10✔
4398
        if ackSize < 8 {
10✔
4399
                ackSize = 8
×
4400
        } else if ackSize > 8*1024 {
16✔
4401
                ackSize = 8 * 1024
6✔
4402
        }
6✔
4403
        acks := make(chan struct{}, ackSize)
10✔
4404
        acks <- struct{}{}
10✔
4405

10✔
4406
        // Track bytes outstanding.
10✔
4407
        var out int32
10✔
4408

10✔
4409
        // We will place sequence number and size of chunk sent in the reply.
10✔
4410
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4411
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
21✔
4412
                cs, _ := strconv.Atoi(tokenAt(subject, 6))
11✔
4413
                // This is very crude and simple, but ok for now.
11✔
4414
                // This only matters when sending multiple chunks.
11✔
4415
                if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
22✔
4416
                        select {
11✔
4417
                        case acks <- struct{}{}:
11✔
4418
                        default:
×
4419
                        }
4420
                }
4421
        })
4422
        defer mset.unsubscribe(ackSub)
10✔
4423

10✔
4424
        // TODO(dlc) - Add in NATS-Chunked-Sequence header
10✔
4425
        var hdr []byte
10✔
4426
        for index := 1; ; index++ {
112✔
4427
                chunk := make([]byte, chunkSize)
102✔
4428
                n, err := r.Read(chunk)
102✔
4429
                chunk = chunk[:n]
102✔
4430
                if err != nil {
112✔
4431
                        if n > 0 {
10✔
4432
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
×
4433
                        }
×
4434
                        break
10✔
4435
                }
4436

4437
                // Wait on acks for flow control if past our window size.
4438
                // Wait up to 10ms for now if no acks received.
4439
                if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
92✔
4440
                        select {
×
4441
                        case <-acks:
×
4442
                                // ok to proceed.
4443
                        case <-inch:
×
4444
                                // Lost interest
×
4445
                                hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4446
                                goto done
×
4447
                        case <-time.After(2 * time.Second):
×
4448
                                hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4449
                                goto done
×
4450
                        }
4451
                }
4452
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
92✔
4453
                if hdr == nil {
102✔
4454
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
10✔
4455
                }
10✔
4456
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
92✔
4457
                atomic.AddInt32(&out, int32(len(chunk)))
92✔
4458
        }
4459

4460
        if err := <-sr.errCh; err != _EMPTY_ {
10✔
4461
                hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4462
        }
×
4463

4464
done:
4465
        // Send last EOF
4466
        // TODO(dlc) - place hash in header
4467
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4468
}
4469

4470
// ccReqType identifies the kind of consumer create request.
type ccReqType uint8

// Consumer create request kinds.
// NOTE(review): these constants are untyped integers rather than ccReqType values;
// they are left that way because their uses are not visible here.
const (
	// ccNew is the first kind (value 0).
	ccNew = iota
	// ccLegacyEphemeral is the second kind (value 1).
	ccLegacyEphemeral
	// ccLegacyDurable is the third kind (value 2).
	ccLegacyDurable
)
4478

4479
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4480
// filtered subjects can be at the tail end.
4481
// Assumes stream and consumer names are single tokens.
4482
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
10,881✔
4483
        if c == nil || !s.JetStreamEnabled() {
10,881✔
4484
                return
×
4485
        }
×
4486

4487
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
10,881✔
4488
        if err != nil {
10,882✔
4489
                s.Warnf(badAPIRequestT, msg)
1✔
4490
                return
1✔
4491
        }
1✔
4492

4493
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
10,880✔
4494
        if errorOnRequiredApiLevel(hdr) {
10,883✔
4495
                resp.Error = NewJSRequiredApiLevelError()
3✔
4496
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4497
                return
3✔
4498
        }
3✔
4499

4500
        var req CreateConsumerRequest
10,877✔
4501
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10,878✔
4502
                resp.Error = NewJSInvalidJSONError(err)
1✔
4503
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4504
                return
1✔
4505
        }
1✔
4506

4507
        var js *jetStream
10,876✔
4508
        isClustered := s.JetStreamIsClustered()
10,876✔
4509

10,876✔
4510
        // Determine if we should proceed here when we are in clustered mode.
10,876✔
4511
        if isClustered {
20,786✔
4512
                if req.Config.Direct {
10,324✔
4513
                        // Check to see if we have this stream and are the stream leader.
414✔
4514
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
727✔
4515
                                return
313✔
4516
                        }
313✔
4517
                } else {
9,496✔
4518
                        var cc *jetStreamCluster
9,496✔
4519
                        js, cc = s.getJetStreamCluster()
9,496✔
4520
                        if js == nil || cc == nil {
9,496✔
4521
                                return
×
4522
                        }
×
4523
                        if js.isLeaderless() {
9,496✔
4524
                                resp.Error = NewJSClusterNotAvailError()
×
4525
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4526
                                return
×
4527
                        }
×
4528
                        // Make sure we are meta leader.
4529
                        if !s.JetStreamIsLeader() {
15,914✔
4530
                                return
6,418✔
4531
                        }
6,418✔
4532
                }
4533
        }
4534

4535
        var streamName, consumerName, filteredSubject string
4,145✔
4536
        var rt ccReqType
4,145✔
4537

4,145✔
4538
        if n := numTokens(subject); n < 5 {
4,145✔
4539
                s.Warnf(badAPIRequestT, msg)
×
4540
                return
×
4541
        } else if n == 5 {
4,927✔
4542
                // Legacy ephemeral.
782✔
4543
                rt = ccLegacyEphemeral
782✔
4544
                streamName = streamNameFromSubject(subject)
782✔
4545
        } else {
4,145✔
4546
                // New style and durable legacy.
3,363✔
4547
                if tokenAt(subject, 4) == "DURABLE" {
3,623✔
4548
                        rt = ccLegacyDurable
260✔
4549
                        if n != 7 {
260✔
4550
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4551
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4552
                                return
×
4553
                        }
×
4554
                        streamName = tokenAt(subject, 6)
260✔
4555
                        consumerName = tokenAt(subject, 7)
260✔
4556
                } else {
3,103✔
4557
                        streamName = streamNameFromSubject(subject)
3,103✔
4558
                        consumerName = consumerNameFromSubject(subject)
3,103✔
4559
                        // New has optional filtered subject as part of main subject..
3,103✔
4560
                        if n > 6 {
5,740✔
4561
                                tokens := strings.Split(subject, tsep)
2,637✔
4562
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,637✔
4563
                        }
2,637✔
4564
                }
4565
        }
4566

4567
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,148✔
4568
                if doErr {
4✔
4569
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4570
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4571
                }
1✔
4572
                return
3✔
4573
        }
4574

4575
        if streamName != req.Stream {
4,143✔
4576
                resp.Error = NewJSStreamMismatchError()
1✔
4577
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4578
                return
1✔
4579
        }
1✔
4580

4581
        if consumerName != _EMPTY_ {
7,502✔
4582
                // Check for path like separators in the name.
3,361✔
4583
                if strings.ContainsAny(consumerName, `\/`) {
3,365✔
4584
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4585
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4586
                        return
4✔
4587
                }
4✔
4588
        }
4589

4590
        // Should we expect a durable name
4591
        if rt == ccLegacyDurable {
4,396✔
4592
                if numTokens(subject) < 7 {
259✔
4593
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4594
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4595
                        return
×
4596
                }
×
4597
                // Now check on requirements for durable request.
4598
                if req.Config.Durable == _EMPTY_ {
260✔
4599
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4600
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4601
                        return
1✔
4602
                }
1✔
4603
                if consumerName != req.Config.Durable {
258✔
4604
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4605
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4606
                        return
×
4607
                }
×
4608
        }
4609
        // If new style and durable set make sure they match.
4610
        if rt == ccNew {
7,234✔
4611
                if req.Config.Durable != _EMPTY_ {
5,694✔
4612
                        if consumerName != req.Config.Durable {
2,596✔
4613
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4614
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4615
                                return
×
4616
                        }
×
4617
                }
4618
                // New style ephemeral so we need to honor the name.
4619
                req.Config.Name = consumerName
3,098✔
4620
        }
4621
        // Check for legacy ephemeral mis-configuration.
4622
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,139✔
4623
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4624
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4625
                return
3✔
4626
        }
3✔
4627

4628
        // in case of multiple filters provided, error if new API is used.
4629
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,134✔
4630
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4631
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4632
                return
1✔
4633
        }
1✔
4634

4635
        // Check for a filter subject.
4636
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,134✔
4637
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4638
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4639
                return
2✔
4640
        }
2✔
4641

4642
        if isClustered && !req.Config.Direct {
7,206✔
4643
                s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,076✔
4644
                return
3,076✔
4645
        }
3,076✔
4646

4647
        // If we are here we are single server mode.
4648
        if req.Config.Replicas > 1 {
1,054✔
4649
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4650
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4651
                return
×
4652
        }
×
4653

4654
        stream, err := acc.lookupStream(req.Stream)
1,054✔
4655
        if err != nil {
1,058✔
4656
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4657
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4658
                return
4✔
4659
        }
4✔
4660
        if stream.offlineReason != _EMPTY_ {
1,050✔
4661
                resp.Error = NewJSStreamOfflineReasonError(errors.New(stream.offlineReason))
×
4662
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4663
                return
×
4664
        }
×
4665

4666
        if o := stream.lookupConsumer(consumerName); o != nil {
1,098✔
4667
                if o.offlineReason != _EMPTY_ {
48✔
4668
                        resp.Error = NewJSConsumerOfflineReasonError(errors.New(o.offlineReason))
×
4669
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4670
                        return
×
4671
                }
×
4672
                // If the consumer already exists then don't allow updating the PauseUntil, just set
4673
                // it back to whatever the current configured value is.
4674
                o.mu.RLock()
48✔
4675
                req.Config.PauseUntil = o.cfg.PauseUntil
48✔
4676
                o.mu.RUnlock()
48✔
4677
        }
4678

4679
        // Initialize/update asset version metadata.
4680
        setStaticConsumerMetadata(&req.Config)
1,050✔
4681

1,050✔
4682
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
1,050✔
4683

1,050✔
4684
        if err != nil {
1,103✔
4685
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
53✔
4686
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4687
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4688
                        err = errConsumerStoreFailed
×
4689
                }
×
4690
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
53✔
4691
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
53✔
4692
                return
53✔
4693
        }
4694
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
997✔
4695
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
997✔
4696

997✔
4697
        o.mu.RLock()
997✔
4698
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
1,001✔
4699
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4700
        }
4✔
4701
        o.mu.RUnlock()
997✔
4702
}
4703

4704
// Request for the list of all consumer names.
4705
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
35✔
4706
        if c == nil || !s.JetStreamEnabled() {
35✔
4707
                return
×
4708
        }
×
4709
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
35✔
4710
        if err != nil {
35✔
4711
                s.Warnf(badAPIRequestT, msg)
×
4712
                return
×
4713
        }
×
4714

4715
        var resp = JSApiConsumerNamesResponse{
35✔
4716
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
35✔
4717
                Consumers:   []string{},
35✔
4718
        }
35✔
4719
        if errorOnRequiredApiLevel(hdr) {
36✔
4720
                resp.Error = NewJSRequiredApiLevelError()
1✔
4721
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4722
                return
1✔
4723
        }
1✔
4724

4725
        // Determine if we should proceed here when we are in clustered mode.
4726
        if s.JetStreamIsClustered() {
64✔
4727
                js, cc := s.getJetStreamCluster()
30✔
4728
                if js == nil || cc == nil {
30✔
4729
                        return
×
4730
                }
×
4731
                if js.isLeaderless() {
30✔
4732
                        resp.Error = NewJSClusterNotAvailError()
×
4733
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4734
                        return
×
4735
                }
×
4736
                // Make sure we are meta leader.
4737
                if !s.JetStreamIsLeader() {
50✔
4738
                        return
20✔
4739
                }
20✔
4740
        }
4741

4742
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
14✔
4743
                if doErr {
×
4744
                        resp.Error = NewJSNotEnabledForAccountError()
×
4745
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4746
                }
×
4747
                return
×
4748
        }
4749

4750
        var offset int
14✔
4751
        if isJSONObjectOrArray(msg) {
22✔
4752
                var req JSApiConsumersRequest
8✔
4753
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
8✔
4754
                        resp.Error = NewJSInvalidJSONError(err)
×
4755
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4756
                        return
×
4757
                }
×
4758
                offset = req.Offset
8✔
4759
        }
4760

4761
        streamName := streamNameFromSubject(subject)
14✔
4762
        var numConsumers int
14✔
4763

14✔
4764
        if s.JetStreamIsClustered() {
24✔
4765
                js, cc := s.getJetStreamCluster()
10✔
4766
                if js == nil || cc == nil {
10✔
4767
                        // TODO(dlc) - Debug or Warn?
×
4768
                        return
×
4769
                }
×
4770
                js.mu.RLock()
10✔
4771
                sas := cc.streams[acc.Name]
10✔
4772
                if sas == nil {
10✔
4773
                        js.mu.RUnlock()
×
4774
                        resp.Error = NewJSStreamNotFoundError()
×
4775
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4776
                        return
×
4777
                }
×
4778
                sa := sas[streamName]
10✔
4779
                if sa == nil || sa.err != nil {
10✔
4780
                        js.mu.RUnlock()
×
4781
                        resp.Error = NewJSStreamNotFoundError()
×
4782
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4783
                        return
×
4784
                }
×
4785
                for consumer := range sa.consumers {
23✔
4786
                        resp.Consumers = append(resp.Consumers, consumer)
13✔
4787
                }
13✔
4788
                if len(resp.Consumers) > 1 {
14✔
4789
                        slices.Sort(resp.Consumers)
4✔
4790
                }
4✔
4791
                numConsumers = len(resp.Consumers)
10✔
4792
                if offset > numConsumers {
10✔
4793
                        offset = numConsumers
×
4794
                }
×
4795
                resp.Consumers = resp.Consumers[offset:]
10✔
4796
                if len(resp.Consumers) > JSApiNamesLimit {
10✔
4797
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4798
                }
×
4799
                js.mu.RUnlock()
10✔
4800

4801
        } else {
4✔
4802
                mset, err := acc.lookupStream(streamName)
4✔
4803
                if err != nil {
4✔
4804
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4805
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4806
                        return
×
4807
                }
×
4808

4809
                obs := mset.getPublicConsumers()
4✔
4810
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
4✔
4811

4812
                numConsumers = len(obs)
4✔
4813
                if offset > numConsumers {
4✔
4814
                        offset = numConsumers
×
4815
                }
×
4816

4817
                for _, o := range obs[offset:] {
7✔
4818
                        resp.Consumers = append(resp.Consumers, o.String())
3✔
4819
                        if len(resp.Consumers) >= JSApiNamesLimit {
3✔
4820
                                break
×
4821
                        }
4822
                }
4823
        }
4824
        resp.Total = numConsumers
14✔
4825
        resp.Limit = JSApiNamesLimit
14✔
4826
        resp.Offset = offset
14✔
4827
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
4828
}
4829

4830
// Request for the list of all detailed consumer information.
4831
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
80✔
4832
        if c == nil || !s.JetStreamEnabled() {
80✔
4833
                return
×
4834
        }
×
4835

4836
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
80✔
4837
        if err != nil {
80✔
4838
                s.Warnf(badAPIRequestT, msg)
×
4839
                return
×
4840
        }
×
4841

4842
        var resp = JSApiConsumerListResponse{
80✔
4843
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
80✔
4844
                Consumers:   []*ConsumerInfo{},
80✔
4845
        }
80✔
4846
        if errorOnRequiredApiLevel(hdr) {
81✔
4847
                resp.Error = NewJSRequiredApiLevelError()
1✔
4848
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4849
                return
1✔
4850
        }
1✔
4851

4852
        // Determine if we should proceed here when we are in clustered mode.
4853
        if s.JetStreamIsClustered() {
153✔
4854
                js, cc := s.getJetStreamCluster()
74✔
4855
                if js == nil || cc == nil {
74✔
4856
                        return
×
4857
                }
×
4858
                if js.isLeaderless() {
75✔
4859
                        resp.Error = NewJSClusterNotAvailError()
1✔
4860
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4861
                        return
1✔
4862
                }
1✔
4863
                // Make sure we are meta leader.
4864
                if !s.JetStreamIsLeader() {
127✔
4865
                        return
54✔
4866
                }
54✔
4867
        }
4868

4869
        if errorOnRequiredApiLevel(hdr) {
24✔
4870
                resp.Error = NewJSClusterNotAvailError()
×
4871
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4872
                return
×
4873
        }
×
4874

4875
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
24✔
4876
                if doErr {
×
4877
                        resp.Error = NewJSNotEnabledForAccountError()
×
4878
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4879
                }
×
4880
                return
×
4881
        }
4882

4883
        var offset int
24✔
4884
        if isJSONObjectOrArray(msg) {
35✔
4885
                var req JSApiConsumersRequest
11✔
4886
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
4887
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4888
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4889
                        return
1✔
4890
                }
1✔
4891
                offset = req.Offset
10✔
4892
        }
4893

4894
        streamName := streamNameFromSubject(subject)
23✔
4895

23✔
4896
        // Clustered mode will invoke a scatter and gather.
23✔
4897
        if s.JetStreamIsClustered() {
42✔
4898
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
19✔
4899
                msg = copyBytes(msg)
19✔
4900
                s.startGoRoutine(func() {
38✔
4901
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
19✔
4902
                })
19✔
4903
                return
19✔
4904
        }
4905

4906
        mset, err := acc.lookupStream(streamName)
4✔
4907
        if err != nil {
4✔
4908
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4909
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4910
                return
×
4911
        }
×
4912

4913
        obs := mset.getPublicConsumers()
4✔
4914
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
4✔
4915

4916
        ocnt := len(obs)
4✔
4917
        if offset > ocnt {
4✔
4918
                offset = ocnt
×
4919
        }
×
4920

4921
        var missingNames []string
4✔
4922
        for _, o := range obs[offset:] {
7✔
4923
                if o.offlineReason != _EMPTY_ {
4✔
4924
                        if resp.Offline == nil {
2✔
4925
                                resp.Offline = make(map[string]string, 1)
1✔
4926
                        }
1✔
4927
                        resp.Offline[o.name] = o.offlineReason
1✔
4928
                        missingNames = append(missingNames, o.name)
1✔
4929
                        continue
1✔
4930
                }
4931
                if cinfo := o.info(); cinfo != nil {
4✔
4932
                        resp.Consumers = append(resp.Consumers, cinfo)
2✔
4933
                }
2✔
4934
                if len(resp.Consumers) >= JSApiListLimit {
2✔
4935
                        break
×
4936
                }
4937
        }
4938
        resp.Total = ocnt
4✔
4939
        resp.Limit = JSApiListLimit
4✔
4940
        resp.Offset = offset
4✔
4941
        resp.Missing = missingNames
4✔
4942
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
4✔
4943
}
4944

4945
// Request for information about a consumer.
4946
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,452✔
4947
        if c == nil || !s.JetStreamEnabled() {
42,452✔
4948
                return
×
4949
        }
×
4950
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
42,452✔
4951
        if err != nil {
42,452✔
4952
                s.Warnf(badAPIRequestT, msg)
×
4953
                return
×
4954
        }
×
4955

4956
        streamName := streamNameFromSubject(subject)
42,452✔
4957
        consumerName := consumerNameFromSubject(subject)
42,452✔
4958

42,452✔
4959
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,452✔
4960
        if errorOnRequiredApiLevel(hdr) {
42,453✔
4961
                resp.Error = NewJSRequiredApiLevelError()
1✔
4962
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4963
                return
1✔
4964
        }
1✔
4965

4966
        if !isEmptyRequest(msg) {
42,452✔
4967
                resp.Error = NewJSNotEmptyRequestError()
1✔
4968
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4969
                return
1✔
4970
        }
1✔
4971

4972
        // If we are in clustered mode we need to be the consumer leader to proceed.
4973
        if s.JetStreamIsClustered() {
84,395✔
4974
                // Check to make sure the consumer is assigned.
41,945✔
4975
                js, cc := s.getJetStreamCluster()
41,945✔
4976
                if js == nil || cc == nil {
41,945✔
4977
                        return
×
4978
                }
×
4979

4980
                js.mu.RLock()
41,945✔
4981
                meta := cc.meta
41,945✔
4982
                js.mu.RUnlock()
41,945✔
4983

41,945✔
4984
                if meta == nil {
41,945✔
4985
                        return
×
4986
                }
×
4987

4988
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
4989
                ourID := meta.ID()
41,945✔
4990
                groupLeaderless := meta.Leaderless()
41,945✔
4991
                groupCreated := meta.Created()
41,945✔
4992

41,945✔
4993
                js.mu.RLock()
41,945✔
4994
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
41,945✔
4995
                var rg *raftGroup
41,945✔
4996
                var offline, isMember bool
41,945✔
4997
                if ca != nil {
46,408✔
4998
                        if rg = ca.Group; rg != nil {
8,926✔
4999
                                offline = s.allPeersOffline(rg)
4,463✔
5000
                                isMember = rg.isMember(ourID)
4,463✔
5001
                        }
4,463✔
5002
                        if ca.unsupported != nil && isMember {
4,481✔
5003
                                // If we're a member for this consumer, and it's not supported, report it as offline.
18✔
5004
                                resp.Error = NewJSConsumerOfflineReasonError(errors.New(ca.unsupported.reason))
18✔
5005
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
18✔
5006
                                js.mu.RUnlock()
18✔
5007
                                return
18✔
5008
                        }
18✔
5009
                }
5010
                // Capture consumer leader here.
5011
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,927✔
5012
                // Also capture if we think there is no meta leader.
41,927✔
5013
                var isLeaderLess bool
41,927✔
5014
                if !isLeader {
70,097✔
5015
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
28,170✔
5016
                }
28,170✔
5017
                js.mu.RUnlock()
41,927✔
5018

41,927✔
5019
                if isLeader && ca == nil {
54,330✔
5020
                        // We can't find the consumer, so mimic what would be the errors below.
12,403✔
5021
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,403✔
5022
                                if doErr {
×
5023
                                        resp.Error = NewJSNotEnabledForAccountError()
×
5024
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5025
                                }
×
5026
                                return
×
5027
                        }
5028
                        if sa == nil {
22,406✔
5029
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
5030
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
5031
                                return
10,003✔
5032
                        }
10,003✔
5033
                        // If we are here the consumer is not present.
5034
                        resp.Error = NewJSConsumerNotFoundError()
2,400✔
5035
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,400✔
5036
                        return
2,400✔
5037
                } else if ca == nil {
54,603✔
5038
                        if isLeaderLess {
25,081✔
5039
                                resp.Error = NewJSClusterNotAvailError()
2✔
5040
                                // Delaying an error response gives the leader a chance to respond before us
2✔
5041
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
5042
                        }
2✔
5043
                        return
25,079✔
5044
                } else if isLeader && offline {
4,447✔
5045
                        resp.Error = NewJSConsumerOfflineError()
2✔
5046
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
5047
                        return
2✔
5048
                }
2✔
5049

5050
                // Check to see if we are a member of the group and if the group has no leader.
5051
                if isMember && js.isGroupLeaderless(ca.Group) {
4,444✔
5052
                        resp.Error = NewJSClusterNotAvailError()
1✔
5053
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5054
                        return
1✔
5055
                }
1✔
5056

5057
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
5058
                if !isConsumerLeader {
7,543✔
5059
                        if isLeaderLess {
3,101✔
5060
                                resp.Error = NewJSClusterNotAvailError()
×
5061
                                // Delaying an error response gives the leader a chance to respond before us
×
5062
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
5063
                                return
×
5064
                        }
×
5065

5066
                        var node RaftNode
3,101✔
5067
                        var leaderNotPartOfGroup bool
3,101✔
5068

3,101✔
5069
                        // We have a consumer assignment.
3,101✔
5070
                        if isMember {
5,352✔
5071
                                js.mu.RLock()
2,251✔
5072
                                if rg != nil && rg.node != nil {
4,499✔
5073
                                        node = rg.node
2,248✔
5074
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,248✔
5075
                                                leaderNotPartOfGroup = true
×
5076
                                        }
×
5077
                                }
5078
                                js.mu.RUnlock()
2,251✔
5079
                        }
5080

5081
                        // Check if we should ignore all together.
5082
                        if node == nil {
3,954✔
5083
                                // We have been assigned but have not created a node yet. If we are a member return
853✔
5084
                                // our config and defaults for state and no cluster info.
853✔
5085
                                if isMember {
856✔
5086
                                        // Since we access consumerAssignment, need js lock.
3✔
5087
                                        js.mu.RLock()
3✔
5088
                                        resp.ConsumerInfo = &ConsumerInfo{
3✔
5089
                                                Stream:    ca.Stream,
3✔
5090
                                                Name:      ca.Name,
3✔
5091
                                                Created:   ca.Created,
3✔
5092
                                                Config:    setDynamicConsumerMetadata(ca.Config),
3✔
5093
                                                TimeStamp: time.Now().UTC(),
3✔
5094
                                        }
3✔
5095
                                        b := s.jsonResponse(resp)
3✔
5096
                                        js.mu.RUnlock()
3✔
5097
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
3✔
5098
                                }
3✔
5099
                                return
853✔
5100
                        }
5101
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
5102
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,489✔
5103
                                if leaderNotPartOfGroup {
2,241✔
5104
                                        resp.Error = NewJSConsumerOfflineError()
×
5105
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
5106
                                }
×
5107
                                return
2,241✔
5108
                        }
5109
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
5110
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
5111
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
5112
                }
5113
        }
5114

5115
        if !acc.JetStreamEnabled() {
1,853✔
5116
                resp.Error = NewJSNotEnabledForAccountError()
×
5117
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5118
                return
×
5119
        }
×
5120

5121
        mset, err := acc.lookupStream(streamName)
1,853✔
5122
        if err != nil {
1,853✔
5123
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5124
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5125
                return
×
5126
        }
×
5127

5128
        obs := mset.lookupConsumer(consumerName)
1,853✔
5129
        if obs == nil {
2,031✔
5130
                resp.Error = NewJSConsumerNotFoundError()
178✔
5131
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
178✔
5132
                return
178✔
5133
        }
178✔
5134

5135
        if obs.offlineReason != _EMPTY_ {
1,676✔
5136
                resp.Error = NewJSConsumerOfflineReasonError(errors.New(obs.offlineReason))
1✔
5137
                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
5138
                return
1✔
5139
        }
1✔
5140

5141
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
1,674✔
5142
                // This consumer returned nil which means it's closed. Respond with not found.
×
5143
                resp.Error = NewJSConsumerNotFoundError()
×
5144
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5145
                return
×
5146
        }
×
5147
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,674✔
5148
}
5149

5150
// Request to delete an Consumer.
5151
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,355✔
5152
        if c == nil || !s.JetStreamEnabled() {
7,357✔
5153
                return
2✔
5154
        }
2✔
5155
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
7,353✔
5156
        if err != nil {
7,353✔
5157
                s.Warnf(badAPIRequestT, msg)
×
5158
                return
×
5159
        }
×
5160

5161
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,353✔
5162
        if errorOnRequiredApiLevel(hdr) {
7,354✔
5163
                resp.Error = NewJSRequiredApiLevelError()
1✔
5164
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5165
                return
1✔
5166
        }
1✔
5167

5168
        // Determine if we should proceed here when we are in clustered mode.
5169
        if s.JetStreamIsClustered() {
14,277✔
5170
                js, cc := s.getJetStreamCluster()
6,925✔
5171
                if js == nil || cc == nil {
6,926✔
5172
                        return
1✔
5173
                }
1✔
5174
                if js.isLeaderless() {
6,925✔
5175
                        resp.Error = NewJSClusterNotAvailError()
1✔
5176
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5177
                        return
1✔
5178
                }
1✔
5179
                // Make sure we are meta leader.
5180
                if !s.JetStreamIsLeader() {
11,518✔
5181
                        return
4,595✔
5182
                }
4,595✔
5183
        }
5184

5185
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,821✔
5186
                if doErr {
129✔
5187
                        resp.Error = NewJSNotEnabledForAccountError()
63✔
5188
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
63✔
5189
                }
63✔
5190
                return
66✔
5191
        }
5192
        if !isEmptyRequest(msg) {
2,690✔
5193
                resp.Error = NewJSNotEmptyRequestError()
1✔
5194
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5195
                return
1✔
5196
        }
1✔
5197
        stream := streamNameFromSubject(subject)
2,688✔
5198
        consumer := consumerNameFromSubject(subject)
2,688✔
5199

2,688✔
5200
        if s.JetStreamIsClustered() {
5,016✔
5201
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,328✔
5202
                return
2,328✔
5203
        }
2,328✔
5204

5205
        mset, err := acc.lookupStream(stream)
360✔
5206
        if err != nil {
360✔
5207
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5208
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5209
                return
×
5210
        }
×
5211

5212
        obs := mset.lookupConsumer(consumer)
360✔
5213
        if obs == nil {
501✔
5214
                resp.Error = NewJSConsumerNotFoundError()
141✔
5215
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
141✔
5216
                return
141✔
5217
        }
141✔
5218
        if err := obs.delete(); err != nil {
219✔
5219
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
5220
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5221
                return
×
5222
        }
×
5223
        resp.Success = true
219✔
5224
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
219✔
5225
}
5226

5227
// Request to pause or unpause a Consumer.
5228
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
64✔
5229
        if c == nil || !s.JetStreamEnabled() {
64✔
5230
                return
×
5231
        }
×
5232
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
64✔
5233
        if err != nil {
64✔
5234
                s.Warnf(badAPIRequestT, msg)
×
5235
                return
×
5236
        }
×
5237

5238
        var req JSApiConsumerPauseRequest
64✔
5239
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
64✔
5240
        if errorOnRequiredApiLevel(hdr) {
65✔
5241
                resp.Error = NewJSRequiredApiLevelError()
1✔
5242
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
5243
                return
1✔
5244
        }
1✔
5245

5246
        if isJSONObjectOrArray(msg) {
118✔
5247
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
5248
                        resp.Error = NewJSInvalidJSONError(err)
×
5249
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5250
                        return
×
5251
                }
×
5252
        }
5253

5254
        // Determine if we should proceed here when we are in clustered mode.
5255
        isClustered := s.JetStreamIsClustered()
63✔
5256
        js, cc := s.getJetStreamCluster()
63✔
5257
        if isClustered {
117✔
5258
                if js == nil || cc == nil {
54✔
5259
                        return
×
5260
                }
×
5261
                if js.isLeaderless() {
54✔
5262
                        resp.Error = NewJSClusterNotAvailError()
×
5263
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5264
                        return
×
5265
                }
×
5266
                // Make sure we are meta leader.
5267
                if !s.JetStreamIsLeader() {
94✔
5268
                        return
40✔
5269
                }
40✔
5270
        }
5271

5272
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
5273
                if doErr {
×
5274
                        resp.Error = NewJSNotEnabledForAccountError()
×
5275
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5276
                }
×
5277
                return
×
5278
        }
5279

5280
        stream := streamNameFromSubject(subject)
23✔
5281
        consumer := consumerNameFromSubject(subject)
23✔
5282

23✔
5283
        if isClustered {
37✔
5284
                js.mu.RLock()
14✔
5285
                sa := js.streamAssignment(acc.Name, stream)
14✔
5286
                if sa == nil {
14✔
5287
                        js.mu.RUnlock()
×
5288
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5289
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5290
                        return
×
5291
                }
×
5292
                if sa.unsupported != nil {
14✔
5293
                        js.mu.RUnlock()
×
5294
                        // Just let the request time out.
×
5295
                        return
×
5296
                }
×
5297

5298
                ca, ok := sa.consumers[consumer]
14✔
5299
                if !ok || ca == nil {
14✔
5300
                        js.mu.RUnlock()
×
5301
                        resp.Error = NewJSConsumerNotFoundError()
×
5302
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5303
                        return
×
5304
                }
×
5305
                if ca.unsupported != nil {
14✔
5306
                        js.mu.RUnlock()
×
5307
                        // Just let the request time out.
×
5308
                        return
×
5309
                }
×
5310

5311
                nca := *ca
14✔
5312
                ncfg := *ca.Config
14✔
5313
                nca.Config = &ncfg
14✔
5314
                meta := cc.meta
14✔
5315
                js.mu.RUnlock()
14✔
5316
                pauseUTC := req.PauseUntil.UTC()
14✔
5317
                if !pauseUTC.IsZero() {
24✔
5318
                        nca.Config.PauseUntil = &pauseUTC
10✔
5319
                } else {
14✔
5320
                        nca.Config.PauseUntil = nil
4✔
5321
                }
4✔
5322

5323
                // Update asset version metadata due to updating pause/resume.
5324
                // Only PauseUntil is updated above, so reuse config for both.
5325
                setStaticConsumerMetadata(nca.Config)
14✔
5326

14✔
5327
                eca := encodeAddConsumerAssignment(&nca)
14✔
5328
                meta.Propose(eca)
14✔
5329

14✔
5330
                resp.PauseUntil = pauseUTC
14✔
5331
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5332
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5333
                }
10✔
5334
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5335
                return
14✔
5336
        }
5337

5338
        mset, err := acc.lookupStream(stream)
9✔
5339
        if err != nil {
9✔
5340
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5341
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5342
                return
×
5343
        }
×
5344
        if mset.offlineReason != _EMPTY_ {
9✔
5345
                // Just let the request time out.
×
5346
                return
×
5347
        }
×
5348

5349
        obs := mset.lookupConsumer(consumer)
9✔
5350
        if obs == nil {
9✔
5351
                resp.Error = NewJSConsumerNotFoundError()
×
5352
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5353
                return
×
5354
        }
×
5355
        if obs.offlineReason != _EMPTY_ {
9✔
5356
                // Just let the request time out.
×
5357
                return
×
5358
        }
×
5359

5360
        ncfg := obs.cfg
9✔
5361
        pauseUTC := req.PauseUntil.UTC()
9✔
5362
        if !pauseUTC.IsZero() {
14✔
5363
                ncfg.PauseUntil = &pauseUTC
5✔
5364
        } else {
9✔
5365
                ncfg.PauseUntil = nil
4✔
5366
        }
4✔
5367

5368
        // Update asset version metadata due to updating pause/resume.
5369
        setStaticConsumerMetadata(&ncfg)
9✔
5370

9✔
5371
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5372
                // The only type of error that should be returned here is from o.store,
×
5373
                // so use a store failed error type.
×
5374
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5375
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5376
                return
×
5377
        }
×
5378

5379
        resp.PauseUntil = pauseUTC
9✔
5380
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5381
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5382
        }
5✔
5383
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5384
}
5385

5386
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5387
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
70,359✔
5388
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
70,359✔
5389
                TypedEvent: TypedEvent{
70,359✔
5390
                        Type: JSAPIAuditType,
70,359✔
5391
                        ID:   nuid.Next(),
70,359✔
5392
                        Time: time.Now().UTC(),
70,359✔
5393
                },
70,359✔
5394
                Server:   s.Name(),
70,359✔
5395
                Client:   ci.forAdvisory(),
70,359✔
5396
                Subject:  subject,
70,359✔
5397
                Request:  request,
70,359✔
5398
                Response: response,
70,359✔
5399
                Domain:   s.getOpts().JetStreamDomain,
70,359✔
5400
        })
70,359✔
5401
}
70,359✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc