• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 14426271004

11 Apr 2025 03:04PM UTC coverage: 85.523% (-0.03%) from 85.552%
14426271004

push

github

web-flow
Report `GOMAXPROCS` and `GOMEMLIMIT` in `ServerStats` (#6791)

Fixes #6672.

Signed-off-by: Neil Twigg <neil@nats.io>

69537 of 81308 relevant lines covered (85.52%)

357513.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.87
/server/jetstream_api.go
1
// Copyright 2020-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path/filepath"
25
        "runtime"
26
        "slices"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31
        "unicode"
32

33
        "github.com/nats-io/nuid"
34
)
35

36
// Request API subjects for JetStream.
37
const (
38
        // All API endpoints.
39
        jsAllAPI = "$JS.API.>"
40

41
        // For constructing JetStream domain prefixes.
42
        jsDomainAPI = "$JS.%s.API.>"
43

44
        JSApiPrefix = "$JS.API"
45

46
        // JSApiAccountInfo is for obtaining general information about JetStream for this account.
47
        // Will return JSON response.
48
        JSApiAccountInfo = "$JS.API.INFO"
49

50
        // JSApiTemplateCreate is the endpoint to create new stream templates.
51
        // Will return JSON response.
52
        JSApiTemplateCreate  = "$JS.API.STREAM.TEMPLATE.CREATE.*"
53
        JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"
54

55
        // JSApiTemplates is the endpoint to list all stream template names for this account.
56
        // Will return JSON response.
57
        JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES"
58

59
        // JSApiTemplateInfo is for obtaining general information about a named stream template.
60
        // Will return JSON response.
61
        JSApiTemplateInfo  = "$JS.API.STREAM.TEMPLATE.INFO.*"
62
        JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"
63

64
        // JSApiTemplateDelete is the endpoint to delete stream templates.
65
        // Will return JSON response.
66
        JSApiTemplateDelete  = "$JS.API.STREAM.TEMPLATE.DELETE.*"
67
        JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"
68

69
        // JSApiStreamCreate is the endpoint to create new streams.
70
        // Will return JSON response.
71
        JSApiStreamCreate  = "$JS.API.STREAM.CREATE.*"
72
        JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
73

74
        // JSApiStreamUpdate is the endpoint to update existing streams.
75
        // Will return JSON response.
76
        JSApiStreamUpdate  = "$JS.API.STREAM.UPDATE.*"
77
        JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
78

79
        // JSApiStreams is the endpoint to list all stream names for this account.
80
        // Will return JSON response.
81
        JSApiStreams = "$JS.API.STREAM.NAMES"
82
        // JSApiStreamList is the endpoint that will return all detailed stream information
83
        JSApiStreamList = "$JS.API.STREAM.LIST"
84

85
        // JSApiStreamInfo is for obtaining general information about a named stream.
86
        // Will return JSON response.
87
        JSApiStreamInfo  = "$JS.API.STREAM.INFO.*"
88
        JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
89

90
        // JSApiStreamDelete is the endpoint to delete streams.
91
        // Will return JSON response.
92
        JSApiStreamDelete  = "$JS.API.STREAM.DELETE.*"
93
        JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
94

95
        // JSApiStreamPurge is the endpoint to purge streams.
96
        // Will return JSON response.
97
        JSApiStreamPurge  = "$JS.API.STREAM.PURGE.*"
98
        JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
99

100
        // JSApiStreamSnapshot is the endpoint to snapshot streams.
101
        // Will return a stream of chunks with a nil chunk as EOF to
102
        // the deliver subject. Caller should respond to each chunk
103
        // with a nil body response for ack flow.
104
        JSApiStreamSnapshot  = "$JS.API.STREAM.SNAPSHOT.*"
105
        JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
106

107
        // JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
108
        // Caller should respond to each chunk with a nil body response.
109
        JSApiStreamRestore  = "$JS.API.STREAM.RESTORE.*"
110
        JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
111

112
        // JSApiMsgDelete is the endpoint to delete messages from a stream.
113
        // Will return JSON response.
114
        JSApiMsgDelete  = "$JS.API.STREAM.MSG.DELETE.*"
115
        JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
116

117
        // JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
118
        // Will return JSON response.
119
        JSApiMsgGet  = "$JS.API.STREAM.MSG.GET.*"
120
        JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
121

122
        // JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
123
        // Will return the message similar to how a consumer receives the message, no JSON processing.
124
        // If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
125
        JSDirectMsgGet  = "$JS.API.DIRECT.GET.*"
126
        JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
127

128
        // This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
129
        // The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
130
        JSDirectGetLastBySubject  = "$JS.API.DIRECT.GET.*.>"
131
        JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
132

133
        // jsDirectGetPre
134
        jsDirectGetPre = "$JS.API.DIRECT.GET"
135

136
        // JSApiConsumerCreate is the endpoint to create consumers for streams.
137
        // This was also the legacy endpoint for ephemeral consumers.
138
        // It now can take consumer name and optional filter subject, which when part of the subject controls access.
139
        // Will return JSON response.
140
        JSApiConsumerCreate    = "$JS.API.CONSUMER.CREATE.*"
141
        JSApiConsumerCreateT   = "$JS.API.CONSUMER.CREATE.%s"
142
        JSApiConsumerCreateEx  = "$JS.API.CONSUMER.CREATE.*.>"
143
        JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
144

145
        // JSApiDurableCreate is the endpoint to create durable consumers for streams.
146
        // You need to include the stream and consumer name in the subject.
147
        JSApiDurableCreate  = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
148
        JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
149

150
        // JSApiConsumers is the endpoint to list all consumer names for the stream.
151
        // Will return JSON response.
152
        JSApiConsumers  = "$JS.API.CONSUMER.NAMES.*"
153
        JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
154

155
        // JSApiConsumerList is the endpoint that will return all detailed consumer information
156
        JSApiConsumerList  = "$JS.API.CONSUMER.LIST.*"
157
        JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
158

159
        // JSApiConsumerInfo is for obtaining general information about a consumer.
160
        // Will return JSON response.
161
        JSApiConsumerInfo  = "$JS.API.CONSUMER.INFO.*.*"
162
        JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
163

164
        // JSApiConsumerDelete is the endpoint to delete consumers.
165
        // Will return JSON response.
166
        JSApiConsumerDelete  = "$JS.API.CONSUMER.DELETE.*.*"
167
        JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
168

169
        // JSApiConsumerPause is the endpoint to pause or unpause consumers.
170
        // Will return JSON response.
171
        JSApiConsumerPause  = "$JS.API.CONSUMER.PAUSE.*.*"
172
        JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
173

174
        // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
175
        JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
176

177
        // JSApiConsumerUnpin is the endpoint for unpinning the subscription for a given consumer.
178
        JSApiConsumerUnpin  = "$JS.API.CONSUMER.UNPIN.*.*"
179
        JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
180

181
        // jsRequestNextPre
182
        jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
183

184
        // For snapshots and restores. The ack will have additional tokens.
185
        jsSnapshotAckT    = "$JS.SNAPSHOT.ACK.%s.%s"
186
        jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
187

188
        // JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
189
        // Will return JSON response.
190
        JSApiStreamRemovePeer  = "$JS.API.STREAM.PEER.REMOVE.*"
191
        JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
192

193
        // JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
194
        // Will return JSON response.
195
        JSApiStreamLeaderStepDown  = "$JS.API.STREAM.LEADER.STEPDOWN.*"
196
        JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
197

198
        // JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
199
        // Will return JSON response.
200
        JSApiConsumerLeaderStepDown  = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
201
        JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
202

203
        // JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
204
        // Only works from system account.
205
        // Will return JSON response.
206
        JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
207

208
        // JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
209
        // Only works from system account.
210
        // Will return JSON response.
211
        JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
212

213
        // JSApiAccountPurge is the endpoint to purge the js content of an account
214
        // Only works from system account.
215
        // Will return JSON response.
216
        JSApiAccountPurge  = "$JS.API.ACCOUNT.PURGE.*"
217
        JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
218

219
        // JSApiServerStreamMove is the endpoint to move streams off a server
220
        // Only works from system account.
221
        // Will return JSON response.
222
        JSApiServerStreamMove  = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
223
        JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
224

225
        // JSApiServerStreamCancelMove is the endpoint to cancel a stream move
226
        // Only works from system account.
227
        // Will return JSON response.
228
        JSApiServerStreamCancelMove  = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
229
        JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
230

231
        // The prefix for system level account API.
232
        jsAPIAccountPre = "$JS.API.ACCOUNT."
233

234
        // jsAckT is the template for the ack message stream coming back from a consumer
235
        // when they ACK/NAK, etc a message.
236
        jsAckT      = "$JS.ACK.%s.%s"
237
        jsAckPre    = "$JS.ACK."
238
        jsAckPreLen = len(jsAckPre)
239

240
        // jsFlowControlPre is the prefix for flow control subjects.
241
        jsFlowControlPre = "$JS.FC."
242
        // jsFlowControl is for FC responses.
243
        jsFlowControl = "$JS.FC.%s.%s.*"
244

245
        // JSAdvisoryPrefix is a prefix for all JetStream advisories.
246
        JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
247

248
        // JSMetricPrefix is a prefix for all JetStream metrics.
249
        JSMetricPrefix = "$JS.EVENT.METRIC"
250

251
        // JSMetricConsumerAckPre is a metric containing ack latency.
252
        JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
253

254
        // JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
255
        JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
256

257
        // JSAdvisoryConsumerMsgNakPre is a notification published when a message has been NAK'd (negatively acknowledged).
258
        JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
259

260
        // JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
261
        JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
262

263
        // JSAdvisoryStreamCreatedPre notification that a stream was created.
264
        JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
265

266
        // JSAdvisoryStreamDeletedPre notification that a stream was deleted.
267
        JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
268

269
        // JSAdvisoryStreamUpdatedPre notification that a stream was updated.
270
        JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
271

272
        // JSAdvisoryConsumerCreatedPre notification that a consumer was created.
273
        JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
274

275
        // JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
276
        JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
277

278
        // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
279
        JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"
280

281
        // JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
282
        JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"
283

284
        // JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
285
        JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"
286

287
        // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
288
        JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
289

290
        // JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
291
        JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
292

293
        // JSAdvisoryStreamRestoreCreatePre notification that a restore was started.
294
        JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
295

296
        // JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
297
        JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
298

299
        // JSAdvisoryDomainLeaderElected notification that a jetstream domain has elected a leader.
300
        JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
301

302
        // JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
303
        JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
304

305
        // JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
306
        JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
307

308
        // JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
309
        JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
310

311
        // JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
312
        JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
313

314
        // JSAdvisoryServerOutOfStorage notification that a server has no more storage.
315
        JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
316

317
        // JSAdvisoryServerRemoved notification that a server has been removed from the system.
318
        JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
319

320
        // JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
321
        JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
322

323
        // JSAuditAdvisory is a notification about JetStream API access.
324
        // FIXME - Add in details about who..
325
        JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
326
)
327

328
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
329
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
330

331
func generateJSMappingTable(domain string) map[string]string {
726✔
332
        mappings := map[string]string{}
726✔
333
        // This set of mappings is very very very ugly.
726✔
334
        // It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
726✔
335
        // For optics $KV and $OBJ where made to be independent subject spaces.
726✔
336
        // As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
726✔
337
        // This is very unfortunate!!!
726✔
338
        // Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
726✔
339
        // Especially since the actual API for say KV, does use stream create from JS.
726✔
340
        // To avoid overlaps KV and OBJ views append the prefix to their API.
726✔
341
        // (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
726✔
342
        // This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
726✔
343
        for srcMappingSuffix, to := range map[string]string{
726✔
344
                "INFO":       JSApiAccountInfo,
726✔
345
                "STREAM.>":   "$JS.API.STREAM.>",
726✔
346
                "CONSUMER.>": "$JS.API.CONSUMER.>",
726✔
347
                "DIRECT.>":   "$JS.API.DIRECT.>",
726✔
348
                "META.>":     "$JS.API.META.>",
726✔
349
                "SERVER.>":   "$JS.API.SERVER.>",
726✔
350
                "ACCOUNT.>":  "$JS.API.ACCOUNT.>",
726✔
351
                "$KV.>":      "$KV.>",
726✔
352
                "$OBJ.>":     "$OBJ.>",
726✔
353
        } {
7,260✔
354
                mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
6,534✔
355
        }
6,534✔
356
        return mappings
726✔
357
}
358

359
// JSMaxDescriptionLen is the maximum description length for streams and consumers.
360
const JSMaxDescriptionLen = 4 * 1024
361

362
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
363
// It's calculated by summing length of all keys and values.
364
const JSMaxMetadataLen = 128 * 1024
365

366
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
367
// Picked 255 as it seems to be a widely used file name limit
368
const JSMaxNameLen = 255
369

370
// JSDefaultRequestQueueLimit is the default number of entries that we will
371
// put on the global request queue before we react.
372
const JSDefaultRequestQueueLimit = 10_000
373

374
// Responses for API calls.
375

376
// ApiResponse is a standard response from the JetStream JSON API
377
type ApiResponse struct {
378
        Type  string    `json:"type"`
379
        Error *ApiError `json:"error,omitempty"`
380
}
381

382
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
383

384
// When passing back to the clients generalize store failures.
385
var (
386
        errStreamStoreFailed   = errors.New("error creating store for stream")
387
        errConsumerStoreFailed = errors.New("error creating store for consumer")
388
)
389

390
// ToError checks if the response has a error and if it does converts it to an error avoiding
391
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
392
func (r *ApiResponse) ToError() error {
2,930✔
393
        if r.Error == nil {
4,704✔
394
                return nil
1,774✔
395
        }
1,774✔
396

397
        return r.Error
1,156✔
398
}
399

400
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
401

402
// ApiPaged includes variables used to create paged responses from the JSON API
403
type ApiPaged struct {
404
        Total  int `json:"total"`
405
        Offset int `json:"offset"`
406
        Limit  int `json:"limit"`
407
}
408

409
// ApiPagedRequest includes parameters allowing specific pages to be requested from APIs responding with ApiPaged
410
type ApiPagedRequest struct {
411
        Offset int `json:"offset"`
412
}
413

414
// JSApiAccountInfoResponse reports back information on jetstream for this account.
415
type JSApiAccountInfoResponse struct {
416
        ApiResponse
417
        *JetStreamAccountStats
418
}
419

420
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
421

422
// JSApiStreamCreateResponse stream creation.
423
type JSApiStreamCreateResponse struct {
424
        ApiResponse
425
        *StreamInfo
426
        DidCreate bool `json:"did_create,omitempty"`
427
}
428

429
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
430

431
// JSApiStreamDeleteResponse stream removal.
432
type JSApiStreamDeleteResponse struct {
433
        ApiResponse
434
        Success bool `json:"success,omitempty"`
435
}
436

437
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
438

439
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
440
const JSMaxSubjectDetails = 100_000
441

442
type JSApiStreamInfoRequest struct {
443
        ApiPagedRequest
444
        DeletedDetails bool   `json:"deleted_details,omitempty"`
445
        SubjectsFilter string `json:"subjects_filter,omitempty"`
446
}
447

448
type JSApiStreamInfoResponse struct {
449
        ApiResponse
450
        ApiPaged
451
        *StreamInfo
452
}
453

454
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
455

456
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
457
// TODO(dlc) - with header or request support could request chunked response.
458
const JSApiNamesLimit = 1024
459
const JSApiListLimit = 256
460

461
type JSApiStreamNamesRequest struct {
462
        ApiPagedRequest
463
        // These are filters that can be applied to the list.
464
        Subject string `json:"subject,omitempty"`
465
}
466

467
// JSApiStreamNamesResponse list of streams.
468
// A nil request is valid and means all streams.
469
type JSApiStreamNamesResponse struct {
470
        ApiResponse
471
        ApiPaged
472
        Streams []string `json:"streams"`
473
}
474

475
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
476

477
type JSApiStreamListRequest struct {
478
        ApiPagedRequest
479
        // These are filters that can be applied to the list.
480
        Subject string `json:"subject,omitempty"`
481
}
482

483
// JSApiStreamListResponse list of detailed stream information.
484
// A nil request is valid and means all streams.
485
type JSApiStreamListResponse struct {
486
        ApiResponse
487
        ApiPaged
488
        Streams []*StreamInfo `json:"streams"`
489
        Missing []string      `json:"missing,omitempty"`
490
}
491

492
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
493

494
// JSApiStreamPurgeRequest is optional request information to the purge API.
495
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
496
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
497
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
498
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
499
type JSApiStreamPurgeRequest struct {
500
        // Purge up to but not including sequence.
501
        Sequence uint64 `json:"seq,omitempty"`
502
        // Subject to match against messages for the purge command.
503
        Subject string `json:"filter,omitempty"`
504
        // Number of messages to keep.
505
        Keep uint64 `json:"keep,omitempty"`
506
}
507

508
type JSApiStreamPurgeResponse struct {
509
        ApiResponse
510
        Success bool   `json:"success,omitempty"`
511
        Purged  uint64 `json:"purged"`
512
}
513

514
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
515

516
type JSApiConsumerUnpinRequest struct {
517
        Group string `json:"group"`
518
}
519

520
type JSApiConsumerUnpinResponse struct {
521
        ApiResponse
522
}
523

524
const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"
525

526
// JSApiStreamUpdateResponse for updating a stream.
527
type JSApiStreamUpdateResponse struct {
528
        ApiResponse
529
        *StreamInfo
530
}
531

532
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
533

534
// JSApiMsgDeleteRequest delete message request.
535
type JSApiMsgDeleteRequest struct {
536
        Seq     uint64 `json:"seq"`
537
        NoErase bool   `json:"no_erase,omitempty"`
538
}
539

540
type JSApiMsgDeleteResponse struct {
541
        ApiResponse
542
        Success bool `json:"success,omitempty"`
543
}
544

545
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
546

547
type JSApiStreamSnapshotRequest struct {
548
        // Subject to deliver the chunks to for the snapshot.
549
        DeliverSubject string `json:"deliver_subject"`
550
        // Do not include consumers in the snapshot.
551
        NoConsumers bool `json:"no_consumers,omitempty"`
552
        // Optional chunk size preference.
553
        // Best to just let server select.
554
        ChunkSize int `json:"chunk_size,omitempty"`
555
        // Check all message's checksums prior to snapshot.
556
        CheckMsgs bool `json:"jsck,omitempty"`
557
}
558

559
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
560
type JSApiStreamSnapshotResponse struct {
561
        ApiResponse
562
        // Configuration of the given stream.
563
        Config *StreamConfig `json:"config,omitempty"`
564
        // Current State for the given stream.
565
        State *StreamState `json:"state,omitempty"`
566
}
567

568
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
569

570
// JSApiStreamRestoreRequest is the required restore request.
571
type JSApiStreamRestoreRequest struct {
572
        // Configuration of the given stream.
573
        Config StreamConfig `json:"config"`
574
        // Current State for the given stream.
575
        State StreamState `json:"state"`
576
}
577

578
// JSApiStreamRestoreResponse is the direct response to the restore request.
579
type JSApiStreamRestoreResponse struct {
580
        ApiResponse
581
        // Subject to deliver the chunks to for the snapshot restore.
582
        DeliverSubject string `json:"deliver_subject"`
583
}
584

585
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
586

587
// JSApiStreamRemovePeerRequest is the required remove peer request.
588
type JSApiStreamRemovePeerRequest struct {
589
        // Server name of the peer to be removed.
590
        Peer string `json:"peer"`
591
}
592

593
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
594
type JSApiStreamRemovePeerResponse struct {
595
        ApiResponse
596
        Success bool `json:"success,omitempty"`
597
}
598

599
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
600

601
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
602
type JSApiStreamLeaderStepDownResponse struct {
603
        ApiResponse
604
        Success bool `json:"success,omitempty"`
605
}
606

607
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
608

609
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
610
type JSApiConsumerLeaderStepDownResponse struct {
611
        ApiResponse
612
        Success bool `json:"success,omitempty"`
613
}
614

615
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
616

617
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
618
type JSApiLeaderStepdownRequest struct {
619
        Placement *Placement `json:"placement,omitempty"`
620
}
621

622
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
623
type JSApiLeaderStepDownResponse struct {
624
        ApiResponse
625
        Success bool `json:"success,omitempty"`
626
}
627

628
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
629

630
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
631
type JSApiMetaServerRemoveRequest struct {
632
        // Server name of the peer to be removed.
633
        Server string `json:"peer"`
634
        // Peer ID of the peer to be removed. If specified this is used
635
        // instead of the server name.
636
        Peer string `json:"peer_id,omitempty"`
637
}
638

639
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
640
type JSApiMetaServerRemoveResponse struct {
641
        ApiResponse
642
        Success bool `json:"success,omitempty"`
643
}
644

645
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
646

647
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
648
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
649
type JSApiMetaServerStreamMoveRequest struct {
650
        // Server name of the peer to be evacuated.
651
        Server string `json:"server,omitempty"`
652
        // Cluster the server is in
653
        Cluster string `json:"cluster,omitempty"`
654
        // Domain the server is in
655
        Domain string `json:"domain,omitempty"`
656
        // Ephemeral placement tags for the move
657
        Tags []string `json:"tags,omitempty"`
658
}
659

660
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
661

662
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
663
type JSApiAccountPurgeResponse struct {
664
        ApiResponse
665
        Initiated bool `json:"initiated,omitempty"`
666
}
667

668
// JSApiMsgGetRequest is the payload of a request to get one or more messages.
// All fields are optional on the wire (omitempty); how they combine is up to
// the request handler, which is not part of this declaration.
type JSApiMsgGetRequest struct {
	Seq     uint64 `json:"seq,omitempty"`
	LastFor string `json:"last_by_subj,omitempty"`
	NextFor string `json:"next_by_subj,omitempty"`

	// Batch support. Used to request more than one msg at a time.
	// Can be used with a simple starting seq, but also NextFor with wildcards.
	Batch int `json:"batch,omitempty"`
	// MaxBytes limits how much data gets sent out. If not set, the slow
	// consumer default max setting of the server is inherited.
	// Default is MAX_PENDING_SIZE.
	MaxBytes int `json:"max_bytes,omitempty"`
	// StartTime returns messages as of this start time.
	StartTime *time.Time `json:"start_time,omitempty"`

	// Multiple response support. Gets the last msgs matching the subjects,
	// which may include wildcards.
	MultiLastFor []string `json:"multi_last,omitempty"`
	// UpToSeq only returns messages up to this sequence. If not set, the
	// last sequence of the stream is used.
	UpToSeq uint64 `json:"up_to_seq,omitempty"`
	// UpToTime only returns messages up to this time.
	UpToTime *time.Time `json:"up_to_time,omitempty"`
}
690

691
type JSApiMsgGetResponse struct {
692
        ApiResponse
693
        Message *StoredMsg `json:"message,omitempty"`
694
}
695

696
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
697

698
// JSWaitQueueDefaultMax is the default cap on the number of outstanding
// requests for pull consumers.
const JSWaitQueueDefaultMax = 512
700

701
type JSApiConsumerCreateResponse struct {
702
        ApiResponse
703
        *ConsumerInfo
704
}
705

706
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
707

708
type JSApiConsumerDeleteResponse struct {
709
        ApiResponse
710
        Success bool `json:"success,omitempty"`
711
}
712

713
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
714

715
// JSApiConsumerPauseRequest is the payload of a consumer pause request.
type JSApiConsumerPauseRequest struct {
	// PauseUntil is encoded as "pause_until". Because time.Time is a struct,
	// encoding/json's omitempty does not drop it: a zero value is still
	// written out as "0001-01-01T00:00:00Z".
	PauseUntil time.Time `json:"pause_until,omitempty"`
}
718

719
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"
720

721
type JSApiConsumerPauseResponse struct {
722
        ApiResponse
723
        Paused         bool          `json:"paused"`
724
        PauseUntil     time.Time     `json:"pause_until"`
725
        PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
726
}
727

728
type JSApiConsumerInfoResponse struct {
729
        ApiResponse
730
        *ConsumerInfo
731
}
732

733
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
734

735
type JSApiConsumersRequest struct {
736
        ApiPagedRequest
737
}
738

739
type JSApiConsumerNamesResponse struct {
740
        ApiResponse
741
        ApiPaged
742
        Consumers []string `json:"consumers"`
743
}
744

745
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
746

747
type JSApiConsumerListResponse struct {
748
        ApiResponse
749
        ApiPaged
750
        Consumers []*ConsumerInfo `json:"consumers"`
751
        Missing   []string        `json:"missing,omitempty"`
752
}
753

754
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
755

756
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
757
type JSApiConsumerGetNextRequest struct {
758
        Expires   time.Duration `json:"expires,omitempty"`
759
        Batch     int           `json:"batch,omitempty"`
760
        MaxBytes  int           `json:"max_bytes,omitempty"`
761
        NoWait    bool          `json:"no_wait,omitempty"`
762
        Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
763
        PriorityGroup
764
}
765

766
// JSApiStreamTemplateCreateResponse for creating templates.
767
type JSApiStreamTemplateCreateResponse struct {
768
        ApiResponse
769
        *StreamTemplateInfo
770
}
771

772
const JSApiStreamTemplateCreateResponseType = "io.nats.jetstream.api.v1.stream_template_create_response"
773

774
type JSApiStreamTemplateDeleteResponse struct {
775
        ApiResponse
776
        Success bool `json:"success,omitempty"`
777
}
778

779
const JSApiStreamTemplateDeleteResponseType = "io.nats.jetstream.api.v1.stream_template_delete_response"
780

781
// JSApiStreamTemplateInfoResponse for information about stream templates.
782
type JSApiStreamTemplateInfoResponse struct {
783
        ApiResponse
784
        *StreamTemplateInfo
785
}
786

787
const JSApiStreamTemplateInfoResponseType = "io.nats.jetstream.api.v1.stream_template_info_response"
788

789
type JSApiStreamTemplatesRequest struct {
790
        ApiPagedRequest
791
}
792

793
// JSApiStreamTemplateNamesResponse list of templates
794
type JSApiStreamTemplateNamesResponse struct {
795
        ApiResponse
796
        ApiPaged
797
        Templates []string `json:"streams"`
798
}
799

800
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
801

802
// Structure that holds state for a JetStream API request that is processed
803
// in a separate long-lived go routine. This is to avoid possibly blocking
804
// ROUTE and GATEWAY connections.
805
type jsAPIRoutedReq struct {
806
        jsub    *subscription
807
        sub     *subscription
808
        acc     *Account
809
        subject string
810
        reply   string
811
        msg     []byte
812
        pa      pubArg
813
}
814

815
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
127,408✔
816
        // Ignore system level directives meta stepdown and peer remove requests here.
127,408✔
817
        if subject == JSApiLeaderStepDown ||
127,408✔
818
                subject == JSApiRemoveServer ||
127,408✔
819
                strings.HasPrefix(subject, jsAPIAccountPre) {
127,865✔
820
                return
457✔
821
        }
457✔
822
        // No lock needed, those are immutable.
823
        s, rr := js.srv, js.apiSubs.Match(subject)
126,951✔
824

126,951✔
825
        hdr, msg := c.msgParts(rmsg)
126,951✔
826
        if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
126,958✔
827
                // Check if this is the system account. We will let these through for the account info only.
7✔
828
                sacc := s.SystemAccount()
7✔
829
                if sacc != acc {
7✔
830
                        return
×
831
                }
×
832
                if subject != JSApiAccountInfo {
11✔
833
                        // Only respond from the initial server entry to the NATS system.
4✔
834
                        if c.kind == CLIENT || c.kind == LEAF {
6✔
835
                                var resp = ApiResponse{
2✔
836
                                        Type:  JSApiSystemResponseType,
2✔
837
                                        Error: NewJSNotEnabledForAccountError(),
2✔
838
                                }
2✔
839
                                s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
840
                        }
2✔
841
                        return
4✔
842
                }
843
        }
844

845
        // Short circuit for no interest.
846
        if len(rr.psubs)+len(rr.qsubs) == 0 {
146,931✔
847
                if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
19,984✔
848
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
849
                        var resp = ApiResponse{
×
850
                                Type:  JSApiSystemResponseType,
×
851
                                Error: NewJSBadRequestError(),
×
852
                        }
×
853
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
854
                }
×
855
                return
19,984✔
856
        }
857

858
        // We should only have psubs and only 1 per result.
859
        if len(rr.psubs) != 1 {
106,963✔
860
                s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
×
861
                if c.kind == CLIENT || c.kind == LEAF {
×
862
                        ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
×
863
                        var resp = ApiResponse{
×
864
                                Type:  JSApiSystemResponseType,
×
865
                                Error: NewJSBadRequestError(),
×
866
                        }
×
867
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
868
                }
×
869
                return
×
870
        }
871
        jsub := rr.psubs[0]
106,963✔
872

106,963✔
873
        // If this is directly from a client connection ok to do in place.
106,963✔
874
        if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
149,220✔
875
                start := time.Now()
42,257✔
876
                jsub.icb(sub, c, acc, subject, reply, rmsg)
42,257✔
877
                if dur := time.Since(start); dur >= readLoopReportThreshold {
42,257✔
878
                        s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
×
879
                }
×
880
                return
42,257✔
881
        }
882

883
        // If we are here we have received this request over a non-client connection.
884
        // We need to make sure not to block. We will send the request to a long-lived
885
        // pool of go routines.
886

887
        // Increment inflight. Do this before queueing.
888
        atomic.AddInt64(&js.apiInflight, 1)
64,706✔
889

64,706✔
890
        // Copy the state. Note the JSAPI only uses the hdr index to piece apart the
64,706✔
891
        // header from the msg body. No other references are needed.
64,706✔
892
        // Check pending and warn if getting backed up.
64,706✔
893
        pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
64,706✔
894
        limit := atomic.LoadInt64(&js.queueLimit)
64,706✔
895
        if pending >= int(limit) {
64,734✔
896
                s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
28✔
897
                drained := int64(s.jsAPIRoutedReqs.drain())
28✔
898
                atomic.AddInt64(&js.apiInflight, -drained)
28✔
899

28✔
900
                s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
28✔
901
                        TypedEvent: TypedEvent{
28✔
902
                                Type: JSAPILimitReachedAdvisoryType,
28✔
903
                                ID:   nuid.Next(),
28✔
904
                                Time: time.Now().UTC(),
28✔
905
                        },
28✔
906
                        Server:  s.Name(),
28✔
907
                        Domain:  js.config.Domain,
28✔
908
                        Dropped: drained,
28✔
909
                })
28✔
910
        }
28✔
911
}
912

913
func (s *Server) processJSAPIRoutedRequests() {
13,128✔
914
        defer s.grWG.Done()
13,128✔
915

13,128✔
916
        s.mu.RLock()
13,128✔
917
        queue := s.jsAPIRoutedReqs
13,128✔
918
        client := &client{srv: s, kind: JETSTREAM}
13,128✔
919
        s.mu.RUnlock()
13,128✔
920

13,128✔
921
        js := s.getJetStream()
13,128✔
922

13,128✔
923
        for {
67,722✔
924
                select {
54,594✔
925
                case <-queue.ch:
41,466✔
926
                        // Only pop one item at a time here, otherwise if the system is recovering
41,466✔
927
                        // from queue buildup, then one worker will pull off all the tasks and the
41,466✔
928
                        // others will be starved of work.
41,466✔
929
                        for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
106,144✔
930
                                client.pa = r.pa
64,678✔
931
                                start := time.Now()
64,678✔
932
                                r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
64,678✔
933
                                if dur := time.Since(start); dur >= readLoopReportThreshold {
64,678✔
934
                                        s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
×
935
                                }
×
936
                                atomic.AddInt64(&js.apiInflight, -1)
64,678✔
937
                        }
938
                case <-s.quitCh:
13,124✔
939
                        return
13,124✔
940
                }
941
        }
942
}
943

944
func (s *Server) setJetStreamExportSubs() error {
3,282✔
945
        js := s.getJetStream()
3,282✔
946
        if js == nil {
3,282✔
947
                return NewJSNotEnabledError()
×
948
        }
×
949

950
        // Start the go routine that will process API requests received by the
951
        // subscription below when they are coming from routes, etc..
952
        const maxProcs = 16
3,282✔
953
        mp := runtime.GOMAXPROCS(0)
3,282✔
954
        // Cap at 16 max for now on larger core setups.
3,282✔
955
        if mp > maxProcs {
3,282✔
956
                mp = maxProcs
×
957
        }
×
958
        s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
3,282✔
959
        for i := 0; i < mp; i++ {
16,410✔
960
                s.startGoRoutine(s.processJSAPIRoutedRequests)
13,128✔
961
        }
13,128✔
962

963
        // This is the catch all now for all JetStream API calls.
964
        if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
3,282✔
965
                return err
×
966
        }
×
967

968
        if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
3,282✔
969
                s.Warnf("Error setting up jetstream service exports: %v", err)
×
970
                return err
×
971
        }
×
972

973
        // API handles themselves.
974
        pairs := []struct {
3,282✔
975
                subject string
3,282✔
976
                handler msgHandler
3,282✔
977
        }{
3,282✔
978
                {JSApiAccountInfo, s.jsAccountInfoRequest},
3,282✔
979
                {JSApiTemplateCreate, s.jsTemplateCreateRequest},
3,282✔
980
                {JSApiTemplates, s.jsTemplateNamesRequest},
3,282✔
981
                {JSApiTemplateInfo, s.jsTemplateInfoRequest},
3,282✔
982
                {JSApiTemplateDelete, s.jsTemplateDeleteRequest},
3,282✔
983
                {JSApiStreamCreate, s.jsStreamCreateRequest},
3,282✔
984
                {JSApiStreamUpdate, s.jsStreamUpdateRequest},
3,282✔
985
                {JSApiStreams, s.jsStreamNamesRequest},
3,282✔
986
                {JSApiStreamList, s.jsStreamListRequest},
3,282✔
987
                {JSApiStreamInfo, s.jsStreamInfoRequest},
3,282✔
988
                {JSApiStreamDelete, s.jsStreamDeleteRequest},
3,282✔
989
                {JSApiStreamPurge, s.jsStreamPurgeRequest},
3,282✔
990
                {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
3,282✔
991
                {JSApiStreamRestore, s.jsStreamRestoreRequest},
3,282✔
992
                {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
3,282✔
993
                {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
3,282✔
994
                {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
3,282✔
995
                {JSApiMsgDelete, s.jsMsgDeleteRequest},
3,282✔
996
                {JSApiMsgGet, s.jsMsgGetRequest},
3,282✔
997
                {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
3,282✔
998
                {JSApiConsumerCreate, s.jsConsumerCreateRequest},
3,282✔
999
                {JSApiDurableCreate, s.jsConsumerCreateRequest},
3,282✔
1000
                {JSApiConsumers, s.jsConsumerNamesRequest},
3,282✔
1001
                {JSApiConsumerList, s.jsConsumerListRequest},
3,282✔
1002
                {JSApiConsumerInfo, s.jsConsumerInfoRequest},
3,282✔
1003
                {JSApiConsumerDelete, s.jsConsumerDeleteRequest},
3,282✔
1004
                {JSApiConsumerPause, s.jsConsumerPauseRequest},
3,282✔
1005
                {JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
3,282✔
1006
        }
3,282✔
1007

3,282✔
1008
        js.mu.Lock()
3,282✔
1009
        defer js.mu.Unlock()
3,282✔
1010

3,282✔
1011
        for _, p := range pairs {
95,178✔
1012
                sub := &subscription{subject: []byte(p.subject), icb: p.handler}
91,896✔
1013
                if err := js.apiSubs.Insert(sub); err != nil {
91,896✔
1014
                        return err
×
1015
                }
×
1016
        }
1017

1018
        return nil
3,282✔
1019
}
1020

1021
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
27,858✔
1022
        acc.trackAPI()
27,858✔
1023
        if reply != _EMPTY_ {
55,474✔
1024
                s.sendInternalAccountMsg(nil, reply, response)
27,616✔
1025
        }
27,616✔
1026
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
27,858✔
1027
}
1028

1029
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
14,758✔
1030
        acc.trackAPIErr()
14,758✔
1031
        if reply != _EMPTY_ {
29,074✔
1032
                s.sendInternalAccountMsg(nil, reply, response)
14,316✔
1033
        }
14,316✔
1034
        s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
14,758✔
1035
}
1036

1037
const errRespDelay = 500 * time.Millisecond
1038

1039
type delayedAPIResponse struct {
1040
        ci       *ClientInfo
1041
        acc      *Account
1042
        subject  string
1043
        reply    string
1044
        request  string
1045
        response string
1046
        rg       *raftGroup
1047
        deadline time.Time
1048
        next     *delayedAPIResponse
1049
}
1050

1051
// Add `r` in the list that is maintained ordered by the `delayedAPIResponse.deadline` time.
1052
func addDelayedResponse(head, tail **delayedAPIResponse, r *delayedAPIResponse) {
22✔
1053
        // Check if list empty.
22✔
1054
        if *head == nil {
30✔
1055
                *head, *tail = r, r
8✔
1056
                return
8✔
1057
        }
8✔
1058
        // Check if it should be added at the end, which is if after or equal to the tail.
1059
        if r.deadline.After((*tail).deadline) || r.deadline.Equal((*tail).deadline) {
26✔
1060
                (*tail).next, *tail = r, r
12✔
1061
                return
12✔
1062
        }
12✔
1063
        // Find its spot in the list.
1064
        var prev *delayedAPIResponse
2✔
1065
        for c := *head; c != nil; c = c.next {
6✔
1066
                // We insert only if we are stricly before the current `c`.
4✔
1067
                if r.deadline.Before(c.deadline) {
6✔
1068
                        r.next = c
2✔
1069
                        if prev != nil {
3✔
1070
                                prev.next = r
1✔
1071
                        } else {
2✔
1072
                                *head = r
1✔
1073
                        }
1✔
1074
                        return
2✔
1075
                }
1076
                prev = c
2✔
1077
        }
1078
}
1079

1080
func (s *Server) delayedAPIResponder() {
6,197✔
1081
        defer s.grWG.Done()
6,197✔
1082
        var (
6,197✔
1083
                head, tail *delayedAPIResponse // Linked list.
6,197✔
1084
                r          *delayedAPIResponse // Updated by calling next().
6,197✔
1085
                rqch       <-chan struct{}     // Quit channel of the Raft group (if present).
6,197✔
1086
                tm         = time.NewTimer(time.Hour)
6,197✔
1087
        )
6,197✔
1088
        next := func() {
6,218✔
1089
                r, rqch = nil, nil
21✔
1090
                // Check that JetStream is still on. Do not exit the go routine
21✔
1091
                // since JS can be enabled/disabled. The go routine will exit
21✔
1092
                // only if server is shutdown.
21✔
1093
                js := s.getJetStream()
21✔
1094
                if js == nil {
22✔
1095
                        // Reset head and tail here. Also drain the ipQueue.
1✔
1096
                        head, tail = nil, nil
1✔
1097
                        s.delayedAPIResponses.drain()
1✔
1098
                        // Fall back into next "if" that resets timer.
1✔
1099
                }
1✔
1100
                // If there are no delayed messages then delay the timer for
1101
                // a while.
1102
                if head == nil {
29✔
1103
                        tm.Reset(time.Hour)
8✔
1104
                        return
8✔
1105
                }
8✔
1106
                // Get the first expected message and then reset the timer.
1107
                r = head
13✔
1108
                js.mu.RLock()
13✔
1109
                if r.rg != nil && r.rg.node != nil {
14✔
1110
                        // If there's an attached Raft group to the delayed response
1✔
1111
                        // then pull out the quit channel, so that we don't bother
1✔
1112
                        // sending responses for entities which are now no longer
1✔
1113
                        // running.
1✔
1114
                        rqch = r.rg.node.QuitC()
1✔
1115
                }
1✔
1116
                js.mu.RUnlock()
13✔
1117
                tm.Reset(time.Until(r.deadline))
13✔
1118
        }
1119
        pop := func() {
6,209✔
1120
                if head == nil {
12✔
1121
                        return
×
1122
                }
×
1123
                head = head.next
12✔
1124
                if head == nil {
19✔
1125
                        tail = nil
7✔
1126
                }
7✔
1127
        }
1128
        for {
12,428✔
1129
                select {
6,231✔
1130
                case <-s.delayedAPIResponses.ch:
22✔
1131
                        v, ok := s.delayedAPIResponses.popOne()
22✔
1132
                        if !ok {
22✔
1133
                                continue
×
1134
                        }
1135
                        // Add it to the list, and if ends up being the head, set things up.
1136
                        addDelayedResponse(&head, &tail, v)
22✔
1137
                        if v == head {
31✔
1138
                                next()
9✔
1139
                        }
9✔
1140
                case <-s.quitCh:
6,197✔
1141
                        return
6,197✔
1142
                case <-rqch:
1✔
1143
                        // If we were the head, drop and setup things for next.
1✔
1144
                        if r != nil && r == head {
2✔
1145
                                pop()
1✔
1146
                        }
1✔
1147
                        next()
1✔
1148
                case <-tm.C:
11✔
1149
                        if r != nil {
22✔
1150
                                s.sendAPIErrResponse(r.ci, r.acc, r.subject, r.reply, r.request, r.response)
11✔
1151
                                pop()
11✔
1152
                        }
11✔
1153
                        next()
11✔
1154
                }
1155
        }
1156
}
1157

1158
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup, duration time.Duration) {
22✔
1159
        s.delayedAPIResponses.push(&delayedAPIResponse{
22✔
1160
                ci, acc, subject, reply, request, response, rg, time.Now().Add(duration), nil,
22✔
1161
        })
22✔
1162
}
22✔
1163

1164
func (s *Server) getRequestInfo(c *client, raw []byte) (pci *ClientInfo, acc *Account, hdr, msg []byte, err error) {
96,803✔
1165
        hdr, msg = c.msgParts(raw)
96,803✔
1166
        var ci ClientInfo
96,803✔
1167

96,803✔
1168
        if len(hdr) > 0 {
193,531✔
1169
                if err := json.Unmarshal(sliceHeader(ClientInfoHdr, hdr), &ci); err != nil {
96,728✔
1170
                        return nil, nil, nil, nil, err
×
1171
                }
×
1172
        }
1173

1174
        if ci.Service != _EMPTY_ {
96,863✔
1175
                acc, _ = s.LookupAccount(ci.Service)
60✔
1176
        } else if ci.Account != _EMPTY_ {
193,471✔
1177
                acc, _ = s.LookupAccount(ci.Account)
96,668✔
1178
        } else {
96,743✔
1179
                // Direct $SYS access.
75✔
1180
                acc = c.acc
75✔
1181
                if acc == nil {
79✔
1182
                        acc = s.SystemAccount()
4✔
1183
                }
4✔
1184
        }
1185
        if acc == nil {
96,813✔
1186
                return nil, nil, nil, nil, ErrMissingAccount
10✔
1187
        }
10✔
1188
        return &ci, acc, hdr, msg, nil
96,793✔
1189
}
1190

1191
func (s *Server) unmarshalRequest(c *client, acc *Account, subject string, msg []byte, v any) error {
15,404✔
1192
        decoder := json.NewDecoder(bytes.NewReader(msg))
15,404✔
1193
        decoder.DisallowUnknownFields()
15,404✔
1194

15,404✔
1195
        for {
46,206✔
1196
                if err := decoder.Decode(v); err != nil {
46,206✔
1197
                        if err == io.EOF {
30,802✔
1198
                                return nil
15,398✔
1199
                        }
15,398✔
1200

1201
                        var syntaxErr *json.SyntaxError
6✔
1202
                        if errors.As(err, &syntaxErr) {
6✔
1203
                                err = fmt.Errorf("%w at offset %d", err, syntaxErr.Offset)
×
1204
                        }
×
1205

1206
                        c.RateLimitWarnf("Invalid JetStream request '%s > %s': %s", acc, subject, err)
6✔
1207

6✔
1208
                        if s.JetStreamConfig().Strict {
12✔
1209
                                return err
6✔
1210
                        }
6✔
1211

1212
                        return json.Unmarshal(msg, v)
×
1213
                }
1214
        }
1215
}
1216

1217
func (a *Account) trackAPI() {
27,858✔
1218
        a.mu.RLock()
27,858✔
1219
        jsa := a.js
27,858✔
1220
        a.mu.RUnlock()
27,858✔
1221
        if jsa != nil {
55,654✔
1222
                jsa.usageMu.Lock()
27,796✔
1223
                jsa.usageApi++
27,796✔
1224
                jsa.apiTotal++
27,796✔
1225
                jsa.sendClusterUsageUpdate()
27,796✔
1226
                atomic.AddInt64(&jsa.js.apiTotal, 1)
27,796✔
1227
                jsa.usageMu.Unlock()
27,796✔
1228
        }
27,796✔
1229
}
1230

1231
func (a *Account) trackAPIErr() {
14,758✔
1232
        a.mu.RLock()
14,758✔
1233
        jsa := a.js
14,758✔
1234
        a.mu.RUnlock()
14,758✔
1235
        if jsa != nil {
29,249✔
1236
                jsa.usageMu.Lock()
14,491✔
1237
                jsa.usageApi++
14,491✔
1238
                jsa.apiTotal++
14,491✔
1239
                jsa.usageErr++
14,491✔
1240
                jsa.apiErrors++
14,491✔
1241
                jsa.sendClusterUsageUpdate()
14,491✔
1242
                atomic.AddInt64(&jsa.js.apiTotal, 1)
14,491✔
1243
                atomic.AddInt64(&jsa.js.apiErrors, 1)
14,491✔
1244
                jsa.usageMu.Unlock()
14,491✔
1245
        }
14,491✔
1246
}
1247

1248
// badAPIRequestT is the warning format used when request info cannot be
// obtained for an API request. Its single %q verb takes the request payload.
const badAPIRequestT = "Malformed JetStream API Request: %q"
1249

1250
// Helper function to check on JetStream being enabled but also on status of leafnodes
1251
// If the local account is not enabled but does have leafnode connectivity we will not
1252
// want to error immediately and let the other side decide.
1253
func (a *Account) checkJetStream() (enabled, shouldError bool) {
39,983✔
1254
        a.mu.RLock()
39,983✔
1255
        defer a.mu.RUnlock()
39,983✔
1256
        return a.js != nil, a.nleafs+a.nrleafs == 0
39,983✔
1257
}
39,983✔
1258

1259
// Request for current usage and limits for this account.
1260
func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
415✔
1261
        if c == nil || !s.JetStreamEnabled() {
415✔
1262
                return
×
1263
        }
×
1264

1265
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
415✔
1266
        if err != nil {
416✔
1267
                s.Warnf(badAPIRequestT, msg)
1✔
1268
                return
1✔
1269
        }
1✔
1270

1271
        var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}}
414✔
1272

414✔
1273
        // Determine if we should proceed here when we are in clustered mode.
414✔
1274
        if s.JetStreamIsClustered() {
758✔
1275
                js, cc := s.getJetStreamCluster()
344✔
1276
                if js == nil || cc == nil {
344✔
1277
                        return
×
1278
                }
×
1279
                if js.isLeaderless() {
345✔
1280
                        resp.Error = NewJSClusterNotAvailError()
1✔
1281
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1282
                        return
1✔
1283
                }
1✔
1284
                // Make sure we are meta leader.
1285
                if !s.JetStreamIsLeader() {
582✔
1286
                        return
239✔
1287
                }
239✔
1288
        }
1289

1290
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
180✔
1291
                if !doErr {
7✔
1292
                        return
1✔
1293
                }
1✔
1294
                resp.Error = NewJSNotEnabledForAccountError()
5✔
1295
        } else {
168✔
1296
                stats := acc.JetStreamUsage()
168✔
1297
                resp.JetStreamAccountStats = &stats
168✔
1298
        }
168✔
1299
        b, err := json.Marshal(resp)
173✔
1300
        if err != nil {
173✔
1301
                return
×
1302
        }
×
1303

1304
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
173✔
1305
}
1306

1307
// Helpers for token extraction.
1308
func templateNameFromSubject(subject string) string {
6✔
1309
        return tokenAt(subject, 6)
6✔
1310
}
6✔
1311

1312
func streamNameFromSubject(subject string) string {
76,165✔
1313
        return tokenAt(subject, 5)
76,165✔
1314
}
76,165✔
1315

1316
func consumerNameFromSubject(subject string) string {
48,507✔
1317
        return tokenAt(subject, 6)
48,507✔
1318
}
48,507✔
1319

1320
// Request to create a new template.
1321
func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
1322
        if c == nil {
6✔
1323
                return
×
1324
        }
×
1325
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6✔
1326
        if err != nil {
6✔
1327
                s.Warnf(badAPIRequestT, msg)
×
1328
                return
×
1329
        }
×
1330

1331
        var resp = JSApiStreamTemplateCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateCreateResponseType}}
6✔
1332
        if !acc.JetStreamEnabled() {
6✔
1333
                resp.Error = NewJSNotEnabledForAccountError()
×
1334
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1335
                return
×
1336
        }
×
1337

1338
        // Not supported for now.
1339
        if s.JetStreamIsClustered() {
9✔
1340
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1341
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1342
                return
3✔
1343
        }
3✔
1344

1345
        var cfg StreamTemplateConfig
3✔
1346
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
3✔
1347
                resp.Error = NewJSInvalidJSONError(err)
×
1348
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1349
                return
×
1350
        }
×
1351
        templateName := templateNameFromSubject(subject)
3✔
1352
        if templateName != cfg.Name {
4✔
1353
                resp.Error = NewJSTemplateNameNotMatchSubjectError()
1✔
1354
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1355
                return
1✔
1356
        }
1✔
1357

1358
        t, err := acc.addStreamTemplate(&cfg)
2✔
1359
        if err != nil {
2✔
1360
                resp.Error = NewJSStreamTemplateCreateError(err, Unless(err))
×
1361
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1362
                return
×
1363
        }
×
1364
        t.mu.Lock()
2✔
1365
        tcfg := t.StreamTemplateConfig.deepCopy()
2✔
1366
        streams := t.streams
2✔
1367
        if streams == nil {
4✔
1368
                streams = []string{}
2✔
1369
        }
2✔
1370
        t.mu.Unlock()
2✔
1371
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: tcfg, Streams: streams}
2✔
1372
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1373
}
1374

1375
// Request for the list of all template names.
1376
func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
5✔
1377
        if c == nil {
5✔
1378
                return
×
1379
        }
×
1380
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
5✔
1381
        if err != nil {
5✔
1382
                s.Warnf(badAPIRequestT, msg)
×
1383
                return
×
1384
        }
×
1385

1386
        var resp = JSApiStreamTemplateNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateNamesResponseType}}
5✔
1387
        if !acc.JetStreamEnabled() {
5✔
1388
                resp.Error = NewJSNotEnabledForAccountError()
×
1389
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1390
                return
×
1391
        }
×
1392

1393
        // Not supported for now.
1394
        if s.JetStreamIsClustered() {
8✔
1395
                resp.Error = NewJSClusterUnSupportFeatureError()
3✔
1396
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
1397
                return
3✔
1398
        }
3✔
1399

1400
        var offset int
2✔
1401
        if isJSONObjectOrArray(msg) {
2✔
1402
                var req JSApiStreamTemplatesRequest
×
1403
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
×
1404
                        resp.Error = NewJSInvalidJSONError(err)
×
1405
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1406
                        return
×
1407
                }
×
1408
                offset = req.Offset
×
1409
        }
1410

1411
        ts := acc.templates()
2✔
1412
        slices.SortFunc(ts, func(i, j *streamTemplate) int {
3✔
1413
                return cmp.Compare(i.StreamTemplateConfig.Name, j.StreamTemplateConfig.Name)
1✔
1414
        })
1✔
1415

1416
        tcnt := len(ts)
2✔
1417
        if offset > tcnt {
2✔
1418
                offset = tcnt
×
1419
        }
×
1420

1421
        for _, t := range ts[offset:] {
5✔
1422
                t.mu.Lock()
3✔
1423
                name := t.Name
3✔
1424
                t.mu.Unlock()
3✔
1425
                resp.Templates = append(resp.Templates, name)
3✔
1426
                if len(resp.Templates) >= JSApiNamesLimit {
3✔
1427
                        break
×
1428
                }
1429
        }
1430
        resp.Total = tcnt
2✔
1431
        resp.Limit = JSApiNamesLimit
2✔
1432
        resp.Offset = offset
2✔
1433
        if resp.Templates == nil {
2✔
1434
                resp.Templates = []string{}
×
1435
        }
×
1436
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2✔
1437
}
1438

1439
// Request for information about a stream template.
1440
func (s *Server) jsTemplateInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1✔
1441
        if c == nil {
1✔
1442
                return
×
1443
        }
×
1444
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1✔
1445
        if err != nil {
1✔
1446
                s.Warnf(badAPIRequestT, msg)
×
1447
                return
×
1448
        }
×
1449

1450
        var resp = JSApiStreamTemplateInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateInfoResponseType}}
1✔
1451
        if !acc.JetStreamEnabled() {
1✔
1452
                resp.Error = NewJSNotEnabledForAccountError()
×
1453
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1454
                return
×
1455
        }
×
1456
        if !isEmptyRequest(msg) {
1✔
1457
                resp.Error = NewJSNotEmptyRequestError()
×
1458
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1459
                return
×
1460
        }
×
1461
        name := templateNameFromSubject(subject)
1✔
1462
        t, err := acc.lookupStreamTemplate(name)
1✔
1463
        if err != nil {
1✔
1464
                resp.Error = NewJSStreamTemplateNotFoundError()
×
1465
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1466
                return
×
1467
        }
×
1468
        t.mu.Lock()
1✔
1469
        cfg := t.StreamTemplateConfig.deepCopy()
1✔
1470
        streams := t.streams
1✔
1471
        if streams == nil {
1✔
1472
                streams = []string{}
×
1473
        }
×
1474
        t.mu.Unlock()
1✔
1475

1✔
1476
        resp.StreamTemplateInfo = &StreamTemplateInfo{Config: cfg, Streams: streams}
1✔
1477
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1478
}
1479

1480
// Request to delete a stream template.
1481
func (s *Server) jsTemplateDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2✔
1482
        if c == nil {
2✔
1483
                return
×
1484
        }
×
1485
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
2✔
1486
        if err != nil {
2✔
1487
                s.Warnf(badAPIRequestT, msg)
×
1488
                return
×
1489
        }
×
1490

1491
        var resp = JSApiStreamTemplateDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamTemplateDeleteResponseType}}
2✔
1492
        if !acc.JetStreamEnabled() {
2✔
1493
                resp.Error = NewJSNotEnabledForAccountError()
×
1494
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1495
                return
×
1496
        }
×
1497
        if !isEmptyRequest(msg) {
2✔
1498
                resp.Error = NewJSNotEmptyRequestError()
×
1499
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1500
                return
×
1501
        }
×
1502
        name := templateNameFromSubject(subject)
2✔
1503
        err = acc.deleteStreamTemplate(name)
2✔
1504
        if err != nil {
3✔
1505
                resp.Error = NewJSStreamTemplateDeleteError(err, Unless(err))
1✔
1506
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1507
                return
1✔
1508
        }
1✔
1509
        resp.Success = true
1✔
1510
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1✔
1511
}
1512

1513
func (s *Server) jsonResponse(v any) string {
42,967✔
1514
        b, err := json.Marshal(v)
42,967✔
1515
        if err != nil {
42,967✔
1516
                s.Warnf("Problem marshaling JSON for JetStream API:", err)
×
1517
                return ""
×
1518
        }
×
1519
        return string(b)
42,967✔
1520
}
1521

1522
// Read lock must be held
1523
func (jsa *jsAccount) tieredReservation(tier string, cfg *StreamConfig) int64 {
3,191✔
1524
        reservation := int64(0)
3,191✔
1525
        if tier == _EMPTY_ {
6,360✔
1526
                for _, sa := range jsa.streams {
21,530✔
1527
                        if sa.cfg.MaxBytes > 0 {
18,379✔
1528
                                if sa.cfg.Storage == cfg.Storage && sa.cfg.Name != cfg.Name {
18✔
1529
                                        reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
×
1530
                                }
×
1531
                        }
1532
                }
1533
        } else {
22✔
1534
                for _, sa := range jsa.streams {
37✔
1535
                        if sa.cfg.Replicas == cfg.Replicas {
29✔
1536
                                if sa.cfg.MaxBytes > 0 {
19✔
1537
                                        if isSameTier(&sa.cfg, cfg) && sa.cfg.Name != cfg.Name {
10✔
1538
                                                reservation += (int64(sa.cfg.Replicas) * sa.cfg.MaxBytes)
5✔
1539
                                        }
5✔
1540
                                }
1541
                        }
1542
                }
1543
        }
1544
        return reservation
3,191✔
1545
}
1546

1547
// Request to create a stream.
1548
func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6,625✔
1549
        if c == nil || !s.JetStreamEnabled() {
6,811✔
1550
                return
186✔
1551
        }
186✔
1552
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6,439✔
1553
        if err != nil {
6,442✔
1554
                s.Warnf(badAPIRequestT, msg)
3✔
1555
                return
3✔
1556
        }
3✔
1557

1558
        var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
6,436✔
1559

6,436✔
1560
        // Determine if we should proceed here when we are in clustered mode.
6,436✔
1561
        if s.JetStreamIsClustered() {
11,753✔
1562
                js, cc := s.getJetStreamCluster()
5,317✔
1563
                if js == nil || cc == nil {
5,317✔
1564
                        return
×
1565
                }
×
1566
                if js.isLeaderless() {
5,318✔
1567
                        resp.Error = NewJSClusterNotAvailError()
1✔
1568
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1569
                        return
1✔
1570
                }
1✔
1571
                // Make sure we are meta leader.
1572
                if !s.JetStreamIsLeader() {
9,074✔
1573
                        return
3,758✔
1574
                }
3,758✔
1575
        }
1576

1577
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,685✔
1578
                if doErr {
8✔
1579
                        resp.Error = NewJSNotEnabledForAccountError()
×
1580
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1581
                }
×
1582
                return
8✔
1583
        }
1584

1585
        var cfg StreamConfigRequest
2,669✔
1586
        if err := s.unmarshalRequest(c, acc, subject, msg, &cfg); err != nil {
2,670✔
1587
                resp.Error = NewJSInvalidJSONError(err)
1✔
1588
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1589
                return
1✔
1590
        }
1✔
1591

1592
        // Initialize asset version metadata.
1593
        setStaticStreamMetadata(&cfg.StreamConfig)
2,668✔
1594

2,668✔
1595
        streamName := streamNameFromSubject(subject)
2,668✔
1596
        if streamName != cfg.Name {
2,669✔
1597
                resp.Error = NewJSStreamMismatchError()
1✔
1598
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1599
                return
1✔
1600
        }
1✔
1601

1602
        // Check for path like separators in the name.
1603
        if strings.ContainsAny(streamName, `\/`) {
2,669✔
1604
                resp.Error = NewJSStreamNameContainsPathSeparatorsError()
2✔
1605
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1606
                return
2✔
1607
        }
2✔
1608

1609
        // Can't create a stream with a sealed state.
1610
        if cfg.Sealed {
2,667✔
1611
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for create can not be sealed"))
2✔
1612
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1613
                return
2✔
1614
        }
2✔
1615

1616
        // If we are told to do mirror direct but are not mirroring, error.
1617
        if cfg.MirrorDirect && cfg.Mirror == nil {
2,663✔
1618
                resp.Error = NewJSStreamInvalidConfigError(fmt.Errorf("stream has no mirror but does have mirror direct"))
×
1619
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1620
                return
×
1621
        }
×
1622

1623
        // Hand off to cluster for processing.
1624
        if s.JetStreamIsClustered() {
4,219✔
1625
                s.jsClusteredStreamRequest(ci, acc, subject, reply, rmsg, &cfg)
1,556✔
1626
                return
1,556✔
1627
        }
1,556✔
1628

1629
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg.StreamConfig); err != nil {
1,109✔
1630
                resp.Error = err
2✔
1631
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1632
                return
2✔
1633
        }
2✔
1634

1635
        mset, err := acc.addStreamPedantic(&cfg.StreamConfig, cfg.Pedantic)
1,105✔
1636
        if err != nil {
1,146✔
1637
                if IsNatsErr(err, JSStreamStoreFailedF) {
41✔
1638
                        s.Warnf("Stream create failed for '%s > %s': %v", acc, streamName, err)
×
1639
                        err = errStreamStoreFailed
×
1640
                }
×
1641
                resp.Error = NewJSStreamCreateError(err, Unless(err))
41✔
1642
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
41✔
1643
                return
41✔
1644
        }
1645
        msetCfg := mset.config()
1,064✔
1646
        resp.StreamInfo = &StreamInfo{
1,064✔
1647
                Created:   mset.createdTime(),
1,064✔
1648
                State:     mset.state(),
1,064✔
1649
                Config:    *setDynamicStreamMetadata(&msetCfg),
1,064✔
1650
                TimeStamp: time.Now().UTC(),
1,064✔
1651
                Mirror:    mset.mirrorInfo(),
1,064✔
1652
                Sources:   mset.sourcesInfo(),
1,064✔
1653
        }
1,064✔
1654
        resp.DidCreate = true
1,064✔
1655
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
1,064✔
1656
}
1657

1658
// Request to update a stream.
1659
func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
646✔
1660
        if c == nil || !s.JetStreamEnabled() {
646✔
1661
                return
×
1662
        }
×
1663

1664
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
646✔
1665
        if err != nil {
646✔
1666
                s.Warnf(badAPIRequestT, msg)
×
1667
                return
×
1668
        }
×
1669

1670
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
646✔
1671

646✔
1672
        // Determine if we should proceed here when we are in clustered mode.
646✔
1673
        if s.JetStreamIsClustered() {
1,225✔
1674
                js, cc := s.getJetStreamCluster()
579✔
1675
                if js == nil || cc == nil {
579✔
1676
                        return
×
1677
                }
×
1678
                if js.isLeaderless() {
581✔
1679
                        resp.Error = NewJSClusterNotAvailError()
2✔
1680
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
1681
                        return
2✔
1682
                }
2✔
1683
                // Make sure we are meta leader.
1684
                if !s.JetStreamIsLeader() {
1,032✔
1685
                        return
455✔
1686
                }
455✔
1687
        }
1688

1689
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
189✔
1690
                if doErr {
×
1691
                        resp.Error = NewJSNotEnabledForAccountError()
×
1692
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1693
                }
×
1694
                return
×
1695
        }
1696
        var ncfg StreamConfigRequest
189✔
1697
        if err := s.unmarshalRequest(c, acc, subject, msg, &ncfg); err != nil {
190✔
1698
                resp.Error = NewJSInvalidJSONError(err)
1✔
1699
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1700
                return
1✔
1701
        }
1✔
1702

1703
        cfg, apiErr := s.checkStreamCfg(&ncfg.StreamConfig, acc, ncfg.Pedantic)
188✔
1704
        if apiErr != nil {
206✔
1705
                resp.Error = apiErr
18✔
1706
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
18✔
1707
                return
18✔
1708
        }
18✔
1709

1710
        streamName := streamNameFromSubject(subject)
170✔
1711
        if streamName != cfg.Name {
171✔
1712
                resp.Error = NewJSStreamMismatchError()
1✔
1713
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1714
                return
1✔
1715
        }
1✔
1716

1717
        // Handle clustered version here.
1718
        if s.JetStreamIsClustered() {
286✔
1719
                // Always do in separate Go routine.
117✔
1720
                go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, copyBytes(rmsg), &cfg, nil, ncfg.Pedantic)
117✔
1721
                return
117✔
1722
        }
117✔
1723

1724
        mset, err := acc.lookupStream(streamName)
52✔
1725
        if err != nil {
53✔
1726
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
1727
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1728
                return
1✔
1729
        }
1✔
1730

1731
        // Update asset version metadata.
1732
        setStaticStreamMetadata(&cfg)
51✔
1733

51✔
1734
        if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
60✔
1735
                resp.Error = NewJSStreamUpdateError(err, Unless(err))
9✔
1736
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
9✔
1737
                return
9✔
1738
        }
9✔
1739

1740
        msetCfg := mset.config()
42✔
1741
        resp.StreamInfo = &StreamInfo{
42✔
1742
                Created:   mset.createdTime(),
42✔
1743
                State:     mset.state(),
42✔
1744
                Config:    *setDynamicStreamMetadata(&msetCfg),
42✔
1745
                Domain:    s.getOpts().JetStreamDomain,
42✔
1746
                Mirror:    mset.mirrorInfo(),
42✔
1747
                Sources:   mset.sourcesInfo(),
42✔
1748
                TimeStamp: time.Now().UTC(),
42✔
1749
        }
42✔
1750
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
42✔
1751
}
1752

1753
// Request for the list of all stream names.
1754
func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,234✔
1755
        if c == nil || !s.JetStreamEnabled() {
1,234✔
1756
                return
×
1757
        }
×
1758
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1,234✔
1759
        if err != nil {
1,234✔
1760
                s.Warnf(badAPIRequestT, msg)
×
1761
                return
×
1762
        }
×
1763

1764
        var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}}
1,234✔
1765

1,234✔
1766
        // Determine if we should proceed here when we are in clustered mode.
1,234✔
1767
        if s.JetStreamIsClustered() {
2,230✔
1768
                js, cc := s.getJetStreamCluster()
996✔
1769
                if js == nil || cc == nil {
996✔
1770
                        return
×
1771
                }
×
1772
                if js.isLeaderless() {
996✔
1773
                        resp.Error = NewJSClusterNotAvailError()
×
1774
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1775
                        return
×
1776
                }
×
1777
                // Make sure we are meta leader.
1778
                if !s.JetStreamIsLeader() {
1,727✔
1779
                        return
731✔
1780
                }
731✔
1781
        }
1782

1783
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
504✔
1784
                if doErr {
1✔
1785
                        resp.Error = NewJSNotEnabledForAccountError()
×
1786
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1787
                }
×
1788
                return
1✔
1789
        }
1790

1791
        var offset int
502✔
1792
        var filter string
502✔
1793

502✔
1794
        if isJSONObjectOrArray(msg) {
851✔
1795
                var req JSApiStreamNamesRequest
349✔
1796
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
349✔
1797
                        resp.Error = NewJSInvalidJSONError(err)
×
1798
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1799
                        return
×
1800
                }
×
1801
                offset = req.Offset
349✔
1802
                if req.Subject != _EMPTY_ {
674✔
1803
                        filter = req.Subject
325✔
1804
                }
325✔
1805
        }
1806

1807
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1808
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1809
        var numStreams int
502✔
1810
        if s.JetStreamIsClustered() {
767✔
1811
                js, cc := s.getJetStreamCluster()
265✔
1812
                if js == nil || cc == nil {
265✔
1813
                        // TODO(dlc) - Debug or Warn?
×
1814
                        return
×
1815
                }
×
1816
                js.mu.RLock()
265✔
1817
                for stream, sa := range cc.streams[acc.Name] {
578✔
1818
                        if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
313✔
1819
                                continue
×
1820
                        }
1821
                        if filter != _EMPTY_ {
591✔
1822
                                // These could not have subjects auto-filled in since they are raw and unprocessed.
278✔
1823
                                if len(sa.Config.Subjects) == 0 {
280✔
1824
                                        if SubjectsCollide(filter, sa.Config.Name) {
2✔
1825
                                                resp.Streams = append(resp.Streams, stream)
×
1826
                                        }
×
1827
                                } else {
276✔
1828
                                        for _, subj := range sa.Config.Subjects {
552✔
1829
                                                if SubjectsCollide(filter, subj) {
497✔
1830
                                                        resp.Streams = append(resp.Streams, stream)
221✔
1831
                                                        break
221✔
1832
                                                }
1833
                                        }
1834
                                }
1835
                        } else {
35✔
1836
                                resp.Streams = append(resp.Streams, stream)
35✔
1837
                        }
35✔
1838
                }
1839
                js.mu.RUnlock()
265✔
1840
                if len(resp.Streams) > 1 {
267✔
1841
                        slices.Sort(resp.Streams)
2✔
1842
                }
2✔
1843
                numStreams = len(resp.Streams)
265✔
1844
                if offset > numStreams {
265✔
1845
                        offset = numStreams
×
1846
                }
×
1847
                if offset > 0 {
265✔
1848
                        resp.Streams = resp.Streams[offset:]
×
1849
                }
×
1850
                if len(resp.Streams) > JSApiNamesLimit {
265✔
1851
                        resp.Streams = resp.Streams[:JSApiNamesLimit]
×
1852
                }
×
1853
        } else {
237✔
1854
                msets := acc.filteredStreams(filter)
237✔
1855
                // Since we page results order matters.
237✔
1856
                if len(msets) > 1 {
243✔
1857
                        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
21✔
1858
                }
1859

1860
                numStreams = len(msets)
237✔
1861
                if offset > numStreams {
237✔
1862
                        offset = numStreams
×
1863
                }
×
1864

1865
                for _, mset := range msets[offset:] {
484✔
1866
                        resp.Streams = append(resp.Streams, mset.cfg.Name)
247✔
1867
                        if len(resp.Streams) >= JSApiNamesLimit {
247✔
1868
                                break
×
1869
                        }
1870
                }
1871
        }
1872
        resp.Total = numStreams
502✔
1873
        resp.Limit = JSApiNamesLimit
502✔
1874
        resp.Offset = offset
502✔
1875

502✔
1876
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
502✔
1877
}
1878

1879
// Request for the list of all detailed stream info.
1880
// TODO(dlc) - combine with above long term
1881
func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
41✔
1882
        if c == nil || !s.JetStreamEnabled() {
41✔
1883
                return
×
1884
        }
×
1885
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
41✔
1886
        if err != nil {
41✔
1887
                s.Warnf(badAPIRequestT, msg)
×
1888
                return
×
1889
        }
×
1890

1891
        var resp = JSApiStreamListResponse{
41✔
1892
                ApiResponse: ApiResponse{Type: JSApiStreamListResponseType},
41✔
1893
                Streams:     []*StreamInfo{},
41✔
1894
        }
41✔
1895

41✔
1896
        // Determine if we should proceed here when we are in clustered mode.
41✔
1897
        if s.JetStreamIsClustered() {
78✔
1898
                js, cc := s.getJetStreamCluster()
37✔
1899
                if js == nil || cc == nil {
37✔
1900
                        return
×
1901
                }
×
1902
                if js.isLeaderless() {
38✔
1903
                        resp.Error = NewJSClusterNotAvailError()
1✔
1904
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1905
                        return
1✔
1906
                }
1✔
1907
                // Make sure we are meta leader.
1908
                if !s.JetStreamIsLeader() {
63✔
1909
                        return
27✔
1910
                }
27✔
1911
        }
1912

1913
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
13✔
1914
                if doErr {
×
1915
                        resp.Error = NewJSNotEnabledForAccountError()
×
1916
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
1917
                }
×
1918
                return
×
1919
        }
1920

1921
        var offset int
13✔
1922
        var filter string
13✔
1923

13✔
1924
        if isJSONObjectOrArray(msg) {
24✔
1925
                var req JSApiStreamListRequest
11✔
1926
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
1927
                        resp.Error = NewJSInvalidJSONError(err)
1✔
1928
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
1929
                        return
1✔
1930
                }
1✔
1931
                offset = req.Offset
10✔
1932
                if req.Subject != _EMPTY_ {
12✔
1933
                        filter = req.Subject
2✔
1934
                }
2✔
1935
        }
1936

1937
        // Clustered mode will invoke a scatter and gather.
1938
        if s.JetStreamIsClustered() {
21✔
1939
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
9✔
1940
                msg = copyBytes(msg)
9✔
1941
                s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
18✔
1942
                return
9✔
1943
        }
1944

1945
        // TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
1946
        // TODO(dlc) - If this list is long maybe do this in a Go routine?
1947
        var msets []*stream
3✔
1948
        if filter == _EMPTY_ {
5✔
1949
                msets = acc.streams()
2✔
1950
        } else {
3✔
1951
                msets = acc.filteredStreams(filter)
1✔
1952
        }
1✔
1953

1954
        slices.SortFunc(msets, func(i, j *stream) int { return cmp.Compare(i.cfg.Name, j.cfg.Name) })
4✔
1955

1956
        scnt := len(msets)
3✔
1957
        if offset > scnt {
3✔
1958
                offset = scnt
×
1959
        }
×
1960

1961
        for _, mset := range msets[offset:] {
7✔
1962
                config := mset.config()
4✔
1963
                resp.Streams = append(resp.Streams, &StreamInfo{
4✔
1964
                        Created:   mset.createdTime(),
4✔
1965
                        State:     mset.state(),
4✔
1966
                        Config:    config,
4✔
1967
                        Domain:    s.getOpts().JetStreamDomain,
4✔
1968
                        Mirror:    mset.mirrorInfo(),
4✔
1969
                        Sources:   mset.sourcesInfo(),
4✔
1970
                        TimeStamp: time.Now().UTC(),
4✔
1971
                })
4✔
1972
                if len(resp.Streams) >= JSApiListLimit {
4✔
1973
                        break
×
1974
                }
1975
        }
1976
        resp.Total = scnt
3✔
1977
        resp.Limit = JSApiListLimit
3✔
1978
        resp.Offset = offset
3✔
1979
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
3✔
1980
}
1981

1982
// Request for information about a stream.
1983
func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
23,390✔
1984
        if c == nil || !s.JetStreamEnabled() {
23,394✔
1985
                return
4✔
1986
        }
4✔
1987
        ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
23,386✔
1988
        if err != nil {
23,391✔
1989
                s.Warnf(badAPIRequestT, msg)
5✔
1990
                return
5✔
1991
        }
5✔
1992

1993
        streamName := streamNameFromSubject(subject)
23,381✔
1994

23,381✔
1995
        var resp = JSApiStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiStreamInfoResponseType}}
23,381✔
1996

23,381✔
1997
        // If someone creates a duplicate stream that is identical we will get this request forwarded to us.
23,381✔
1998
        // Make sure the response type is for a create call.
23,381✔
1999
        if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse {
23,381✔
2000
                resp.ApiResponse.Type = JSApiStreamCreateResponseType
×
2001
        }
×
2002

2003
        var clusterWideConsCount int
23,381✔
2004

23,381✔
2005
        js, cc := s.getJetStreamCluster()
23,381✔
2006
        if js == nil {
23,381✔
2007
                return
×
2008
        }
×
2009
        // If we are in clustered mode we need to be the stream leader to proceed.
2010
        if cc != nil {
33,566✔
2011
                // Check to make sure the stream is assigned.
10,185✔
2012
                js.mu.RLock()
10,185✔
2013
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
10,185✔
2014
                var offline bool
10,185✔
2015
                if sa != nil {
19,147✔
2016
                        clusterWideConsCount = len(sa.consumers)
8,962✔
2017
                        offline = s.allPeersOffline(sa.Group)
8,962✔
2018
                }
8,962✔
2019
                js.mu.RUnlock()
10,185✔
2020

10,185✔
2021
                if isLeader && sa == nil {
10,448✔
2022
                        // We can't find the stream, so mimic what would be the errors below.
263✔
2023
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
263✔
2024
                                if doErr {
×
2025
                                        resp.Error = NewJSNotEnabledForAccountError()
×
2026
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2027
                                }
×
2028
                                return
×
2029
                        }
2030
                        // No stream present.
2031
                        resp.Error = NewJSStreamNotFoundError()
263✔
2032
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
263✔
2033
                        return
263✔
2034
                } else if sa == nil {
10,882✔
2035
                        if js.isLeaderless() {
960✔
2036
                                resp.Error = NewJSClusterNotAvailError()
×
2037
                                // Delaying an error response gives the leader a chance to respond before us
×
2038
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
2039
                        }
×
2040
                        return
960✔
2041
                } else if isLeader && offline {
8,963✔
2042
                        resp.Error = NewJSStreamOfflineError()
1✔
2043
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
2044
                        return
1✔
2045
                }
1✔
2046

2047
                // Check to see if we are a member of the group and if the group has no leader.
2048
                isLeaderless := js.isGroupLeaderless(sa.Group)
8,961✔
2049

8,961✔
2050
                // We have the stream assigned and a leader, so only the stream leader should answer.
8,961✔
2051
                if !acc.JetStreamIsStreamLeader(streamName) && !isLeaderless {
16,186✔
2052
                        if js.isLeaderless() {
7,225✔
2053
                                resp.Error = NewJSClusterNotAvailError()
×
2054
                                // Delaying an error response gives the leader a chance to respond before us
×
2055
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), sa.Group, errRespDelay)
×
2056
                                return
×
2057
                        }
×
2058

2059
                        // We may be in process of electing a leader, but if this is a scale up from 1 we will still be the state leader
2060
                        // while the new members work through the election and catchup process.
2061
                        // Double check for that instead of exiting here and being silent. e.g. nats stream update test --replicas=3
2062
                        js.mu.RLock()
7,225✔
2063
                        rg := sa.Group
7,225✔
2064
                        var ourID string
7,225✔
2065
                        if cc.meta != nil {
14,450✔
2066
                                ourID = cc.meta.ID()
7,225✔
2067
                        }
7,225✔
2068
                        // We have seen cases where rg is nil at this point,
2069
                        // so check explicitly and bail if that is the case.
2070
                        bail := rg == nil || !rg.isMember(ourID)
7,225✔
2071
                        if !bail {
9,795✔
2072
                                // We know we are a member here, if this group is new and we are preferred allow us to answer.
2,570✔
2073
                                // Also, we have seen cases where rg.node is nil at this point,
2,570✔
2074
                                // so check explicitly and bail if that is the case.
2,570✔
2075
                                bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
2,570✔
2076
                        }
2,570✔
2077
                        js.mu.RUnlock()
7,225✔
2078
                        if bail {
14,438✔
2079
                                return
7,213✔
2080
                        }
7,213✔
2081
                }
2082
        }
2083

2084
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
14,950✔
2085
                if doErr {
7✔
2086
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
2087
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2088
                }
1✔
2089
                return
6✔
2090
        }
2091

2092
        var details bool
14,938✔
2093
        var subjects string
14,938✔
2094
        var offset int
14,938✔
2095
        if isJSONObjectOrArray(msg) {
14,971✔
2096
                var req JSApiStreamInfoRequest
33✔
2097
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
34✔
2098
                        resp.Error = NewJSInvalidJSONError(err)
1✔
2099
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2100
                        return
1✔
2101
                }
1✔
2102
                details, subjects = req.DeletedDetails, req.SubjectsFilter
32✔
2103
                offset = req.Offset
32✔
2104
        }
2105

2106
        mset, err := acc.lookupStream(streamName)
14,937✔
2107
        // Error is not to be expected at this point, but could happen if same stream trying to be created.
14,937✔
2108
        if err != nil {
15,589✔
2109
                if cc != nil {
652✔
2110
                        // This could be inflight, pause for a short bit and try again.
×
2111
                        // This will not be inline, so ok.
×
2112
                        time.Sleep(10 * time.Millisecond)
×
2113
                        mset, err = acc.lookupStream(streamName)
×
2114
                }
×
2115
                // Check again.
2116
                if err != nil {
1,304✔
2117
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
652✔
2118
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
652✔
2119
                        return
652✔
2120
                }
652✔
2121
        }
2122

2123
        config := mset.config()
14,285✔
2124
        resp.StreamInfo = &StreamInfo{
14,285✔
2125
                Created:    mset.createdTime(),
14,285✔
2126
                State:      mset.stateWithDetail(details),
14,285✔
2127
                Config:     *setDynamicStreamMetadata(&config),
14,285✔
2128
                Domain:     s.getOpts().JetStreamDomain,
14,285✔
2129
                Cluster:    js.clusterInfo(mset.raftGroup()),
14,285✔
2130
                Mirror:     mset.mirrorInfo(),
14,285✔
2131
                Sources:    mset.sourcesInfo(),
14,285✔
2132
                Alternates: js.streamAlternates(ci, config.Name),
14,285✔
2133
                TimeStamp:  time.Now().UTC(),
14,285✔
2134
        }
14,285✔
2135
        if clusterWideConsCount > 0 {
14,771✔
2136
                resp.StreamInfo.State.Consumers = clusterWideConsCount
486✔
2137
        }
486✔
2138

2139
        // Check if they have asked for subject details.
2140
        if subjects != _EMPTY_ {
14,315✔
2141
                st := mset.store.SubjectsTotals(subjects)
30✔
2142
                if lst := len(st); lst > 0 {
56✔
2143
                        // Common for both cases.
26✔
2144
                        resp.Offset = offset
26✔
2145
                        resp.Limit = JSMaxSubjectDetails
26✔
2146
                        resp.Total = lst
26✔
2147

26✔
2148
                        if offset == 0 && lst <= JSMaxSubjectDetails {
52✔
2149
                                resp.StreamInfo.State.Subjects = st
26✔
2150
                        } else {
26✔
2151
                                // Here we have to filter list due to offset or maximum constraints.
×
2152
                                subjs := make([]string, 0, len(st))
×
2153
                                for subj := range st {
×
2154
                                        subjs = append(subjs, subj)
×
2155
                                }
×
2156
                                // Sort it
2157
                                slices.Sort(subjs)
×
2158

×
2159
                                if offset > len(subjs) {
×
2160
                                        offset = len(subjs)
×
2161
                                }
×
2162

2163
                                end := offset + JSMaxSubjectDetails
×
2164
                                if end > len(subjs) {
×
2165
                                        end = len(subjs)
×
2166
                                }
×
2167
                                actualSize := end - offset
×
2168
                                var sd map[string]uint64
×
2169

×
2170
                                if actualSize > 0 {
×
2171
                                        sd = make(map[string]uint64, actualSize)
×
2172
                                        for _, ss := range subjs[offset:end] {
×
2173
                                                sd[ss] = st[ss]
×
2174
                                        }
×
2175
                                }
2176
                                resp.StreamInfo.State.Subjects = sd
×
2177
                        }
2178
                }
2179
        }
2180
        // Check for out of band catchups.
2181
        if mset.hasCatchupPeers() {
14,285✔
2182
                mset.checkClusterInfo(resp.StreamInfo.Cluster)
×
2183
        }
×
2184

2185
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14,285✔
2186
}
2187

2188
// Request to have a stream leader stepdown.
2189
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
264✔
2190
        if c == nil || !s.JetStreamEnabled() {
264✔
2191
                return
×
2192
        }
×
2193
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
264✔
2194
        if err != nil {
264✔
2195
                s.Warnf(badAPIRequestT, msg)
×
2196
                return
×
2197
        }
×
2198

2199
        // Have extra token for this one.
2200
        name := tokenAt(subject, 6)
264✔
2201

264✔
2202
        var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}}
264✔
2203

264✔
2204
        // If we are not in clustered mode this is a failed request.
264✔
2205
        if !s.JetStreamIsClustered() {
264✔
2206
                resp.Error = NewJSClusterRequiredError()
×
2207
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2208
                return
×
2209
        }
×
2210

2211
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2212
        js, cc := s.getJetStreamCluster()
264✔
2213
        if js == nil || cc == nil {
264✔
2214
                return
×
2215
        }
×
2216
        if js.isLeaderless() {
264✔
2217
                resp.Error = NewJSClusterNotAvailError()
×
2218
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2219
                return
×
2220
        }
×
2221

2222
        js.mu.RLock()
264✔
2223
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
264✔
2224
        js.mu.RUnlock()
264✔
2225

264✔
2226
        if isLeader && sa == nil {
264✔
2227
                resp.Error = NewJSStreamNotFoundError()
×
2228
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2229
                return
×
2230
        } else if sa == nil {
264✔
2231
                return
×
2232
        }
×
2233

2234
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
264✔
2235
                if doErr {
×
2236
                        resp.Error = NewJSNotEnabledForAccountError()
×
2237
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2238
                }
×
2239
                return
×
2240
        }
2241

2242
        // Check to see if we are a member of the group and if the group has no leader.
2243
        if js.isGroupLeaderless(sa.Group) {
264✔
2244
                resp.Error = NewJSClusterNotAvailError()
×
2245
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2246
                return
×
2247
        }
×
2248

2249
        // We have the stream assigned and a leader, so only the stream leader should answer.
2250
        if !acc.JetStreamIsStreamLeader(name) {
469✔
2251
                return
205✔
2252
        }
205✔
2253

2254
        mset, err := acc.lookupStream(name)
59✔
2255
        if err != nil {
59✔
2256
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
2257
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2258
                return
×
2259
        }
×
2260

2261
        if mset == nil {
59✔
2262
                resp.Success = true
×
2263
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2264
                return
×
2265
        }
×
2266

2267
        node := mset.raftNode()
59✔
2268
        if node == nil {
59✔
2269
                resp.Success = true
×
2270
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2271
                return
×
2272
        }
×
2273

2274
        var preferredLeader string
59✔
2275
        if isJSONObjectOrArray(msg) {
71✔
2276
                var req JSApiLeaderStepdownRequest
12✔
2277
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
12✔
2278
                        resp.Error = NewJSInvalidJSONError(err)
×
2279
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2280
                        return
×
2281
                }
×
2282
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
17✔
2283
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2284
                        return
5✔
2285
                }
5✔
2286
        }
2287

2288
        // Call actual stepdown.
2289
        err = node.StepDown(preferredLeader)
54✔
2290
        if err != nil {
54✔
2291
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2292
        } else {
54✔
2293
                resp.Success = true
54✔
2294
        }
54✔
2295
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
54✔
2296
}
2297

2298
// Request to have a consumer leader stepdown.
2299
func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
226✔
2300
        if c == nil || !s.JetStreamEnabled() {
226✔
2301
                return
×
2302
        }
×
2303
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
226✔
2304
        if err != nil {
226✔
2305
                s.Warnf(badAPIRequestT, msg)
×
2306
                return
×
2307
        }
×
2308

2309
        var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}}
226✔
2310

226✔
2311
        // If we are not in clustered mode this is a failed request.
226✔
2312
        if !s.JetStreamIsClustered() {
226✔
2313
                resp.Error = NewJSClusterRequiredError()
×
2314
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2315
                return
×
2316
        }
×
2317

2318
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2319
        js, cc := s.getJetStreamCluster()
226✔
2320
        if js == nil || cc == nil {
227✔
2321
                return
1✔
2322
        }
1✔
2323
        if js.isLeaderless() {
225✔
2324
                resp.Error = NewJSClusterNotAvailError()
×
2325
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2326
                return
×
2327
        }
×
2328

2329
        // Have extra token for this one.
2330
        stream := tokenAt(subject, 6)
225✔
2331
        consumer := tokenAt(subject, 7)
225✔
2332

225✔
2333
        js.mu.RLock()
225✔
2334
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
225✔
2335
        js.mu.RUnlock()
225✔
2336

225✔
2337
        if isLeader && sa == nil {
225✔
2338
                resp.Error = NewJSStreamNotFoundError()
×
2339
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2340
                return
×
2341
        } else if sa == nil {
225✔
2342
                return
×
2343
        }
×
2344
        var ca *consumerAssignment
225✔
2345
        if sa.consumers != nil {
450✔
2346
                ca = sa.consumers[consumer]
225✔
2347
        }
225✔
2348
        if ca == nil {
225✔
2349
                resp.Error = NewJSConsumerNotFoundError()
×
2350
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2351
                return
×
2352
        }
×
2353
        // Check to see if we are a member of the group and if the group has no leader.
2354
        if js.isGroupLeaderless(ca.Group) {
225✔
2355
                resp.Error = NewJSClusterNotAvailError()
×
2356
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2357
                return
×
2358
        }
×
2359

2360
        if !acc.JetStreamIsConsumerLeader(stream, consumer) {
387✔
2361
                return
162✔
2362
        }
162✔
2363

2364
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
63✔
2365
                if doErr {
×
2366
                        resp.Error = NewJSNotEnabledForAccountError()
×
2367
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2368
                }
×
2369
                return
×
2370
        }
2371

2372
        mset, err := acc.lookupStream(stream)
63✔
2373
        if err != nil {
63✔
2374
                resp.Error = NewJSStreamNotFoundError()
×
2375
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2376
                return
×
2377
        }
×
2378
        o := mset.lookupConsumer(consumer)
63✔
2379
        if o == nil {
63✔
2380
                resp.Error = NewJSConsumerNotFoundError()
×
2381
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2382
                return
×
2383
        }
×
2384

2385
        n := o.raftNode()
63✔
2386
        if n == nil {
63✔
2387
                resp.Success = true
×
2388
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
×
2389
                return
×
2390
        }
×
2391

2392
        var preferredLeader string
63✔
2393
        if isJSONObjectOrArray(msg) {
76✔
2394
                var req JSApiLeaderStepdownRequest
13✔
2395
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
13✔
2396
                        resp.Error = NewJSInvalidJSONError(err)
×
2397
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2398
                        return
×
2399
                }
×
2400
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
18✔
2401
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2402
                        return
5✔
2403
                }
5✔
2404
        }
2405

2406
        // Call actual stepdown.
2407
        err = n.StepDown(preferredLeader)
58✔
2408
        if err != nil {
58✔
2409
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
2410
        } else {
58✔
2411
                resp.Success = true
58✔
2412
        }
58✔
2413
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
58✔
2414
}
2415

2416
// Request to remove a peer from a clustered stream.
2417
func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
69✔
2418
        if c == nil || !s.JetStreamEnabled() {
69✔
2419
                return
×
2420
        }
×
2421
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
69✔
2422
        if err != nil {
69✔
2423
                s.Warnf(badAPIRequestT, msg)
×
2424
                return
×
2425
        }
×
2426

2427
        // Have extra token for this one.
2428
        name := tokenAt(subject, 6)
69✔
2429

69✔
2430
        var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}}
69✔
2431

69✔
2432
        // If we are not in clustered mode this is a failed request.
69✔
2433
        if !s.JetStreamIsClustered() {
69✔
2434
                resp.Error = NewJSClusterRequiredError()
×
2435
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2436
                return
×
2437
        }
×
2438

2439
        // If we are here we are clustered. See if we are the stream leader in order to proceed.
2440
        js, cc := s.getJetStreamCluster()
69✔
2441
        if js == nil || cc == nil {
69✔
2442
                return
×
2443
        }
×
2444
        if js.isLeaderless() {
69✔
2445
                resp.Error = NewJSClusterNotAvailError()
×
2446
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2447
                return
×
2448
        }
×
2449

2450
        js.mu.RLock()
69✔
2451
        isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
69✔
2452
        js.mu.RUnlock()
69✔
2453

69✔
2454
        // Make sure we are meta leader.
69✔
2455
        if !isLeader {
127✔
2456
                return
58✔
2457
        }
58✔
2458

2459
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
2460
                if doErr {
×
2461
                        resp.Error = NewJSNotEnabledForAccountError()
×
2462
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2463
                }
×
2464
                return
×
2465
        }
2466
        if isEmptyRequest(msg) {
11✔
2467
                resp.Error = NewJSBadRequestError()
×
2468
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2469
                return
×
2470
        }
×
2471

2472
        var req JSApiStreamRemovePeerRequest
11✔
2473
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
2474
                resp.Error = NewJSInvalidJSONError(err)
×
2475
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2476
                return
×
2477
        }
×
2478
        if req.Peer == _EMPTY_ {
11✔
2479
                resp.Error = NewJSBadRequestError()
×
2480
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2481
                return
×
2482
        }
×
2483

2484
        if sa == nil {
11✔
2485
                // No stream present.
×
2486
                resp.Error = NewJSStreamNotFoundError()
×
2487
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2488
                return
×
2489
        }
×
2490

2491
        // Check to see if we are a member of the group and if the group has no leader.
2492
        // Peers here is a server name, convert to node name.
2493
        nodeName := getHash(req.Peer)
11✔
2494

11✔
2495
        js.mu.RLock()
11✔
2496
        rg := sa.Group
11✔
2497
        isMember := rg.isMember(nodeName)
11✔
2498
        js.mu.RUnlock()
11✔
2499

11✔
2500
        // Make sure we are a member.
11✔
2501
        if !isMember {
12✔
2502
                resp.Error = NewJSClusterPeerNotMemberError()
1✔
2503
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
2504
                return
1✔
2505
        }
1✔
2506

2507
        // If we are here we have a valid peer member set for removal.
2508
        if !js.removePeerFromStream(sa, nodeName) {
12✔
2509
                resp.Error = NewJSPeerRemapError()
2✔
2510
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2511
                return
2✔
2512
        }
2✔
2513

2514
        resp.Success = true
8✔
2515
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
8✔
2516
}
2517

2518
// Request to have the metaleader remove a peer from the system.
2519
func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
8✔
2520
        if c == nil || !s.JetStreamEnabled() {
8✔
2521
                return
×
2522
        }
×
2523

2524
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
8✔
2525
        if err != nil {
8✔
2526
                s.Warnf(badAPIRequestT, msg)
×
2527
                return
×
2528
        }
×
2529
        if acc != s.SystemAccount() {
8✔
2530
                return
×
2531
        }
×
2532

2533
        js, cc := s.getJetStreamCluster()
8✔
2534
        if js == nil || cc == nil || cc.meta == nil {
8✔
2535
                return
×
2536
        }
×
2537

2538
        // Extra checks here but only leader is listening.
2539
        js.mu.RLock()
8✔
2540
        isLeader := cc.isLeader()
8✔
2541
        js.mu.RUnlock()
8✔
2542

8✔
2543
        if !isLeader {
8✔
2544
                return
×
2545
        }
×
2546

2547
        var resp = JSApiMetaServerRemoveResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerRemoveResponseType}}
8✔
2548

8✔
2549
        if isEmptyRequest(msg) {
8✔
2550
                resp.Error = NewJSBadRequestError()
×
2551
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2552
                return
×
2553
        }
×
2554

2555
        var req JSApiMetaServerRemoveRequest
8✔
2556
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
8✔
2557
                resp.Error = NewJSInvalidJSONError(err)
×
2558
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2559
                return
×
2560
        }
×
2561

2562
        var found string
8✔
2563
        js.mu.RLock()
8✔
2564
        for _, p := range cc.meta.Peers() {
34✔
2565
                // If Peer is specified, it takes precedence
26✔
2566
                if req.Peer != _EMPTY_ {
30✔
2567
                        if p.ID == req.Peer {
5✔
2568
                                found = req.Peer
1✔
2569
                                break
1✔
2570
                        }
2571
                        continue
3✔
2572
                }
2573
                si, ok := s.nodeToInfo.Load(p.ID)
22✔
2574
                if ok && si.(nodeInfo).name == req.Server {
26✔
2575
                        found = p.ID
4✔
2576
                        break
4✔
2577
                }
2578
        }
2579
        js.mu.RUnlock()
8✔
2580

8✔
2581
        if found == _EMPTY_ {
11✔
2582
                resp.Error = NewJSClusterServerNotMemberError()
3✔
2583
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
2584
                return
3✔
2585
        }
3✔
2586

2587
        // So we have a valid peer.
2588
        js.mu.Lock()
5✔
2589
        cc.meta.ProposeRemovePeer(found)
5✔
2590
        js.mu.Unlock()
5✔
2591

5✔
2592
        resp.Success = true
5✔
2593
        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
2594
}
2595

2596
func (s *Server) peerSetToNames(ps []string) []string {
171✔
2597
        names := make([]string, len(ps))
171✔
2598
        for i := 0; i < len(ps); i++ {
637✔
2599
                if si, ok := s.nodeToInfo.Load(ps[i]); !ok {
466✔
2600
                        names[i] = ps[i]
×
2601
                } else {
466✔
2602
                        names[i] = si.(nodeInfo).name
466✔
2603
                }
466✔
2604
        }
2605
        return names
171✔
2606
}
2607

2608
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
2609
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
29✔
2610
        js.mu.RLock()
29✔
2611
        defer js.mu.RUnlock()
29✔
2612
        if cc := js.cluster; cc != nil {
58✔
2613
                for _, p := range cc.meta.Peers() {
154✔
2614
                        si, ok := s.nodeToInfo.Load(p.ID)
125✔
2615
                        if ok && si.(nodeInfo).name == serverName {
154✔
2616
                                if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
58✔
2617
                                        if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
58✔
2618
                                                return p.ID
29✔
2619
                                        }
29✔
2620
                                }
2621
                        }
2622
                }
2623
        }
2624
        return _EMPTY_
×
2625
}
2626

2627
// Request to have the metaleader move a stream on a peer to another
2628
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
33✔
2629
        if c == nil || !s.JetStreamEnabled() {
33✔
2630
                return
×
2631
        }
×
2632

2633
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
33✔
2634
        if err != nil {
33✔
2635
                s.Warnf(badAPIRequestT, msg)
×
2636
                return
×
2637
        }
×
2638

2639
        js, cc := s.getJetStreamCluster()
33✔
2640
        if js == nil || cc == nil || cc.meta == nil {
33✔
2641
                return
×
2642
        }
×
2643

2644
        // Extra checks here but only leader is listening.
2645
        js.mu.RLock()
33✔
2646
        isLeader := cc.isLeader()
33✔
2647
        js.mu.RUnlock()
33✔
2648

33✔
2649
        if !isLeader {
33✔
2650
                return
×
2651
        }
×
2652

2653
        accName := tokenAt(subject, 6)
33✔
2654
        streamName := tokenAt(subject, 7)
33✔
2655

33✔
2656
        if acc.GetName() != accName && acc != s.SystemAccount() {
33✔
2657
                return
×
2658
        }
×
2659

2660
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
33✔
2661

33✔
2662
        var req JSApiMetaServerStreamMoveRequest
33✔
2663
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
33✔
2664
                resp.Error = NewJSInvalidJSONError(err)
×
2665
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2666
                return
×
2667
        }
×
2668

2669
        srcPeer := _EMPTY_
33✔
2670
        if req.Server != _EMPTY_ {
62✔
2671
                srcPeer = s.nameToPeer(js, req.Server, req.Cluster, req.Domain)
29✔
2672
        }
29✔
2673

2674
        targetAcc, ok := s.accounts.Load(accName)
33✔
2675
        if !ok {
33✔
2676
                resp.Error = NewJSNoAccountError()
×
2677
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2678
                return
×
2679
        }
×
2680

2681
        var streamFound bool
33✔
2682
        cfg := StreamConfig{}
33✔
2683
        currPeers := []string{}
33✔
2684
        currCluster := _EMPTY_
33✔
2685
        js.mu.Lock()
33✔
2686
        streams, ok := cc.streams[accName]
33✔
2687
        if ok {
66✔
2688
                sa, ok := streams[streamName]
33✔
2689
                if ok {
66✔
2690
                        cfg = *sa.Config.clone()
33✔
2691
                        streamFound = true
33✔
2692
                        currPeers = sa.Group.Peers
33✔
2693
                        currCluster = sa.Group.Cluster
33✔
2694
                }
33✔
2695
        }
2696
        js.mu.Unlock()
33✔
2697

33✔
2698
        if !streamFound {
33✔
2699
                resp.Error = NewJSStreamNotFoundError()
×
2700
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2701
                return
×
2702
        }
×
2703

2704
        // if server was picked, make sure src peer exists and move it to first position.
2705
        // removal will drop peers from the left
2706
        if req.Server != _EMPTY_ {
62✔
2707
                if srcPeer == _EMPTY_ {
29✔
2708
                        resp.Error = NewJSClusterServerNotMemberError()
×
2709
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2710
                        return
×
2711
                }
×
2712
                var peerFound bool
29✔
2713
                for i := 0; i < len(currPeers); i++ {
84✔
2714
                        if currPeers[i] == srcPeer {
84✔
2715
                                copy(currPeers[1:], currPeers[:i])
29✔
2716
                                currPeers[0] = srcPeer
29✔
2717
                                peerFound = true
29✔
2718
                                break
29✔
2719
                        }
2720
                }
2721
                if !peerFound {
29✔
2722
                        resp.Error = NewJSClusterPeerNotMemberError()
×
2723
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2724
                        return
×
2725
                }
×
2726
        }
2727

2728
        // make sure client is scoped to requested account
2729
        ciNew := *(ci)
33✔
2730
        ciNew.Account = accName
33✔
2731

33✔
2732
        // backup placement such that peers can be looked up with modified tag list
33✔
2733
        var origPlacement *Placement
33✔
2734
        if cfg.Placement != nil {
33✔
2735
                tmp := *cfg.Placement
×
2736
                origPlacement = &tmp
×
2737
        }
×
2738

2739
        if len(req.Tags) > 0 {
60✔
2740
                if cfg.Placement == nil {
54✔
2741
                        cfg.Placement = &Placement{}
27✔
2742
                }
27✔
2743
                cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
27✔
2744
        }
2745

2746
        peers, e := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers, 1, nil)
33✔
2747
        if len(peers) <= cfg.Replicas {
35✔
2748
                // since expanding in the same cluster did not yield a result, try in different cluster
2✔
2749
                peers = nil
2✔
2750

2✔
2751
                clusters := map[string]struct{}{}
2✔
2752
                s.nodeToInfo.Range(func(_, ni any) bool {
18✔
2753
                        if currCluster != ni.(nodeInfo).cluster {
24✔
2754
                                clusters[ni.(nodeInfo).cluster] = struct{}{}
8✔
2755
                        }
8✔
2756
                        return true
16✔
2757
                })
2758
                errs := &selectPeerError{}
2✔
2759
                errs.accumulate(e)
2✔
2760
                for cluster := range clusters {
4✔
2761
                        newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0, nil)
2✔
2762
                        if len(newPeers) >= cfg.Replicas {
4✔
2763
                                peers = append([]string{}, currPeers...)
2✔
2764
                                peers = append(peers, newPeers[:cfg.Replicas]...)
2✔
2765
                                break
2✔
2766
                        }
2767
                        errs.accumulate(e)
×
2768
                }
2769
                if peers == nil {
2✔
2770
                        resp.Error = NewJSClusterNoPeersError(errs)
×
2771
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2772
                        return
×
2773
                }
×
2774
        }
2775

2776
        cfg.Placement = origPlacement
33✔
2777

33✔
2778
        s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v",
33✔
2779
                accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
33✔
2780

33✔
2781
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
33✔
2782
        // We should be fine ignoring pedantic mode here. as we do not touch configuration.
33✔
2783
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
33✔
2784
}
2785

2786
// Request to have the metaleader move a stream on a peer to another
2787
func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
4✔
2788
        if c == nil || !s.JetStreamEnabled() {
4✔
2789
                return
×
2790
        }
×
2791

2792
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
4✔
2793
        if err != nil {
4✔
2794
                s.Warnf(badAPIRequestT, msg)
×
2795
                return
×
2796
        }
×
2797

2798
        js, cc := s.getJetStreamCluster()
4✔
2799
        if js == nil || cc == nil || cc.meta == nil {
4✔
2800
                return
×
2801
        }
×
2802

2803
        // Extra checks here but only leader is listening.
2804
        js.mu.RLock()
4✔
2805
        isLeader := cc.isLeader()
4✔
2806
        js.mu.RUnlock()
4✔
2807

4✔
2808
        if !isLeader {
4✔
2809
                return
×
2810
        }
×
2811

2812
        var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
4✔
2813

4✔
2814
        accName := tokenAt(subject, 6)
4✔
2815
        streamName := tokenAt(subject, 7)
4✔
2816

4✔
2817
        if acc.GetName() != accName && acc != s.SystemAccount() {
4✔
2818
                return
×
2819
        }
×
2820

2821
        targetAcc, ok := s.accounts.Load(accName)
4✔
2822
        if !ok {
4✔
2823
                resp.Error = NewJSNoAccountError()
×
2824
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2825
                return
×
2826
        }
×
2827

2828
        streamFound := false
4✔
2829
        cfg := StreamConfig{}
4✔
2830
        currPeers := []string{}
4✔
2831
        js.mu.Lock()
4✔
2832
        streams, ok := cc.streams[accName]
4✔
2833
        if ok {
8✔
2834
                sa, ok := streams[streamName]
4✔
2835
                if ok {
8✔
2836
                        cfg = *sa.Config.clone()
4✔
2837
                        streamFound = true
4✔
2838
                        currPeers = sa.Group.Peers
4✔
2839
                }
4✔
2840
        }
2841
        js.mu.Unlock()
4✔
2842

4✔
2843
        if !streamFound {
4✔
2844
                resp.Error = NewJSStreamNotFoundError()
×
2845
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2846
                return
×
2847
        }
×
2848

2849
        if len(currPeers) <= cfg.Replicas {
4✔
2850
                resp.Error = NewJSStreamMoveNotInProgressError()
×
2851
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2852
                return
×
2853
        }
×
2854

2855
        // make sure client is scoped to requested account
2856
        ciNew := *(ci)
4✔
2857
        ciNew.Account = accName
4✔
2858

4✔
2859
        peers := currPeers[:cfg.Replicas]
4✔
2860

4✔
2861
        // Remove placement in case tags don't match
4✔
2862
        // This can happen if the move was initiated by modifying the tags.
4✔
2863
        // This is an account operation.
4✔
2864
        // This can NOT happen when the move was initiated by the system account.
4✔
2865
        // There move honors the original tag list.
4✔
2866
        if cfg.Placement != nil && len(cfg.Placement.Tags) != 0 {
5✔
2867
        FOR_TAGCHECK:
1✔
2868
                for _, peer := range peers {
2✔
2869
                        si, ok := s.nodeToInfo.Load(peer)
1✔
2870
                        if !ok {
1✔
2871
                                // can't verify tags, do the safe thing and error
×
2872
                                resp.Error = NewJSStreamGeneralError(
×
2873
                                        fmt.Errorf("peer %s not present for tag validation", peer))
×
2874
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2875
                                return
×
2876
                        }
×
2877
                        nodeTags := si.(nodeInfo).tags
1✔
2878
                        for _, tag := range cfg.Placement.Tags {
2✔
2879
                                if !nodeTags.Contains(tag) {
2✔
2880
                                        // clear placement as tags don't match
1✔
2881
                                        cfg.Placement = nil
1✔
2882
                                        break FOR_TAGCHECK
1✔
2883
                                }
2884
                        }
2885

2886
                }
2887
        }
2888

2889
        s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v",
4✔
2890
                cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers))
4✔
2891

4✔
2892
        // We will always have peers and therefore never do a callout, therefore it is safe to call inline
4✔
2893
        s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers, false)
4✔
2894
}
2895

2896
// Request to have an account purged
2897
func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
6✔
2898
        if c == nil || !s.JetStreamEnabled() {
6✔
2899
                return
×
2900
        }
×
2901

2902
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
6✔
2903
        if err != nil {
6✔
2904
                s.Warnf(badAPIRequestT, msg)
×
2905
                return
×
2906
        }
×
2907
        if acc != s.SystemAccount() {
6✔
2908
                return
×
2909
        }
×
2910

2911
        js := s.getJetStream()
6✔
2912
        if js == nil {
6✔
2913
                return
×
2914
        }
×
2915

2916
        accName := tokenAt(subject, 5)
6✔
2917

6✔
2918
        var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}}
6✔
2919

6✔
2920
        if !s.JetStreamIsClustered() {
8✔
2921
                var streams []*stream
2✔
2922
                var ac *Account
2✔
2923
                if ac, err = s.lookupAccount(accName); err == nil && ac != nil {
3✔
2924
                        streams = ac.streams()
1✔
2925
                }
1✔
2926

2927
                s.Noticef("Purge request for account %s (streams: %d, hasAccount: %t)",
2✔
2928
                        accName, len(streams), ac != nil)
2✔
2929

2✔
2930
                for _, mset := range streams {
3✔
2931
                        err := mset.delete()
1✔
2932
                        if err != nil {
1✔
2933
                                resp.Error = NewJSStreamDeleteError(err)
×
2934
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2935
                                return
×
2936
                        }
×
2937
                }
2938
                if err := os.RemoveAll(filepath.Join(js.config.StoreDir, accName)); err != nil {
2✔
2939
                        resp.Error = NewJSStreamGeneralError(err)
×
2940
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2941
                        return
×
2942
                }
×
2943
                resp.Initiated = true
2✔
2944
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
2945
                return
2✔
2946
        }
2947

2948
        _, cc := s.getJetStreamCluster()
4✔
2949
        if cc == nil || cc.meta == nil || !cc.isLeader() {
4✔
2950
                return
×
2951
        }
×
2952

2953
        if js.isMetaRecovering() {
4✔
2954
                // While in recovery mode, the data structures are not fully initialized
×
2955
                resp.Error = NewJSClusterNotAvailError()
×
2956
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
2957
                return
×
2958
        }
×
2959

2960
        js.mu.RLock()
4✔
2961
        ns, nc := 0, 0
4✔
2962
        streams, hasAccount := cc.streams[accName]
4✔
2963
        for _, osa := range streams {
12✔
2964
                for _, oca := range osa.consumers {
20✔
2965
                        oca.deleted = true
12✔
2966
                        ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
12✔
2967
                        cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
12✔
2968
                        nc++
12✔
2969
                }
12✔
2970
                sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
8✔
2971
                cc.meta.Propose(encodeDeleteStreamAssignment(sa))
8✔
2972
                ns++
8✔
2973
        }
2974
        js.mu.RUnlock()
4✔
2975

4✔
2976
        s.Noticef("Purge request for account %s (streams: %d, consumer: %d, hasAccount: %t)", accName, ns, nc, hasAccount)
4✔
2977

4✔
2978
        resp.Initiated = true
4✔
2979
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
2980
}
2981

2982
// Request to have the meta leader stepdown.
2983
// These will only be received by the meta leader, so less checking needed.
2984
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
22✔
2985
        if c == nil || !s.JetStreamEnabled() {
22✔
2986
                return
×
2987
        }
×
2988

2989
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
22✔
2990
        if err != nil {
22✔
2991
                s.Warnf(badAPIRequestT, msg)
×
2992
                return
×
2993
        }
×
2994

2995
        // This should only be coming from the System Account.
2996
        if acc != s.SystemAccount() {
23✔
2997
                s.RateLimitWarnf("JetStream API stepdown request from non-system account: %q user: %q", ci.serviceAccount(), ci.User)
1✔
2998
                return
1✔
2999
        }
1✔
3000

3001
        js, cc := s.getJetStreamCluster()
21✔
3002
        if js == nil || cc == nil || cc.meta == nil {
21✔
3003
                return
×
3004
        }
×
3005

3006
        // Extra checks here but only leader is listening.
3007
        js.mu.RLock()
21✔
3008
        isLeader := cc.isLeader()
21✔
3009
        js.mu.RUnlock()
21✔
3010

21✔
3011
        if !isLeader {
21✔
3012
                return
×
3013
        }
×
3014

3015
        var preferredLeader string
21✔
3016
        var resp = JSApiLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiLeaderStepDownResponseType}}
21✔
3017

21✔
3018
        if isJSONObjectOrArray(msg) {
35✔
3019
                var req JSApiLeaderStepdownRequest
14✔
3020
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
14✔
3021
                        resp.Error = NewJSInvalidJSONError(err)
×
3022
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3023
                        return
×
3024
                }
×
3025
                if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil {
20✔
3026
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
6✔
3027
                        return
6✔
3028
                }
6✔
3029
        }
3030

3031
        // Call actual stepdown.
3032
        err = cc.meta.StepDown(preferredLeader)
15✔
3033
        if err != nil {
15✔
3034
                resp.Error = NewJSRaftGeneralError(err, Unless(err))
×
3035
        } else {
15✔
3036
                resp.Success = true
15✔
3037
        }
15✔
3038
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
15✔
3039
}
3040

3041
// isJSONObjectOrArray reports whether the first byte of req that is not JSON
// whitespace (space, tab, newline, carriage return) is '{' or '['.
// Valid JSON can also be a bare string or number, but callers only care about
// objects and arrays. The input is not validated as JSON: this is merely a
// hint for whether a request should be parsed as JSON at all.
func isJSONObjectOrArray(req []byte) bool {
	for _, b := range req {
		switch b {
		case ' ', '\t', '\n', '\r':
			// Leading whitespace, keep scanning.
			continue
		case '{', '[':
			return true
		default:
			return false
		}
	}
	// Empty input, or nothing but whitespace.
	return false
}
3059

3060
// isEmptyRequest reports whether req carries no request payload: either it has
// zero length, or it decodes as a JSON object with no members. Anything that
// fails to decode, or decodes to something other than an object, is not empty.
func isEmptyRequest(req []byte) bool {
	// Fast paths for the two most common spellings of "nothing".
	switch {
	case len(req) == 0:
		return true
	case bytes.Equal(req, []byte("{}")):
		return true
	}
	// No simple match, but it could still be an empty object, e.g. with
	// whitespace inside or around the braces.
	var v any
	if json.Unmarshal(req, &v) != nil {
		return false
	}
	obj, isObj := v.(map[string]any)
	return isObj && len(obj) == 0
}
3078

3079
// getStepDownPreferredPlacement attempts to work out what the best placement is
3080
// for a stepdown request. The preferred server name always takes precedence, but
3081
// if not specified, the placement will be used to filter by cluster. The caller
3082
// should check for return API errors and return those to the requestor if needed.
3083
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
39✔
3084
        if placement == nil {
41✔
3085
                return _EMPTY_, nil
2✔
3086
        }
2✔
3087
        var preferredLeader string
37✔
3088
        if placement.Preferred != _EMPTY_ {
54✔
3089
                for _, p := range group.Peers() {
66✔
3090
                        si, ok := s.nodeToInfo.Load(p.ID)
49✔
3091
                        if !ok || si == nil {
49✔
3092
                                continue
×
3093
                        }
3094
                        if si.(nodeInfo).name == placement.Preferred {
62✔
3095
                                preferredLeader = p.ID
13✔
3096
                                break
13✔
3097
                        }
3098
                }
3099
                if preferredLeader == group.ID() {
21✔
3100
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
4✔
3101
                }
4✔
3102
                if preferredLeader == _EMPTY_ {
17✔
3103
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
4✔
3104
                }
4✔
3105
        } else {
20✔
3106
                possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
20✔
3107
                ourID := group.ID()
20✔
3108
                for _, p := range group.Peers() {
116✔
3109
                        if p == nil {
96✔
3110
                                continue // ... shouldn't happen.
×
3111
                        }
3112
                        si, ok := s.nodeToInfo.Load(p.ID)
96✔
3113
                        if !ok || si == nil {
96✔
3114
                                continue
×
3115
                        }
3116
                        ni := si.(nodeInfo)
96✔
3117
                        if ni.offline || p.ID == ourID {
116✔
3118
                                continue
20✔
3119
                        }
3120
                        possiblePeers[p] = ni
76✔
3121
                }
3122
                // If cluster is specified, filter out anything not matching the cluster name.
3123
                if placement.Cluster != _EMPTY_ {
31✔
3124
                        for p, si := range possiblePeers {
51✔
3125
                                if si.cluster != placement.Cluster {
66✔
3126
                                        delete(possiblePeers, p)
26✔
3127
                                }
26✔
3128
                        }
3129
                }
3130
                // If tags are specified, filter out anything not matching all supplied tags.
3131
                if len(placement.Tags) > 0 {
32✔
3132
                        for p, si := range possiblePeers {
55✔
3133
                                matchesAll := true
43✔
3134
                                for _, tag := range placement.Tags {
93✔
3135
                                        if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
82✔
3136
                                                break
32✔
3137
                                        }
3138
                                }
3139
                                if !matchesAll {
75✔
3140
                                        delete(possiblePeers, p)
32✔
3141
                                }
32✔
3142
                        }
3143
                }
3144
                // If there are no possible peers, return an error.
3145
                if len(possiblePeers) == 0 {
28✔
3146
                        return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
8✔
3147
                }
8✔
3148
                // Take advantage of random map iteration order to select the preferred.
3149
                for p := range possiblePeers {
24✔
3150
                        preferredLeader = p.ID
12✔
3151
                        break
12✔
3152
                }
3153
        }
3154
        return preferredLeader, nil
21✔
3155
}
3156

3157
// Request to delete a stream.
3158
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
507✔
3159
        if c == nil || !s.JetStreamEnabled() {
507✔
3160
                return
×
3161
        }
×
3162
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
507✔
3163
        if err != nil {
507✔
3164
                s.Warnf(badAPIRequestT, msg)
×
3165
                return
×
3166
        }
×
3167

3168
        var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}}
507✔
3169

507✔
3170
        // Determine if we should proceed here when we are in clustered mode.
507✔
3171
        if s.JetStreamIsClustered() {
972✔
3172
                js, cc := s.getJetStreamCluster()
465✔
3173
                if js == nil || cc == nil {
465✔
3174
                        return
×
3175
                }
×
3176
                if js.isLeaderless() {
466✔
3177
                        resp.Error = NewJSClusterNotAvailError()
1✔
3178
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3179
                        return
1✔
3180
                }
1✔
3181
                // Make sure we are meta leader.
3182
                if !s.JetStreamIsLeader() {
830✔
3183
                        return
366✔
3184
                }
366✔
3185
        }
3186

3187
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
140✔
3188
                if doErr {
×
3189
                        resp.Error = NewJSNotEnabledForAccountError()
×
3190
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3191
                }
×
3192
                return
×
3193
        }
3194

3195
        if !isEmptyRequest(msg) {
141✔
3196
                resp.Error = NewJSNotEmptyRequestError()
1✔
3197
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3198
                return
1✔
3199
        }
1✔
3200
        stream := streamNameFromSubject(subject)
139✔
3201

139✔
3202
        // Clustered.
139✔
3203
        if s.JetStreamIsClustered() {
237✔
3204
                s.jsClusteredStreamDeleteRequest(ci, acc, stream, subject, reply, msg)
98✔
3205
                return
98✔
3206
        }
98✔
3207

3208
        mset, err := acc.lookupStream(stream)
41✔
3209
        if err != nil {
46✔
3210
                resp.Error = NewJSStreamNotFoundError(Unless(err))
5✔
3211
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
5✔
3212
                return
5✔
3213
        }
5✔
3214

3215
        if err := mset.delete(); err != nil {
36✔
3216
                resp.Error = NewJSStreamDeleteError(err, Unless(err))
×
3217
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3218
                return
×
3219
        }
×
3220
        resp.Success = true
36✔
3221
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
36✔
3222
}
3223

3224
// Request to delete a message.
3225
// This expects a stream sequence number as the msg body.
3226
func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
780✔
3227
        if c == nil || !s.JetStreamEnabled() {
788✔
3228
                return
8✔
3229
        }
8✔
3230
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
772✔
3231
        if err != nil {
772✔
3232
                s.Warnf(badAPIRequestT, msg)
×
3233
                return
×
3234
        }
×
3235

3236
        stream := tokenAt(subject, 6)
772✔
3237

772✔
3238
        var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}}
772✔
3239

772✔
3240
        // If we are in clustered mode we need to be the stream leader to proceed.
772✔
3241
        if s.JetStreamIsClustered() {
1,128✔
3242
                // Check to make sure the stream is assigned.
356✔
3243
                js, cc := s.getJetStreamCluster()
356✔
3244
                if js == nil || cc == nil {
360✔
3245
                        return
4✔
3246
                }
4✔
3247
                if js.isLeaderless() {
353✔
3248
                        resp.Error = NewJSClusterNotAvailError()
1✔
3249
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3250
                        return
1✔
3251
                }
1✔
3252

3253
                js.mu.RLock()
351✔
3254
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
351✔
3255
                js.mu.RUnlock()
351✔
3256

351✔
3257
                if isLeader && sa == nil {
351✔
3258
                        // We can't find the stream, so mimic what would be the errors below.
×
3259
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3260
                                if doErr {
×
3261
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3262
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3263
                                }
×
3264
                                return
×
3265
                        }
3266
                        // No stream present.
3267
                        resp.Error = NewJSStreamNotFoundError()
×
3268
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3269
                        return
×
3270
                } else if sa == nil {
351✔
3271
                        return
×
3272
                }
×
3273

3274
                // Check to see if we are a member of the group and if the group has no leader.
3275
                if js.isGroupLeaderless(sa.Group) {
351✔
3276
                        resp.Error = NewJSClusterNotAvailError()
×
3277
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3278
                        return
×
3279
                }
×
3280

3281
                // We have the stream assigned and a leader, so only the stream leader should answer.
3282
                if !acc.JetStreamIsStreamLeader(stream) {
589✔
3283
                        return
238✔
3284
                }
238✔
3285
        }
3286

3287
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
700✔
3288
                if doErr {
340✔
3289
                        resp.Error = NewJSNotEnabledForAccountError()
169✔
3290
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
169✔
3291
                }
169✔
3292
                return
171✔
3293
        }
3294
        if isEmptyRequest(msg) {
358✔
3295
                resp.Error = NewJSBadRequestError()
×
3296
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3297
                return
×
3298
        }
×
3299
        var req JSApiMsgDeleteRequest
358✔
3300
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
358✔
3301
                resp.Error = NewJSInvalidJSONError(err)
×
3302
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3303
                return
×
3304
        }
×
3305

3306
        mset, err := acc.lookupStream(stream)
358✔
3307
        if err != nil {
360✔
3308
                resp.Error = NewJSStreamNotFoundError(Unless(err))
2✔
3309
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3310
                return
2✔
3311
        }
2✔
3312
        if mset.cfg.Sealed {
358✔
3313
                resp.Error = NewJSStreamSealedError()
2✔
3314
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3315
                return
2✔
3316
        }
2✔
3317
        if mset.cfg.DenyDelete {
355✔
3318
                resp.Error = NewJSStreamMsgDeleteFailedError(errors.New("message delete not permitted"))
1✔
3319
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3320
                return
1✔
3321
        }
1✔
3322

3323
        if s.JetStreamIsClustered() {
464✔
3324
                s.jsClusteredMsgDeleteRequest(ci, acc, mset, stream, subject, reply, &req, rmsg)
111✔
3325
                return
111✔
3326
        }
111✔
3327

3328
        var removed bool
242✔
3329
        if req.NoErase {
483✔
3330
                removed, err = mset.removeMsg(req.Seq)
241✔
3331
        } else {
242✔
3332
                removed, err = mset.eraseMsg(req.Seq)
1✔
3333
        }
1✔
3334
        if err != nil {
242✔
3335
                resp.Error = NewJSStreamMsgDeleteFailedError(err, Unless(err))
×
3336
        } else if !removed {
242✔
3337
                resp.Error = NewJSSequenceNotFoundError(req.Seq)
×
3338
        } else {
242✔
3339
                resp.Success = true
242✔
3340
        }
242✔
3341
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
242✔
3342
}
3343

3344
// Request to get a raw stream message.
3345
func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
1,599✔
3346
        if c == nil || !s.JetStreamEnabled() {
1,599✔
3347
                return
×
3348
        }
×
3349
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
1,599✔
3350
        if err != nil {
1,599✔
3351
                s.Warnf(badAPIRequestT, msg)
×
3352
                return
×
3353
        }
×
3354

3355
        stream := tokenAt(subject, 6)
1,599✔
3356

1,599✔
3357
        var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}}
1,599✔
3358

1,599✔
3359
        // If we are in clustered mode we need to be the stream leader to proceed.
1,599✔
3360
        if s.JetStreamIsClustered() {
2,564✔
3361
                // Check to make sure the stream is assigned.
965✔
3362
                js, cc := s.getJetStreamCluster()
965✔
3363
                if js == nil || cc == nil {
965✔
3364
                        return
×
3365
                }
×
3366
                if js.isLeaderless() {
965✔
3367
                        resp.Error = NewJSClusterNotAvailError()
×
3368
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3369
                        return
×
3370
                }
×
3371

3372
                js.mu.RLock()
965✔
3373
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
965✔
3374
                js.mu.RUnlock()
965✔
3375

965✔
3376
                if isLeader && sa == nil {
965✔
3377
                        // We can't find the stream, so mimic what would be the errors below.
×
3378
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3379
                                if doErr {
×
3380
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3381
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3382
                                }
×
3383
                                return
×
3384
                        }
3385
                        // No stream present.
3386
                        resp.Error = NewJSStreamNotFoundError()
×
3387
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3388
                        return
×
3389
                } else if sa == nil {
965✔
3390
                        return
×
3391
                }
×
3392

3393
                // Check to see if we are a member of the group and if the group has no leader.
3394
                if js.isGroupLeaderless(sa.Group) {
965✔
3395
                        resp.Error = NewJSClusterNotAvailError()
×
3396
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3397
                        return
×
3398
                }
×
3399

3400
                // We have the stream assigned and a leader, so only the stream leader should answer.
3401
                if !acc.JetStreamIsStreamLeader(stream) {
1,616✔
3402
                        return
651✔
3403
                }
651✔
3404
        }
3405

3406
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
951✔
3407
                if doErr {
3✔
3408
                        resp.Error = NewJSNotEnabledForAccountError()
×
3409
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3410
                }
×
3411
                return
3✔
3412
        }
3413
        if isEmptyRequest(msg) {
945✔
3414
                resp.Error = NewJSBadRequestError()
×
3415
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3416
                return
×
3417
        }
×
3418
        var req JSApiMsgGetRequest
945✔
3419
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
945✔
3420
                resp.Error = NewJSInvalidJSONError(err)
×
3421
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3422
                return
×
3423
        }
×
3424

3425
        // This version does not support batch.
3426
        if req.Batch > 0 || req.MaxBytes > 0 {
946✔
3427
                resp.Error = NewJSBadRequestError()
1✔
3428
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3429
                return
1✔
3430
        }
1✔
3431

3432
        // Validate non-conflicting options. Seq, LastFor, and AsOfTime are mutually exclusive.
3433
        // NextFor can be paired with Seq or AsOfTime indicating a filter subject.
3434
        if (req.Seq > 0 && req.LastFor != _EMPTY_) ||
944✔
3435
                (req.Seq == 0 && req.LastFor == _EMPTY_ && req.NextFor == _EMPTY_ && req.StartTime == nil) ||
944✔
3436
                (req.Seq > 0 && req.StartTime != nil) ||
944✔
3437
                (req.StartTime != nil && req.LastFor != _EMPTY_) ||
944✔
3438
                (req.LastFor != _EMPTY_ && req.NextFor != _EMPTY_) {
948✔
3439
                resp.Error = NewJSBadRequestError()
4✔
3440
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3441
                return
4✔
3442
        }
4✔
3443

3444
        mset, err := acc.lookupStream(stream)
940✔
3445
        if err != nil {
940✔
3446
                resp.Error = NewJSStreamNotFoundError()
×
3447
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3448
                return
×
3449
        }
×
3450

3451
        var svp StoreMsg
940✔
3452
        var sm *StoreMsg
940✔
3453

940✔
3454
        // If AsOfTime is set, perform this first to get the sequence.
940✔
3455
        var seq uint64
940✔
3456
        if req.StartTime != nil {
946✔
3457
                seq = mset.store.GetSeqFromTime(*req.StartTime)
6✔
3458
        } else {
940✔
3459
                seq = req.Seq
934✔
3460
        }
934✔
3461

3462
        if seq > 0 && req.NextFor == _EMPTY_ {
1,181✔
3463
                sm, err = mset.store.LoadMsg(seq, &svp)
241✔
3464
        } else if req.NextFor != _EMPTY_ {
1,042✔
3465
                sm, _, err = mset.store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), seq, &svp)
102✔
3466
        } else {
699✔
3467
                sm, err = mset.store.LoadLastMsg(req.LastFor, &svp)
597✔
3468
        }
597✔
3469
        if err != nil {
1,378✔
3470
                resp.Error = NewJSNoMessageFoundError()
438✔
3471
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
438✔
3472
                return
438✔
3473
        }
438✔
3474
        resp.Message = &StoredMsg{
502✔
3475
                Subject:  sm.subj,
502✔
3476
                Sequence: sm.seq,
502✔
3477
                Header:   sm.hdr,
502✔
3478
                Data:     sm.msg,
502✔
3479
                Time:     time.Unix(0, sm.ts).UTC(),
502✔
3480
        }
502✔
3481

502✔
3482
        // Don't send response through API layer for this call.
502✔
3483
        s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
502✔
3484
}
3485

3486
func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
27✔
3487
        if c == nil || !s.JetStreamEnabled() {
27✔
3488
                return
×
3489
        }
×
3490

3491
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
27✔
3492
        if err != nil {
27✔
3493
                s.Warnf(badAPIRequestT, msg)
×
3494
                return
×
3495
        }
×
3496

3497
        stream := streamNameFromSubject(subject)
27✔
3498
        consumer := consumerNameFromSubject(subject)
27✔
3499

27✔
3500
        var req JSApiConsumerUnpinRequest
27✔
3501
        var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}
27✔
3502

27✔
3503
        if err := json.Unmarshal(msg, &req); err != nil {
27✔
3504
                resp.Error = NewJSInvalidJSONError(err)
×
3505
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3506
                return
×
3507
        }
×
3508

3509
        if req.Group == _EMPTY_ {
31✔
3510
                resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified"))
4✔
3511
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3512
                return
4✔
3513
        }
4✔
3514

3515
        if !validGroupName.MatchString(req.Group) {
27✔
3516
                resp.Error = NewJSConsumerInvalidGroupNameError()
4✔
3517
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
3518
                return
4✔
3519
        }
4✔
3520
        if s.JetStreamIsClustered() {
31✔
3521
                // Check to make sure the stream is assigned.
12✔
3522
                js, cc := s.getJetStreamCluster()
12✔
3523
                if js == nil || cc == nil {
12✔
3524
                        return
×
3525
                }
×
3526

3527
                // First check if the stream and consumer is there.
3528
                js.mu.RLock()
12✔
3529
                sa := js.streamAssignment(acc.Name, stream)
12✔
3530
                if sa == nil {
15✔
3531
                        js.mu.RUnlock()
3✔
3532
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
3✔
3533
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3534
                        return
3✔
3535
                }
3✔
3536

3537
                ca, ok := sa.consumers[consumer]
9✔
3538
                if !ok || ca == nil {
12✔
3539
                        js.mu.RUnlock()
3✔
3540
                        resp.Error = NewJSConsumerNotFoundError()
3✔
3541
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
3542
                        return
3✔
3543
                }
3✔
3544
                js.mu.RUnlock()
6✔
3545

6✔
3546
                // Then check if we are the leader.
6✔
3547
                mset, err := acc.lookupStream(stream)
6✔
3548
                if err != nil {
6✔
3549
                        return
×
3550
                }
×
3551

3552
                o := mset.lookupConsumer(consumer)
6✔
3553
                if o == nil {
6✔
3554
                        return
×
3555
                }
×
3556
                if !o.isLeader() {
10✔
3557
                        return
4✔
3558
                }
4✔
3559
        }
3560

3561
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
3562
                if doErr {
×
3563
                        resp.Error = NewJSNotEnabledForAccountError()
×
3564
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3565
                }
×
3566
                return
×
3567
        }
3568

3569
        mset, err := acc.lookupStream(stream)
9✔
3570
        if err != nil {
10✔
3571
                resp.Error = NewJSStreamNotFoundError()
1✔
3572
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3573
                return
1✔
3574
        }
1✔
3575
        o := mset.lookupConsumer(consumer)
8✔
3576
        if o == nil {
9✔
3577
                resp.Error = NewJSConsumerNotFoundError()
1✔
3578
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3579
                return
1✔
3580
        }
1✔
3581

3582
        var foundPriority bool
7✔
3583
        for _, group := range o.config().PriorityGroups {
14✔
3584
                if group == req.Group {
12✔
3585
                        foundPriority = true
5✔
3586
                        break
5✔
3587
                }
3588
        }
3589
        if !foundPriority {
9✔
3590
                resp.Error = NewJSConsumerInvalidPriorityGroupError()
2✔
3591
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3592
                return
2✔
3593
        }
2✔
3594

3595
        o.mu.Lock()
5✔
3596
        o.currentPinId = _EMPTY_
5✔
3597
        o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
5✔
3598
        o.mu.Unlock()
5✔
3599
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
5✔
3600
}
3601

3602
// Request to purge a stream.
3603
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
128✔
3604
        if c == nil || !s.JetStreamEnabled() {
128✔
3605
                return
×
3606
        }
×
3607
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
128✔
3608
        if err != nil {
128✔
3609
                s.Warnf(badAPIRequestT, msg)
×
3610
                return
×
3611
        }
×
3612

3613
        stream := streamNameFromSubject(subject)
128✔
3614

128✔
3615
        var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}}
128✔
3616

128✔
3617
        // If we are in clustered mode we need to be the stream leader to proceed.
128✔
3618
        if s.JetStreamIsClustered() {
229✔
3619
                // Check to make sure the stream is assigned.
101✔
3620
                js, cc := s.getJetStreamCluster()
101✔
3621
                if js == nil || cc == nil {
101✔
3622
                        return
×
3623
                }
×
3624

3625
                js.mu.RLock()
101✔
3626
                isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, stream)
101✔
3627
                js.mu.RUnlock()
101✔
3628

101✔
3629
                if isLeader && sa == nil {
101✔
3630
                        // We can't find the stream, so mimic what would be the errors below.
×
3631
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
×
3632
                                if doErr {
×
3633
                                        resp.Error = NewJSNotEnabledForAccountError()
×
3634
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3635
                                }
×
3636
                                return
×
3637
                        }
3638
                        // No stream present.
3639
                        resp.Error = NewJSStreamNotFoundError()
×
3640
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3641
                        return
×
3642
                } else if sa == nil {
101✔
3643
                        if js.isLeaderless() {
×
3644
                                resp.Error = NewJSClusterNotAvailError()
×
3645
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3646
                        }
×
3647
                        return
×
3648
                }
3649

3650
                // Check to see if we are a member of the group and if the group has no leader.
3651
                if js.isGroupLeaderless(sa.Group) {
101✔
3652
                        resp.Error = NewJSClusterNotAvailError()
×
3653
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3654
                        return
×
3655
                }
×
3656

3657
                // We have the stream assigned and a leader, so only the stream leader should answer.
3658
                if !acc.JetStreamIsStreamLeader(stream) {
170✔
3659
                        if js.isLeaderless() {
70✔
3660
                                resp.Error = NewJSClusterNotAvailError()
1✔
3661
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3662
                        }
1✔
3663
                        return
69✔
3664
                }
3665
        }
3666

3667
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
59✔
3668
                if doErr {
×
3669
                        resp.Error = NewJSNotEnabledForAccountError()
×
3670
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3671
                }
×
3672
                return
×
3673
        }
3674

3675
        var purgeRequest *JSApiStreamPurgeRequest
59✔
3676
        if isJSONObjectOrArray(msg) {
87✔
3677
                var req JSApiStreamPurgeRequest
28✔
3678
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
28✔
3679
                        resp.Error = NewJSInvalidJSONError(err)
×
3680
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3681
                        return
×
3682
                }
×
3683
                if req.Sequence > 0 && req.Keep > 0 {
28✔
3684
                        resp.Error = NewJSBadRequestError()
×
3685
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3686
                        return
×
3687
                }
×
3688
                purgeRequest = &req
28✔
3689
        }
3690

3691
        mset, err := acc.lookupStream(stream)
59✔
3692
        if err != nil {
59✔
3693
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
3694
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3695
                return
×
3696
        }
×
3697
        if mset.cfg.Sealed {
61✔
3698
                resp.Error = NewJSStreamSealedError()
2✔
3699
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3700
                return
2✔
3701
        }
2✔
3702
        if mset.cfg.DenyPurge {
58✔
3703
                resp.Error = NewJSStreamPurgeFailedError(errors.New("stream purge not permitted"))
1✔
3704
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3705
                return
1✔
3706
        }
1✔
3707

3708
        if s.JetStreamIsClustered() {
86✔
3709
                s.jsClusteredStreamPurgeRequest(ci, acc, mset, stream, subject, reply, rmsg, purgeRequest)
30✔
3710
                return
30✔
3711
        }
30✔
3712

3713
        purged, err := mset.purge(purgeRequest)
26✔
3714
        if err != nil {
26✔
3715
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
3716
        } else {
26✔
3717
                resp.Purged = purged
26✔
3718
                resp.Success = true
26✔
3719
        }
26✔
3720
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
26✔
3721
}
3722

3723
func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
1,118✔
3724
        var replicas int
1,118✔
3725
        if cfg != nil {
2,236✔
3726
                replicas = cfg.Replicas
1,118✔
3727
        }
1,118✔
3728
        selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
1,118✔
3729
        if apiErr != nil {
1,118✔
3730
                return apiErr
×
3731
        }
×
3732
        jsa.mu.RLock()
1,118✔
3733
        defer jsa.mu.RUnlock()
1,118✔
3734
        if selectedLimits.MaxStreams > 0 && jsa.countStreams(tier, cfg) >= selectedLimits.MaxStreams {
1,121✔
3735
                return NewJSMaximumStreamsLimitError()
3✔
3736
        }
3✔
3737
        reserved := jsa.tieredReservation(tier, cfg)
1,115✔
3738
        if err := jsa.js.checkAllLimits(selectedLimits, cfg, reserved, 0); err != nil {
1,116✔
3739
                return NewJSStreamLimitsError(err, Unless(err))
1✔
3740
        }
1✔
3741
        return nil
1,114✔
3742
}
3743

3744
// Request to restore a stream.
3745
func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
49✔
3746
        if c == nil || !s.JetStreamIsLeader() {
73✔
3747
                return
24✔
3748
        }
24✔
3749
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
25✔
3750
        if err != nil {
25✔
3751
                s.Warnf(badAPIRequestT, msg)
×
3752
                return
×
3753
        }
×
3754

3755
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
25✔
3756
        if !acc.JetStreamEnabled() {
25✔
3757
                resp.Error = NewJSNotEnabledForAccountError()
×
3758
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3759
                return
×
3760
        }
×
3761
        if isEmptyRequest(msg) {
26✔
3762
                resp.Error = NewJSBadRequestError()
1✔
3763
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3764
                return
1✔
3765
        }
1✔
3766

3767
        var req JSApiStreamRestoreRequest
24✔
3768
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
24✔
3769
                resp.Error = NewJSInvalidJSONError(err)
×
3770
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3771
                return
×
3772
        }
×
3773

3774
        stream := streamNameFromSubject(subject)
24✔
3775

24✔
3776
        if stream != req.Config.Name && req.Config.Name == _EMPTY_ {
24✔
3777
                req.Config.Name = stream
×
3778
        }
×
3779

3780
        // check stream config at the start of the restore process, not at the end
3781
        cfg, apiErr := s.checkStreamCfg(&req.Config, acc, false)
24✔
3782
        if apiErr != nil {
26✔
3783
                resp.Error = apiErr
2✔
3784
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3785
                return
2✔
3786
        }
2✔
3787

3788
        if s.JetStreamIsClustered() {
33✔
3789
                s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
11✔
3790
                return
11✔
3791
        }
11✔
3792

3793
        if err := acc.jsNonClusteredStreamLimitsCheck(&cfg); err != nil {
13✔
3794
                resp.Error = err
2✔
3795
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
3796
                return
2✔
3797
        }
2✔
3798

3799
        if _, err := acc.lookupStream(stream); err == nil {
10✔
3800
                resp.Error = NewJSStreamNameExistRestoreFailedError()
1✔
3801
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
3802
                return
1✔
3803
        }
1✔
3804

3805
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
8✔
3806
                if doErr {
×
3807
                        resp.Error = NewJSNotEnabledForAccountError()
×
3808
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3809
                }
×
3810
                return
×
3811
        }
3812

3813
        s.processStreamRestore(ci, acc, &req.Config, subject, reply, string(msg))
8✔
3814
}
3815

3816
func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamConfig, subject, reply, msg string) <-chan error {
16✔
3817
        js := s.getJetStream()
16✔
3818

16✔
3819
        var resp = JSApiStreamRestoreResponse{ApiResponse: ApiResponse{Type: JSApiStreamRestoreResponseType}}
16✔
3820

16✔
3821
        snapDir := filepath.Join(js.config.StoreDir, snapStagingDir)
16✔
3822
        if _, err := os.Stat(snapDir); os.IsNotExist(err) {
28✔
3823
                if err := os.MkdirAll(snapDir, defaultDirPerms); err != nil {
12✔
3824
                        resp.Error = &ApiError{Code: 503, Description: "JetStream unable to create temp storage for restore"}
×
3825
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
3826
                        return nil
×
3827
                }
×
3828
        }
3829

3830
        tfile, err := os.CreateTemp(snapDir, "js-restore-")
16✔
3831
        if err != nil {
16✔
3832
                resp.Error = NewJSTempStorageFailedError()
×
3833
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3834
                return nil
×
3835
        }
×
3836

3837
        streamName := cfg.Name
16✔
3838
        s.Noticef("Starting restore for stream '%s > %s'", acc.Name, streamName)
16✔
3839

16✔
3840
        start := time.Now().UTC()
16✔
3841
        domain := s.getOpts().JetStreamDomain
16✔
3842
        s.publishAdvisory(acc, JSAdvisoryStreamRestoreCreatePre+"."+streamName, &JSRestoreCreateAdvisory{
16✔
3843
                TypedEvent: TypedEvent{
16✔
3844
                        Type: JSRestoreCreateAdvisoryType,
16✔
3845
                        ID:   nuid.Next(),
16✔
3846
                        Time: start,
16✔
3847
                },
16✔
3848
                Stream: streamName,
16✔
3849
                Client: ci.forAdvisory(),
16✔
3850
                Domain: domain,
16✔
3851
        })
16✔
3852

16✔
3853
        // Create our internal subscription to accept the snapshot.
16✔
3854
        restoreSubj := fmt.Sprintf(jsRestoreDeliverT, streamName, nuid.Next())
16✔
3855

16✔
3856
        type result struct {
16✔
3857
                err   error
16✔
3858
                reply string
16✔
3859
        }
16✔
3860

16✔
3861
        // For signaling to upper layers.
16✔
3862
        resultCh := make(chan result, 1)
16✔
3863
        activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int
16✔
3864

16✔
3865
        var total int
16✔
3866

16✔
3867
        // FIXME(dlc) - Probably take out of network path eventually due to disk I/O?
16✔
3868
        processChunk := func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
116✔
3869
                // We require reply subjects to communicate back failures, flow etc. If they do not have one log and cancel.
100✔
3870
                if reply == _EMPTY_ {
101✔
3871
                        sub.client.processUnsub(sub.sid)
1✔
3872
                        resultCh <- result{
1✔
3873
                                fmt.Errorf("restore for stream '%s > %s' requires reply subject for each chunk", acc.Name, streamName),
1✔
3874
                                reply,
1✔
3875
                        }
1✔
3876
                        return
1✔
3877
                }
1✔
3878
                // Account client messages have \r\n on end. This is an error.
3879
                if len(msg) < LEN_CR_LF {
99✔
3880
                        sub.client.processUnsub(sub.sid)
×
3881
                        resultCh <- result{
×
3882
                                fmt.Errorf("restore for stream '%s > %s' received short chunk", acc.Name, streamName),
×
3883
                                reply,
×
3884
                        }
×
3885
                        return
×
3886
                }
×
3887
                // Adjust.
3888
                msg = msg[:len(msg)-LEN_CR_LF]
99✔
3889

99✔
3890
                // This means we are complete with our transfer from the client.
99✔
3891
                if len(msg) == 0 {
114✔
3892
                        s.Debugf("Finished staging restore for stream '%s > %s'", acc.Name, streamName)
15✔
3893
                        resultCh <- result{err, reply}
15✔
3894
                        return
15✔
3895
                }
15✔
3896

3897
                // We track total and check on server limits.
3898
                // TODO(dlc) - We could check apriori and cancel initial request if we know it won't fit.
3899
                total += len(msg)
84✔
3900
                if js.wouldExceedLimits(FileStorage, total) {
84✔
3901
                        s.resourcesExceededError()
×
3902
                        resultCh <- result{NewJSInsufficientResourcesError(), reply}
×
3903
                        return
×
3904
                }
×
3905

3906
                // Append chunk to temp file. Mark as issue if we encounter an error.
3907
                if n, err := tfile.Write(msg); n != len(msg) || err != nil {
84✔
3908
                        resultCh <- result{err, reply}
×
3909
                        if reply != _EMPTY_ {
×
3910
                                s.sendInternalAccountMsg(acc, reply, "-ERR 'storage failure during restore'")
×
3911
                        }
×
3912
                        return
×
3913
                }
3914

3915
                activeQ.push(len(msg))
84✔
3916

84✔
3917
                s.sendInternalAccountMsg(acc, reply, nil)
84✔
3918
        }
3919

3920
        sub, err := acc.subscribeInternal(restoreSubj, processChunk)
16✔
3921
        if err != nil {
16✔
3922
                tfile.Close()
×
3923
                os.Remove(tfile.Name())
×
3924
                resp.Error = NewJSRestoreSubscribeFailedError(err, restoreSubj)
×
3925
                s.sendAPIErrResponse(ci, acc, subject, reply, msg, s.jsonResponse(&resp))
×
3926
                return nil
×
3927
        }
×
3928

3929
        // Mark the subject so the end user knows where to send the snapshot chunks.
3930
        resp.DeliverSubject = restoreSubj
16✔
3931
        s.sendAPIResponse(ci, acc, subject, reply, msg, s.jsonResponse(resp))
16✔
3932

16✔
3933
        doneCh := make(chan error, 1)
16✔
3934

16✔
3935
        // Monitor the progress from another Go routine.
16✔
3936
        s.startGoRoutine(func() {
32✔
3937
                defer s.grWG.Done()
16✔
3938
                defer func() {
32✔
3939
                        tfile.Close()
16✔
3940
                        os.Remove(tfile.Name())
16✔
3941
                        sub.client.processUnsub(sub.sid)
16✔
3942
                        activeQ.unregister()
16✔
3943
                }()
16✔
3944

3945
                const activityInterval = 5 * time.Second
16✔
3946
                notActive := time.NewTimer(activityInterval)
16✔
3947
                defer notActive.Stop()
16✔
3948

16✔
3949
                total := 0
16✔
3950
                for {
116✔
3951
                        select {
100✔
3952
                        case result := <-resultCh:
16✔
3953
                                err := result.err
16✔
3954
                                var mset *stream
16✔
3955

16✔
3956
                                // If we staged properly go ahead and do restore now.
16✔
3957
                                if err == nil {
31✔
3958
                                        s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
15✔
3959
                                        tfile.Seek(0, 0)
15✔
3960
                                        mset, err = acc.RestoreStream(cfg, tfile)
15✔
3961
                                } else {
16✔
3962
                                        errStr := err.Error()
1✔
3963
                                        tmp := []rune(errStr)
1✔
3964
                                        tmp[0] = unicode.ToUpper(tmp[0])
1✔
3965
                                        s.Warnf(errStr)
1✔
3966
                                }
1✔
3967

3968
                                end := time.Now().UTC()
16✔
3969

16✔
3970
                                // TODO(rip) - Should this have the error code in it??
16✔
3971
                                s.publishAdvisory(acc, JSAdvisoryStreamRestoreCompletePre+"."+streamName, &JSRestoreCompleteAdvisory{
16✔
3972
                                        TypedEvent: TypedEvent{
16✔
3973
                                                Type: JSRestoreCompleteAdvisoryType,
16✔
3974
                                                ID:   nuid.Next(),
16✔
3975
                                                Time: end,
16✔
3976
                                        },
16✔
3977
                                        Stream: streamName,
16✔
3978
                                        Start:  start,
16✔
3979
                                        End:    end,
16✔
3980
                                        Bytes:  int64(total),
16✔
3981
                                        Client: ci.forAdvisory(),
16✔
3982
                                        Domain: domain,
16✔
3983
                                })
16✔
3984

16✔
3985
                                var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
16✔
3986

16✔
3987
                                if err != nil {
20✔
3988
                                        resp.Error = NewJSStreamRestoreError(err, Unless(err))
4✔
3989
                                        s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
4✔
3990
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start))
4✔
3991
                                } else {
16✔
3992
                                        msetCfg := mset.config()
12✔
3993
                                        resp.StreamInfo = &StreamInfo{
12✔
3994
                                                Created:   mset.createdTime(),
12✔
3995
                                                State:     mset.state(),
12✔
3996
                                                Config:    *setDynamicStreamMetadata(&msetCfg),
12✔
3997
                                                TimeStamp: time.Now().UTC(),
12✔
3998
                                        }
12✔
3999
                                        s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
12✔
4000
                                                friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond))
12✔
4001
                                }
12✔
4002

4003
                                // On the last EOF, send back the stream info or error status.
4004
                                s.sendInternalAccountMsg(acc, result.reply, s.jsonResponse(&resp))
16✔
4005
                                // Signal to the upper layers.
16✔
4006
                                doneCh <- err
16✔
4007
                                return
16✔
4008
                        case <-activeQ.ch:
84✔
4009
                                if n, ok := activeQ.popOne(); ok {
168✔
4010
                                        total += n
84✔
4011
                                        notActive.Reset(activityInterval)
84✔
4012
                                }
84✔
4013
                        case <-notActive.C:
×
4014
                                err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName)
×
4015
                                doneCh <- err
×
4016
                                return
×
4017
                        }
4018
                }
4019
        })
4020

4021
        return doneCh
16✔
4022
}
4023

4024
// Process a snapshot request.
4025
func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
28✔
4026
        if c == nil || !s.JetStreamEnabled() {
28✔
4027
                return
×
4028
        }
×
4029
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
28✔
4030
        if err != nil {
28✔
4031
                s.Warnf(badAPIRequestT, msg)
×
4032
                return
×
4033
        }
×
4034

4035
        smsg := string(msg)
28✔
4036
        stream := streamNameFromSubject(subject)
28✔
4037

28✔
4038
        // If we are in clustered mode we need to be the stream leader to proceed.
28✔
4039
        if s.JetStreamIsClustered() && !acc.JetStreamIsStreamLeader(stream) {
43✔
4040
                return
15✔
4041
        }
15✔
4042

4043
        var resp = JSApiStreamSnapshotResponse{ApiResponse: ApiResponse{Type: JSApiStreamSnapshotResponseType}}
13✔
4044
        if !acc.JetStreamEnabled() {
13✔
4045
                resp.Error = NewJSNotEnabledForAccountError()
×
4046
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4047
                return
×
4048
        }
×
4049
        if isEmptyRequest(msg) {
14✔
4050
                resp.Error = NewJSBadRequestError()
1✔
4051
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4052
                return
1✔
4053
        }
1✔
4054

4055
        mset, err := acc.lookupStream(stream)
12✔
4056
        if err != nil {
13✔
4057
                resp.Error = NewJSStreamNotFoundError(Unless(err))
1✔
4058
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4059
                return
1✔
4060
        }
1✔
4061

4062
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
11✔
4063
                if doErr {
×
4064
                        resp.Error = NewJSNotEnabledForAccountError()
×
4065
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4066
                }
×
4067
                return
×
4068
        }
4069

4070
        var req JSApiStreamSnapshotRequest
11✔
4071
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4072
                resp.Error = NewJSInvalidJSONError(err)
×
4073
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4074
                return
×
4075
        }
×
4076
        if !IsValidSubject(req.DeliverSubject) {
12✔
4077
                resp.Error = NewJSSnapshotDeliverSubjectInvalidError()
1✔
4078
                s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
1✔
4079
                return
1✔
4080
        }
1✔
4081

4082
        // We will do the snapshot in a go routine as well since check msgs may
4083
        // stall this go routine.
4084
        go func() {
20✔
4085
                if req.CheckMsgs {
12✔
4086
                        s.Noticef("Starting health check and snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
2✔
4087
                } else {
10✔
4088
                        s.Noticef("Starting snapshot for stream '%s > %s'", mset.jsa.account.Name, mset.name())
8✔
4089
                }
8✔
4090

4091
                start := time.Now().UTC()
10✔
4092

10✔
4093
                sr, err := mset.snapshot(0, req.CheckMsgs, !req.NoConsumers)
10✔
4094
                if err != nil {
10✔
4095
                        s.Warnf("Snapshot of stream '%s > %s' failed: %v", mset.jsa.account.Name, mset.name(), err)
×
4096
                        resp.Error = NewJSStreamSnapshotError(err, Unless(err))
×
4097
                        s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
×
4098
                        return
×
4099
                }
×
4100

4101
                config := mset.config()
10✔
4102
                resp.State = &sr.State
10✔
4103
                resp.Config = &config
10✔
4104

10✔
4105
                s.sendAPIResponse(ci, acc, subject, reply, smsg, s.jsonResponse(resp))
10✔
4106

10✔
4107
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCreatePre+"."+mset.name(), &JSSnapshotCreateAdvisory{
10✔
4108
                        TypedEvent: TypedEvent{
10✔
4109
                                Type: JSSnapshotCreatedAdvisoryType,
10✔
4110
                                ID:   nuid.Next(),
10✔
4111
                                Time: time.Now().UTC(),
10✔
4112
                        },
10✔
4113
                        Stream: mset.name(),
10✔
4114
                        State:  sr.State,
10✔
4115
                        Client: ci.forAdvisory(),
10✔
4116
                        Domain: s.getOpts().JetStreamDomain,
10✔
4117
                })
10✔
4118

10✔
4119
                // Now do the real streaming.
10✔
4120
                s.streamSnapshot(acc, mset, sr, &req)
10✔
4121

10✔
4122
                end := time.Now().UTC()
10✔
4123

10✔
4124
                s.publishAdvisory(acc, JSAdvisoryStreamSnapshotCompletePre+"."+mset.name(), &JSSnapshotCompleteAdvisory{
10✔
4125
                        TypedEvent: TypedEvent{
10✔
4126
                                Type: JSSnapshotCompleteAdvisoryType,
10✔
4127
                                ID:   nuid.Next(),
10✔
4128
                                Time: end,
10✔
4129
                        },
10✔
4130
                        Stream: mset.name(),
10✔
4131
                        Start:  start,
10✔
4132
                        End:    end,
10✔
4133
                        Client: ci.forAdvisory(),
10✔
4134
                        Domain: s.getOpts().JetStreamDomain,
10✔
4135
                })
10✔
4136

10✔
4137
                s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
10✔
4138
                        friendlyBytes(int64(sr.State.Bytes)),
10✔
4139
                        mset.jsa.account.Name,
10✔
4140
                        mset.name(),
10✔
4141
                        end.Sub(start))
10✔
4142
        }()
4143
}
4144

4145
// Snapshot streaming defaults, for now.
const (
	// defaultSnapshotChunkSize is the chunk size used when streaming a
	// snapshot for a request that does not specify one (128KB).
	defaultSnapshotChunkSize = 128 << 10
	// defaultSnapshotWindowSize is the amount of outstanding (unacknowledged)
	// snapshot bytes used as the flow control window (8MB).
	defaultSnapshotWindowSize = 8 << 20
)
4148

4149
// streamSnapshot will stream out our snapshot to the reply subject.
4150
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
10✔
4151
        chunkSize := req.ChunkSize
10✔
4152
        if chunkSize == 0 {
12✔
4153
                chunkSize = defaultSnapshotChunkSize
2✔
4154
        }
2✔
4155
        // Setup for the chunk stream.
4156
        reply := req.DeliverSubject
10✔
4157
        r := sr.Reader
10✔
4158
        defer r.Close()
10✔
4159

10✔
4160
        // Check interest for the snapshot deliver subject.
10✔
4161
        inch := make(chan bool, 1)
10✔
4162
        acc.sl.RegisterNotification(req.DeliverSubject, inch)
10✔
4163
        defer acc.sl.ClearNotification(req.DeliverSubject, inch)
10✔
4164
        hasInterest := <-inch
10✔
4165
        if !hasInterest {
16✔
4166
                // Allow 2 seconds or so for interest to show up.
6✔
4167
                select {
6✔
4168
                case <-inch:
5✔
4169
                case <-time.After(2 * time.Second):
1✔
4170
                }
4171
        }
4172

4173
        // Create our ack flow handler.
4174
        // This is very simple for now.
4175
        ackSize := defaultSnapshotWindowSize / chunkSize
10✔
4176
        if ackSize < 8 {
10✔
4177
                ackSize = 8
×
4178
        } else if ackSize > 8*1024 {
16✔
4179
                ackSize = 8 * 1024
6✔
4180
        }
6✔
4181
        acks := make(chan struct{}, ackSize)
10✔
4182
        acks <- struct{}{}
10✔
4183

10✔
4184
        // Track bytes outstanding.
10✔
4185
        var out int32
10✔
4186

10✔
4187
        // We will place sequence number and size of chunk sent in the reply.
10✔
4188
        ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
10✔
4189
        ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
67✔
4190
                cs, _ := strconv.Atoi(tokenAt(subject, 6))
57✔
4191
                // This is very crude and simple, but ok for now.
57✔
4192
                // This only matters when sending multiple chunks.
57✔
4193
                if atomic.AddInt32(&out, int32(-cs)) < defaultSnapshotWindowSize {
114✔
4194
                        select {
57✔
4195
                        case acks <- struct{}{}:
57✔
4196
                        default:
×
4197
                        }
4198
                }
4199
        })
4200
        defer mset.unsubscribe(ackSub)
10✔
4201

10✔
4202
        // TODO(dlc) - Add in NATS-Chunked-Sequence header
10✔
4203
        var hdr []byte
10✔
4204
        for index := 1; ; index++ {
110✔
4205
                chunk := make([]byte, chunkSize)
100✔
4206
                n, err := r.Read(chunk)
100✔
4207
                chunk = chunk[:n]
100✔
4208
                if err != nil {
110✔
4209
                        if n > 0 {
10✔
4210
                                mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, chunk, nil, 0))
×
4211
                        }
×
4212
                        break
10✔
4213
                }
4214

4215
                // Wait on acks for flow control if past our window size.
4216
                // Wait up to 10ms for now if no acks received.
4217
                if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
90✔
4218
                        select {
×
4219
                        case <-acks:
×
4220
                                // ok to proceed.
4221
                        case <-inch:
×
4222
                                // Lost interest
×
4223
                                hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
×
4224
                                goto done
×
4225
                        case <-time.After(2 * time.Second):
×
4226
                                hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
×
4227
                                goto done
×
4228
                        }
4229
                }
4230
                ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
90✔
4231
                if hdr == nil {
100✔
4232
                        hdr = []byte("NATS/1.0 204\r\n\r\n")
10✔
4233
                }
10✔
4234
                mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
90✔
4235
                atomic.AddInt32(&out, int32(len(chunk)))
90✔
4236
        }
4237

4238
        if err := <-sr.errCh; err != _EMPTY_ {
10✔
4239
                hdr = []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err))
×
4240
        }
×
4241

4242
done:
4243
        // Send last EOF
4244
        // TODO(dlc) - place hash in header
4245
        mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
10✔
4246
}
4247

4248
// ccReqType identifies the style of a consumer create request, as determined
// from the shape of the request subject.
type ccReqType uint8

// The constants are explicitly typed so they are ccReqType values rather than
// untyped integers.
const (
	// ccNew is the new style request, where the subject carries the stream
	// and consumer names and optionally a filtered subject.
	ccNew ccReqType = iota
	// ccLegacyEphemeral is the legacy request where the subject only carries
	// the stream name.
	ccLegacyEphemeral
	// ccLegacyDurable is the legacy durable request (DURABLE in the subject).
	ccLegacyDurable
)
4256

4257
// Request to create a consumer where stream and optional consumer name are part of the subject, and optional
4258
// filtered subjects can be at the tail end.
4259
// Assumes stream and consumer names are single tokens.
4260
func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Account, subject, reply string, rmsg []byte) {
10,621✔
4261
        if c == nil || !s.JetStreamEnabled() {
10,621✔
4262
                return
×
4263
        }
×
4264

4265
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
10,621✔
4266
        if err != nil {
10,622✔
4267
                s.Warnf(badAPIRequestT, msg)
1✔
4268
                return
1✔
4269
        }
1✔
4270

4271
        var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
10,620✔
4272

10,620✔
4273
        var req CreateConsumerRequest
10,620✔
4274
        if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
10,621✔
4275
                resp.Error = NewJSInvalidJSONError(err)
1✔
4276
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4277
                return
1✔
4278
        }
1✔
4279

4280
        var js *jetStream
10,619✔
4281
        isClustered := s.JetStreamIsClustered()
10,619✔
4282

10,619✔
4283
        // Determine if we should proceed here when we are in clustered mode.
10,619✔
4284
        if isClustered {
20,328✔
4285
                if req.Config.Direct {
10,064✔
4286
                        // Check to see if we have this stream and are the stream leader.
355✔
4287
                        if !acc.JetStreamIsStreamLeader(streamNameFromSubject(subject)) {
634✔
4288
                                return
279✔
4289
                        }
279✔
4290
                } else {
9,354✔
4291
                        var cc *jetStreamCluster
9,354✔
4292
                        js, cc = s.getJetStreamCluster()
9,354✔
4293
                        if js == nil || cc == nil {
9,354✔
4294
                                return
×
4295
                        }
×
4296
                        if js.isLeaderless() {
9,354✔
4297
                                resp.Error = NewJSClusterNotAvailError()
×
4298
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4299
                                return
×
4300
                        }
×
4301
                        // Make sure we are meta leader.
4302
                        if !s.JetStreamIsLeader() {
15,681✔
4303
                                return
6,327✔
4304
                        }
6,327✔
4305
                }
4306
        }
4307

4308
        var streamName, consumerName, filteredSubject string
4,013✔
4309
        var rt ccReqType
4,013✔
4310

4,013✔
4311
        if n := numTokens(subject); n < 5 {
4,013✔
4312
                s.Warnf(badAPIRequestT, msg)
×
4313
                return
×
4314
        } else if n == 5 {
4,755✔
4315
                // Legacy ephemeral.
742✔
4316
                rt = ccLegacyEphemeral
742✔
4317
                streamName = streamNameFromSubject(subject)
742✔
4318
        } else {
4,013✔
4319
                // New style and durable legacy.
3,271✔
4320
                if tokenAt(subject, 4) == "DURABLE" {
3,529✔
4321
                        rt = ccLegacyDurable
258✔
4322
                        if n != 7 {
258✔
4323
                                resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4324
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4325
                                return
×
4326
                        }
×
4327
                        streamName = tokenAt(subject, 6)
258✔
4328
                        consumerName = tokenAt(subject, 7)
258✔
4329
                } else {
3,013✔
4330
                        streamName = streamNameFromSubject(subject)
3,013✔
4331
                        consumerName = consumerNameFromSubject(subject)
3,013✔
4332
                        // New has optional filtered subject as part of main subject..
3,013✔
4333
                        if n > 6 {
5,616✔
4334
                                tokens := strings.Split(subject, tsep)
2,603✔
4335
                                filteredSubject = strings.Join(tokens[6:], tsep)
2,603✔
4336
                        }
2,603✔
4337
                }
4338
        }
4339

4340
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
4,016✔
4341
                if doErr {
4✔
4342
                        resp.Error = NewJSNotEnabledForAccountError()
1✔
4343
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4344
                }
1✔
4345
                return
3✔
4346
        }
4347

4348
        if streamName != req.Stream {
4,011✔
4349
                resp.Error = NewJSStreamMismatchError()
1✔
4350
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4351
                return
1✔
4352
        }
1✔
4353

4354
        if consumerName != _EMPTY_ {
7,278✔
4355
                // Check for path like separators in the name.
3,269✔
4356
                if strings.ContainsAny(consumerName, `\/`) {
3,273✔
4357
                        resp.Error = NewJSConsumerNameContainsPathSeparatorsError()
4✔
4358
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4359
                        return
4✔
4360
                }
4✔
4361
        }
4362

4363
        // Should we expect a durable name
4364
        if rt == ccLegacyDurable {
4,262✔
4365
                if numTokens(subject) < 7 {
257✔
4366
                        resp.Error = NewJSConsumerDurableNameNotInSubjectError()
×
4367
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4368
                        return
×
4369
                }
×
4370
                // Now check on requirements for durable request.
4371
                if req.Config.Durable == _EMPTY_ {
258✔
4372
                        resp.Error = NewJSConsumerDurableNameNotSetError()
1✔
4373
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4374
                        return
1✔
4375
                }
1✔
4376
                if consumerName != req.Config.Durable {
256✔
4377
                        resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4378
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4379
                        return
×
4380
                }
×
4381
        }
4382
        // If new style and durable set make sure they match.
4383
        if rt == ccNew {
7,012✔
4384
                if req.Config.Durable != _EMPTY_ {
5,559✔
4385
                        if consumerName != req.Config.Durable {
2,551✔
4386
                                resp.Error = NewJSConsumerDurableNameNotMatchSubjectError()
×
4387
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4388
                                return
×
4389
                        }
×
4390
                }
4391
                // New style ephemeral so we need to honor the name.
4392
                req.Config.Name = consumerName
3,008✔
4393
        }
4394
        // Check for legacy ephemeral mis-configuration.
4395
        if rt == ccLegacyEphemeral && req.Config.Durable != _EMPTY_ {
4,007✔
4396
                resp.Error = NewJSConsumerEphemeralWithDurableNameError()
3✔
4397
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
3✔
4398
                return
3✔
4399
        }
3✔
4400

4401
        // in case of multiple filters provided, error if new API is used.
4402
        if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 {
4,002✔
4403
                resp.Error = NewJSConsumerMultipleFiltersNotAllowedError()
1✔
4404
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4405
                return
1✔
4406
        }
1✔
4407

4408
        // Check for a filter subject.
4409
        if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject {
4,002✔
4410
                resp.Error = NewJSConsumerCreateFilterSubjectMismatchError()
2✔
4411
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2✔
4412
                return
2✔
4413
        }
2✔
4414

4415
        if isClustered && !req.Config.Direct {
7,023✔
4416
                // If we are inline with client, we still may need to do a callout for consumer info
3,025✔
4417
                // during this call, so place in Go routine to not block client.
3,025✔
4418
                // Router and Gateway API calls already in separate context.
3,025✔
4419
                if c.kind != ROUTER && c.kind != GATEWAY {
6,050✔
4420
                        go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
3,025✔
4421
                } else {
3,025✔
4422
                        s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config, req.Action, req.Pedantic)
×
4423
                }
×
4424
                return
3,025✔
4425
        }
4426

4427
        // If we are here we are single server mode.
4428
        if req.Config.Replicas > 1 {
973✔
4429
                resp.Error = NewJSStreamReplicasNotSupportedError()
×
4430
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4431
                return
×
4432
        }
×
4433

4434
        stream, err := acc.lookupStream(req.Stream)
973✔
4435
        if err != nil {
977✔
4436
                resp.Error = NewJSStreamNotFoundError(Unless(err))
4✔
4437
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4✔
4438
                return
4✔
4439
        }
4✔
4440

4441
        if o := stream.lookupConsumer(consumerName); o != nil {
1,013✔
4442
                // If the consumer already exists then don't allow updating the PauseUntil, just set
44✔
4443
                // it back to whatever the current configured value is.
44✔
4444
                req.Config.PauseUntil = o.cfg.PauseUntil
44✔
4445
        }
44✔
4446

4447
        // Initialize/update asset version metadata.
4448
        setStaticConsumerMetadata(&req.Config)
969✔
4449

969✔
4450
        o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)
969✔
4451

969✔
4452
        if err != nil {
1,019✔
4453
                if IsNatsErr(err, JSConsumerStoreFailedErrF) {
50✔
4454
                        cname := req.Config.Durable // Will be empty if ephemeral.
×
4455
                        s.Warnf("Consumer create failed for '%s > %s > %s': %v", acc, req.Stream, cname, err)
×
4456
                        err = errConsumerStoreFailed
×
4457
                }
×
4458
                resp.Error = NewJSConsumerCreateError(err, Unless(err))
50✔
4459
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
50✔
4460
                return
50✔
4461
        }
4462
        resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
919✔
4463
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
919✔
4464

919✔
4465
        if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
923✔
4466
                o.sendPauseAdvisoryLocked(&o.cfg)
4✔
4467
        }
4✔
4468
}
4469

4470
// Request for the list of all consumer names.
4471
func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
21✔
4472
        if c == nil || !s.JetStreamEnabled() {
21✔
4473
                return
×
4474
        }
×
4475
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
21✔
4476
        if err != nil {
21✔
4477
                s.Warnf(badAPIRequestT, msg)
×
4478
                return
×
4479
        }
×
4480

4481
        var resp = JSApiConsumerNamesResponse{
21✔
4482
                ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType},
21✔
4483
                Consumers:   []string{},
21✔
4484
        }
21✔
4485

21✔
4486
        // Determine if we should proceed here when we are in clustered mode.
21✔
4487
        if s.JetStreamIsClustered() {
39✔
4488
                js, cc := s.getJetStreamCluster()
18✔
4489
                if js == nil || cc == nil {
18✔
4490
                        return
×
4491
                }
×
4492
                if js.isLeaderless() {
18✔
4493
                        resp.Error = NewJSClusterNotAvailError()
×
4494
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4495
                        return
×
4496
                }
×
4497
                // Make sure we are meta leader.
4498
                if !s.JetStreamIsLeader() {
30✔
4499
                        return
12✔
4500
                }
12✔
4501
        }
4502

4503
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
9✔
4504
                if doErr {
×
4505
                        resp.Error = NewJSNotEnabledForAccountError()
×
4506
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4507
                }
×
4508
                return
×
4509
        }
4510

4511
        var offset int
9✔
4512
        if isJSONObjectOrArray(msg) {
17✔
4513
                var req JSApiConsumersRequest
8✔
4514
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
8✔
4515
                        resp.Error = NewJSInvalidJSONError(err)
×
4516
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4517
                        return
×
4518
                }
×
4519
                offset = req.Offset
8✔
4520
        }
4521

4522
        streamName := streamNameFromSubject(subject)
9✔
4523
        var numConsumers int
9✔
4524

9✔
4525
        if s.JetStreamIsClustered() {
15✔
4526
                js, cc := s.getJetStreamCluster()
6✔
4527
                if js == nil || cc == nil {
6✔
4528
                        // TODO(dlc) - Debug or Warn?
×
4529
                        return
×
4530
                }
×
4531
                js.mu.RLock()
6✔
4532
                sas := cc.streams[acc.Name]
6✔
4533
                if sas == nil {
6✔
4534
                        js.mu.RUnlock()
×
4535
                        resp.Error = NewJSStreamNotFoundError()
×
4536
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4537
                        return
×
4538
                }
×
4539
                sa := sas[streamName]
6✔
4540
                if sa == nil || sa.err != nil {
6✔
4541
                        js.mu.RUnlock()
×
4542
                        resp.Error = NewJSStreamNotFoundError()
×
4543
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4544
                        return
×
4545
                }
×
4546
                for consumer := range sa.consumers {
11✔
4547
                        resp.Consumers = append(resp.Consumers, consumer)
5✔
4548
                }
5✔
4549
                if len(resp.Consumers) > 1 {
6✔
4550
                        slices.Sort(resp.Consumers)
×
4551
                }
×
4552
                numConsumers = len(resp.Consumers)
6✔
4553
                if offset > numConsumers {
6✔
4554
                        offset = numConsumers
×
4555
                }
×
4556
                resp.Consumers = resp.Consumers[offset:]
6✔
4557
                if len(resp.Consumers) > JSApiNamesLimit {
6✔
4558
                        resp.Consumers = resp.Consumers[:JSApiNamesLimit]
×
4559
                }
×
4560
                js.mu.RUnlock()
6✔
4561

4562
        } else {
3✔
4563
                mset, err := acc.lookupStream(streamName)
3✔
4564
                if err != nil {
3✔
4565
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4566
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4567
                        return
×
4568
                }
×
4569

4570
                obs := mset.getPublicConsumers()
3✔
4571
                slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
3✔
4572

4573
                numConsumers = len(obs)
3✔
4574
                if offset > numConsumers {
3✔
4575
                        offset = numConsumers
×
4576
                }
×
4577

4578
                for _, o := range obs[offset:] {
5✔
4579
                        resp.Consumers = append(resp.Consumers, o.String())
2✔
4580
                        if len(resp.Consumers) >= JSApiNamesLimit {
2✔
4581
                                break
×
4582
                        }
4583
                }
4584
        }
4585
        resp.Total = numConsumers
9✔
4586
        resp.Limit = JSApiNamesLimit
9✔
4587
        resp.Offset = offset
9✔
4588
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
4589
}
4590

4591
// Request for the list of all detailed consumer information.
4592
func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
71✔
4593
        if c == nil || !s.JetStreamEnabled() {
79✔
4594
                return
8✔
4595
        }
8✔
4596

4597
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
63✔
4598
        if err != nil {
63✔
4599
                s.Warnf(badAPIRequestT, msg)
×
4600
                return
×
4601
        }
×
4602

4603
        var resp = JSApiConsumerListResponse{
63✔
4604
                ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType},
63✔
4605
                Consumers:   []*ConsumerInfo{},
63✔
4606
        }
63✔
4607

63✔
4608
        // Determine if we should proceed here when we are in clustered mode.
63✔
4609
        if s.JetStreamIsClustered() {
122✔
4610
                js, cc := s.getJetStreamCluster()
59✔
4611
                if js == nil || cc == nil {
59✔
4612
                        return
×
4613
                }
×
4614
                if js.isLeaderless() {
60✔
4615
                        resp.Error = NewJSClusterNotAvailError()
1✔
4616
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4617
                        return
1✔
4618
                }
1✔
4619
                // Make sure we are meta leader.
4620
                if !s.JetStreamIsLeader() {
105✔
4621
                        return
47✔
4622
                }
47✔
4623
        }
4624

4625
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
15✔
4626
                if doErr {
×
4627
                        resp.Error = NewJSNotEnabledForAccountError()
×
4628
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4629
                }
×
4630
                return
×
4631
        }
4632

4633
        var offset int
15✔
4634
        if isJSONObjectOrArray(msg) {
25✔
4635
                var req JSApiConsumersRequest
10✔
4636
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
11✔
4637
                        resp.Error = NewJSInvalidJSONError(err)
1✔
4638
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4639
                        return
1✔
4640
                }
1✔
4641
                offset = req.Offset
9✔
4642
        }
4643

4644
        streamName := streamNameFromSubject(subject)
14✔
4645

14✔
4646
        // Clustered mode will invoke a scatter and gather.
14✔
4647
        if s.JetStreamIsClustered() {
25✔
4648
                // Need to copy these off before sending.. don't move this inside startGoRoutine!!!
11✔
4649
                msg = copyBytes(msg)
11✔
4650
                s.startGoRoutine(func() {
22✔
4651
                        s.jsClusteredConsumerListRequest(acc, ci, offset, streamName, subject, reply, msg)
11✔
4652
                })
11✔
4653
                return
11✔
4654
        }
4655

4656
        mset, err := acc.lookupStream(streamName)
3✔
4657
        if err != nil {
3✔
4658
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4659
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4660
                return
×
4661
        }
×
4662

4663
        obs := mset.getPublicConsumers()
3✔
4664
        slices.SortFunc(obs, func(i, j *consumer) int { return cmp.Compare(i.name, j.name) })
3✔
4665

4666
        ocnt := len(obs)
3✔
4667
        if offset > ocnt {
3✔
4668
                offset = ocnt
×
4669
        }
×
4670

4671
        for _, o := range obs[offset:] {
5✔
4672
                if cinfo := o.info(); cinfo != nil {
4✔
4673
                        resp.Consumers = append(resp.Consumers, cinfo)
2✔
4674
                }
2✔
4675
                if len(resp.Consumers) >= JSApiListLimit {
2✔
4676
                        break
×
4677
                }
4678
        }
4679
        resp.Total = ocnt
3✔
4680
        resp.Limit = JSApiListLimit
3✔
4681
        resp.Offset = offset
3✔
4682
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
3✔
4683
}
4684

4685
// Request for information about an consumer.
4686
func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
42,771✔
4687
        if c == nil || !s.JetStreamEnabled() {
42,771✔
4688
                return
×
4689
        }
×
4690
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
42,771✔
4691
        if err != nil {
42,771✔
4692
                s.Warnf(badAPIRequestT, msg)
×
4693
                return
×
4694
        }
×
4695

4696
        streamName := streamNameFromSubject(subject)
42,771✔
4697
        consumerName := consumerNameFromSubject(subject)
42,771✔
4698

42,771✔
4699
        var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}}
42,771✔
4700

42,771✔
4701
        if !isEmptyRequest(msg) {
42,772✔
4702
                resp.Error = NewJSNotEmptyRequestError()
1✔
4703
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4704
                return
1✔
4705
        }
1✔
4706

4707
        // If we are in clustered mode we need to be the consumer leader to proceed.
4708
        if s.JetStreamIsClustered() {
84,139✔
4709
                // Check to make sure the consumer is assigned.
41,369✔
4710
                js, cc := s.getJetStreamCluster()
41,369✔
4711
                if js == nil || cc == nil {
41,369✔
4712
                        return
×
4713
                }
×
4714

4715
                js.mu.RLock()
41,369✔
4716
                meta := cc.meta
41,369✔
4717
                js.mu.RUnlock()
41,369✔
4718

41,369✔
4719
                // Since these could wait on the Raft group lock, don't do so under the JS lock.
41,369✔
4720
                ourID := meta.ID()
41,369✔
4721
                groupLeaderless := meta.Leaderless()
41,369✔
4722
                groupCreated := meta.Created()
41,369✔
4723

41,369✔
4724
                js.mu.RLock()
41,369✔
4725
                isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
41,369✔
4726
                var rg *raftGroup
41,369✔
4727
                var offline, isMember bool
41,369✔
4728
                if ca != nil {
45,383✔
4729
                        if rg = ca.Group; rg != nil {
8,028✔
4730
                                offline = s.allPeersOffline(rg)
4,014✔
4731
                                isMember = rg.isMember(ourID)
4,014✔
4732
                        }
4,014✔
4733
                }
4734
                // Capture consumer leader here.
4735
                isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
41,369✔
4736
                // Also capture if we think there is no meta leader.
41,369✔
4737
                var isLeaderLess bool
41,369✔
4738
                if !isLeader {
69,137✔
4739
                        isLeaderLess = groupLeaderless && time.Since(groupCreated) > lostQuorumIntervalDefault
27,768✔
4740
                }
27,768✔
4741
                js.mu.RUnlock()
41,369✔
4742

41,369✔
4743
                if isLeader && ca == nil {
53,733✔
4744
                        // We can't find the consumer, so mimic what would be the errors below.
12,364✔
4745
                        if hasJS, doErr := acc.checkJetStream(); !hasJS {
12,364✔
4746
                                if doErr {
×
4747
                                        resp.Error = NewJSNotEnabledForAccountError()
×
4748
                                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4749
                                }
×
4750
                                return
×
4751
                        }
4752
                        if sa == nil {
22,367✔
4753
                                resp.Error = NewJSStreamNotFoundError()
10,003✔
4754
                                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
10,003✔
4755
                                return
10,003✔
4756
                        }
10,003✔
4757
                        // If we are here the consumer is not present.
4758
                        resp.Error = NewJSConsumerNotFoundError()
2,361✔
4759
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2,361✔
4760
                        return
2,361✔
4761
                } else if ca == nil {
53,996✔
4762
                        if isLeaderLess {
24,993✔
4763
                                resp.Error = NewJSClusterNotAvailError()
2✔
4764
                                // Delaying an error response gives the leader a chance to respond before us
2✔
4765
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
2✔
4766
                        }
2✔
4767
                        return
24,991✔
4768
                } else if isLeader && offline {
4,015✔
4769
                        resp.Error = NewJSConsumerOfflineError()
1✔
4770
                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
1✔
4771
                        return
1✔
4772
                }
1✔
4773

4774
                // Check to see if we are a member of the group and if the group has no leader.
4775
                if isMember && js.isGroupLeaderless(ca.Group) {
4,014✔
4776
                        resp.Error = NewJSClusterNotAvailError()
1✔
4777
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4778
                        return
1✔
4779
                }
1✔
4780

4781
                // We have the consumer assigned and a leader, so only the consumer leader should answer.
4782
                if !isConsumerLeader {
6,793✔
4783
                        if isLeaderLess {
2,781✔
4784
                                resp.Error = NewJSClusterNotAvailError()
×
4785
                                // Delaying an error response gives the leader a chance to respond before us
×
4786
                                s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group, errRespDelay)
×
4787
                                return
×
4788
                        }
×
4789

4790
                        var node RaftNode
2,781✔
4791
                        var leaderNotPartOfGroup bool
2,781✔
4792

2,781✔
4793
                        // We have a consumer assignment.
2,781✔
4794
                        if isMember {
4,927✔
4795
                                js.mu.RLock()
2,146✔
4796
                                if rg != nil && rg.node != nil {
4,292✔
4797
                                        node = rg.node
2,146✔
4798
                                        if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
2,146✔
4799
                                                leaderNotPartOfGroup = true
×
4800
                                        }
×
4801
                                }
4802
                                js.mu.RUnlock()
2,146✔
4803
                        }
4804

4805
                        // Check if we should ignore all together.
4806
                        if node == nil {
3,416✔
4807
                                // We have been assigned but have not created a node yet. If we are a member return
635✔
4808
                                // our config and defaults for state and no cluster info.
635✔
4809
                                if isMember {
635✔
4810
                                        // Since we access consumerAssignment, need js lock.
×
4811
                                        js.mu.RLock()
×
4812
                                        resp.ConsumerInfo = &ConsumerInfo{
×
4813
                                                Stream:    ca.Stream,
×
4814
                                                Name:      ca.Name,
×
4815
                                                Created:   ca.Created,
×
4816
                                                Config:    setDynamicConsumerMetadata(ca.Config),
×
4817
                                                TimeStamp: time.Now().UTC(),
×
4818
                                        }
×
4819
                                        b := s.jsonResponse(resp)
×
4820
                                        js.mu.RUnlock()
×
4821
                                        s.sendAPIResponse(ci, acc, subject, reply, string(msg), b)
×
4822
                                }
×
4823
                                return
635✔
4824
                        }
4825
                        // If we are a member and we have a group leader or we had a previous leader consider bailing out.
4826
                        if !node.Leaderless() || node.HadPreviousLeader() || (rg != nil && rg.Preferred != _EMPTY_ && rg.Preferred != ourID) {
4,288✔
4827
                                if leaderNotPartOfGroup {
2,142✔
4828
                                        resp.Error = NewJSConsumerOfflineError()
×
4829
                                        s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil, errRespDelay)
×
4830
                                }
×
4831
                                return
2,142✔
4832
                        }
4833
                        // If we are here we are a member and this is just a new consumer that does not have a (preferred) leader yet.
4834
                        // Will fall through and return what we have. All consumers can respond but this should be very rare
4835
                        // but makes more sense to clients when they try to create, get a consumer exists, and then do consumer info.
4836
                }
4837
        }
4838

4839
        if !acc.JetStreamEnabled() {
2,636✔
4840
                resp.Error = NewJSNotEnabledForAccountError()
×
4841
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4842
                return
×
4843
        }
×
4844

4845
        mset, err := acc.lookupStream(streamName)
2,636✔
4846
        if err != nil {
2,636✔
4847
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4848
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4849
                return
×
4850
        }
×
4851

4852
        obs := mset.lookupConsumer(consumerName)
2,636✔
4853
        if obs == nil {
2,802✔
4854
                resp.Error = NewJSConsumerNotFoundError()
166✔
4855
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
166✔
4856
                return
166✔
4857
        }
166✔
4858

4859
        if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
2,470✔
4860
                // This consumer returned nil which means it's closed. Respond with not found.
×
4861
                resp.Error = NewJSConsumerNotFoundError()
×
4862
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4863
                return
×
4864
        }
×
4865
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2,470✔
4866
}
4867

4868
// Request to delete an Consumer.
4869
func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
7,356✔
4870
        if c == nil || !s.JetStreamEnabled() {
7,364✔
4871
                return
8✔
4872
        }
8✔
4873
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
7,348✔
4874
        if err != nil {
7,348✔
4875
                s.Warnf(badAPIRequestT, msg)
×
4876
                return
×
4877
        }
×
4878

4879
        var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}}
7,348✔
4880

7,348✔
4881
        // Determine if we should proceed here when we are in clustered mode.
7,348✔
4882
        if s.JetStreamIsClustered() {
14,270✔
4883
                js, cc := s.getJetStreamCluster()
6,922✔
4884
                if js == nil || cc == nil {
6,923✔
4885
                        return
1✔
4886
                }
1✔
4887
                if js.isLeaderless() {
6,922✔
4888
                        resp.Error = NewJSClusterNotAvailError()
1✔
4889
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4890
                        return
1✔
4891
                }
1✔
4892
                // Make sure we are meta leader.
4893
                if !s.JetStreamIsLeader() {
11,512✔
4894
                        return
4,592✔
4895
                }
4,592✔
4896
        }
4897

4898
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
2,834✔
4899
                if doErr {
157✔
4900
                        resp.Error = NewJSNotEnabledForAccountError()
77✔
4901
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
77✔
4902
                }
77✔
4903
                return
80✔
4904
        }
4905
        if !isEmptyRequest(msg) {
2,675✔
4906
                resp.Error = NewJSNotEmptyRequestError()
1✔
4907
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
1✔
4908
                return
1✔
4909
        }
1✔
4910
        stream := streamNameFromSubject(subject)
2,673✔
4911
        consumer := consumerNameFromSubject(subject)
2,673✔
4912

2,673✔
4913
        if s.JetStreamIsClustered() {
5,001✔
4914
                s.jsClusteredConsumerDeleteRequest(ci, acc, stream, consumer, subject, reply, rmsg)
2,328✔
4915
                return
2,328✔
4916
        }
2,328✔
4917

4918
        mset, err := acc.lookupStream(stream)
345✔
4919
        if err != nil {
345✔
4920
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4921
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4922
                return
×
4923
        }
×
4924

4925
        obs := mset.lookupConsumer(consumer)
345✔
4926
        if obs == nil {
486✔
4927
                resp.Error = NewJSConsumerNotFoundError()
141✔
4928
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
141✔
4929
                return
141✔
4930
        }
141✔
4931
        if err := obs.delete(); err != nil {
204✔
4932
                resp.Error = NewJSStreamGeneralError(err, Unless(err))
×
4933
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4934
                return
×
4935
        }
×
4936
        resp.Success = true
204✔
4937
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
204✔
4938
}
4939

4940
// Request to pause or unpause a Consumer.
4941
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
63✔
4942
        if c == nil || !s.JetStreamEnabled() {
63✔
4943
                return
×
4944
        }
×
4945
        ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
63✔
4946
        if err != nil {
63✔
4947
                s.Warnf(badAPIRequestT, msg)
×
4948
                return
×
4949
        }
×
4950

4951
        var req JSApiConsumerPauseRequest
63✔
4952
        var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
63✔
4953

63✔
4954
        if isJSONObjectOrArray(msg) {
118✔
4955
                if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
55✔
4956
                        resp.Error = NewJSInvalidJSONError(err)
×
4957
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4958
                        return
×
4959
                }
×
4960
        }
4961

4962
        // Determine if we should proceed here when we are in clustered mode.
4963
        isClustered := s.JetStreamIsClustered()
63✔
4964
        js, cc := s.getJetStreamCluster()
63✔
4965
        if isClustered {
117✔
4966
                if js == nil || cc == nil {
54✔
4967
                        return
×
4968
                }
×
4969
                if js.isLeaderless() {
54✔
4970
                        resp.Error = NewJSClusterNotAvailError()
×
4971
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4972
                        return
×
4973
                }
×
4974
                // Make sure we are meta leader.
4975
                if !s.JetStreamIsLeader() {
94✔
4976
                        return
40✔
4977
                }
40✔
4978
        }
4979

4980
        if hasJS, doErr := acc.checkJetStream(); !hasJS {
23✔
4981
                if doErr {
×
4982
                        resp.Error = NewJSNotEnabledForAccountError()
×
4983
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4984
                }
×
4985
                return
×
4986
        }
4987

4988
        stream := streamNameFromSubject(subject)
23✔
4989
        consumer := consumerNameFromSubject(subject)
23✔
4990

23✔
4991
        if isClustered {
37✔
4992
                js.mu.RLock()
14✔
4993
                sa := js.streamAssignment(acc.Name, stream)
14✔
4994
                if sa == nil {
14✔
4995
                        js.mu.RUnlock()
×
4996
                        resp.Error = NewJSStreamNotFoundError(Unless(err))
×
4997
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
4998
                        return
×
4999
                }
×
5000

5001
                ca, ok := sa.consumers[consumer]
14✔
5002
                if !ok || ca == nil {
14✔
5003
                        js.mu.RUnlock()
×
5004
                        resp.Error = NewJSConsumerNotFoundError()
×
5005
                        s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5006
                        return
×
5007
                }
×
5008

5009
                nca := *ca
14✔
5010
                ncfg := *ca.Config
14✔
5011
                nca.Config = &ncfg
14✔
5012
                js.mu.RUnlock()
14✔
5013
                pauseUTC := req.PauseUntil.UTC()
14✔
5014
                if !pauseUTC.IsZero() {
24✔
5015
                        nca.Config.PauseUntil = &pauseUTC
10✔
5016
                } else {
14✔
5017
                        nca.Config.PauseUntil = nil
4✔
5018
                }
4✔
5019

5020
                // Update asset version metadata due to updating pause/resume.
5021
                // Only PauseUntil is updated above, so reuse config for both.
5022
                setStaticConsumerMetadata(nca.Config)
14✔
5023

14✔
5024
                eca := encodeAddConsumerAssignment(&nca)
14✔
5025
                cc.meta.Propose(eca)
14✔
5026

14✔
5027
                resp.PauseUntil = pauseUTC
14✔
5028
                if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
24✔
5029
                        resp.PauseRemaining = time.Until(pauseUTC)
10✔
5030
                }
10✔
5031
                s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
14✔
5032
                return
14✔
5033
        }
5034

5035
        mset, err := acc.lookupStream(stream)
9✔
5036
        if err != nil {
9✔
5037
                resp.Error = NewJSStreamNotFoundError(Unless(err))
×
5038
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5039
                return
×
5040
        }
×
5041

5042
        obs := mset.lookupConsumer(consumer)
9✔
5043
        if obs == nil {
9✔
5044
                resp.Error = NewJSConsumerNotFoundError()
×
5045
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5046
                return
×
5047
        }
×
5048

5049
        ncfg := obs.cfg
9✔
5050
        pauseUTC := req.PauseUntil.UTC()
9✔
5051
        if !pauseUTC.IsZero() {
14✔
5052
                ncfg.PauseUntil = &pauseUTC
5✔
5053
        } else {
9✔
5054
                ncfg.PauseUntil = nil
4✔
5055
        }
4✔
5056

5057
        // Update asset version metadata due to updating pause/resume.
5058
        setStaticConsumerMetadata(&ncfg)
9✔
5059

9✔
5060
        if err := obs.updateConfig(&ncfg); err != nil {
9✔
5061
                // The only type of error that should be returned here is from o.store,
×
5062
                // so use a store failed error type.
×
5063
                resp.Error = NewJSConsumerStoreFailedError(err)
×
5064
                s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
×
5065
                return
×
5066
        }
×
5067

5068
        resp.PauseUntil = pauseUTC
9✔
5069
        if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
14✔
5070
                resp.PauseRemaining = time.Until(pauseUTC)
5✔
5071
        }
5✔
5072
        s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
9✔
5073
}
5074

5075
// sendJetStreamAPIAuditAdvisory will send the audit event for a given event.
5076
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
42,616✔
5077
        s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
42,616✔
5078
                TypedEvent: TypedEvent{
42,616✔
5079
                        Type: JSAPIAuditType,
42,616✔
5080
                        ID:   nuid.Next(),
42,616✔
5081
                        Time: time.Now().UTC(),
42,616✔
5082
                },
42,616✔
5083
                Server:   s.Name(),
42,616✔
5084
                Client:   ci.forAdvisory(),
42,616✔
5085
                Subject:  subject,
42,616✔
5086
                Request:  request,
42,616✔
5087
                Response: response,
42,616✔
5088
                Domain:   s.getOpts().JetStreamDomain,
42,616✔
5089
        })
42,616✔
5090
}
42,616✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc