• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.12
/src/Internal/EventStoreNodeConnection.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Internal;
15

16
use Amp\DeferredFuture;
17
use Closure;
18
use Prooph\EventStore\AllEventsSlice;
19
use Prooph\EventStore\CatchUpSubscriptionSettings;
20
use Prooph\EventStore\Common\SystemEventTypes;
21
use Prooph\EventStore\Common\SystemStreams;
22
use Prooph\EventStore\ConditionalWriteResult;
23
use Prooph\EventStore\DeleteResult;
24
use Prooph\EventStore\EventData;
25
use Prooph\EventStore\EventReadResult;
26
use Prooph\EventStore\EventReadStatus;
27
use Prooph\EventStore\EventStoreAllCatchUpSubscription;
28
use Prooph\EventStore\EventStoreConnection;
29
use Prooph\EventStore\EventStorePersistentSubscription;
30
use Prooph\EventStore\EventStoreStreamCatchUpSubscription;
31
use Prooph\EventStore\EventStoreSubscription;
32
use Prooph\EventStore\EventStoreTransaction;
33
use Prooph\EventStore\Exception\InvalidArgumentException;
34
use Prooph\EventStore\Exception\InvalidOperationException;
35
use Prooph\EventStore\Exception\MaxQueueSizeLimitReached;
36
use Prooph\EventStore\Exception\OutOfRangeException;
37
use Prooph\EventStore\Exception\UnexpectedValueException;
38
use Prooph\EventStore\ExpectedVersion;
39
use Prooph\EventStore\Internal\Consts;
40
use Prooph\EventStore\Internal\EventStoreTransactionConnection;
41
use Prooph\EventStore\ListenerHandler;
42
use Prooph\EventStore\PersistentSubscriptionCreateResult;
43
use Prooph\EventStore\PersistentSubscriptionDeleteResult;
44
use Prooph\EventStore\PersistentSubscriptionSettings;
45
use Prooph\EventStore\PersistentSubscriptionUpdateResult;
46
use Prooph\EventStore\Position;
47
use Prooph\EventStore\RawStreamMetadataResult;
48
use Prooph\EventStore\StreamEventsSlice;
49
use Prooph\EventStore\StreamMetadata;
50
use Prooph\EventStore\StreamMetadataResult;
51
use Prooph\EventStore\SystemSettings;
52
use Prooph\EventStore\UserCredentials;
53
use Prooph\EventStore\Util\Guid;
54
use Prooph\EventStore\Util\Json;
55
use Prooph\EventStore\WriteResult;
56
use Prooph\EventStoreClient\ClientOperations\AppendToStreamOperation;
57
use Prooph\EventStoreClient\ClientOperations\ClientOperation;
58
use Prooph\EventStoreClient\ClientOperations\CommitTransactionOperation;
59
use Prooph\EventStoreClient\ClientOperations\ConditionalAppendToStreamOperation;
60
use Prooph\EventStoreClient\ClientOperations\CreatePersistentSubscriptionOperation;
61
use Prooph\EventStoreClient\ClientOperations\DeletePersistentSubscriptionOperation;
62
use Prooph\EventStoreClient\ClientOperations\DeleteStreamOperation;
63
use Prooph\EventStoreClient\ClientOperations\ReadAllEventsBackwardOperation;
64
use Prooph\EventStoreClient\ClientOperations\ReadAllEventsForwardOperation;
65
use Prooph\EventStoreClient\ClientOperations\ReadEventOperation;
66
use Prooph\EventStoreClient\ClientOperations\ReadStreamEventsBackwardOperation;
67
use Prooph\EventStoreClient\ClientOperations\ReadStreamEventsForwardOperation;
68
use Prooph\EventStoreClient\ClientOperations\StartTransactionOperation;
69
use Prooph\EventStoreClient\ClientOperations\TransactionalWriteOperation;
70
use Prooph\EventStoreClient\ClientOperations\UpdatePersistentSubscriptionOperation;
71
use Prooph\EventStoreClient\ClusterSettings;
72
use Prooph\EventStoreClient\ConnectionSettings;
73
use Prooph\EventStoreClient\Internal\Message\CloseConnectionMessage;
74
use Prooph\EventStoreClient\Internal\Message\StartConnectionMessage;
75
use Prooph\EventStoreClient\Internal\Message\StartOperationMessage;
76
use Prooph\EventStoreClient\Internal\Message\StartSubscriptionMessage;
77

78
final class EventStoreNodeConnection implements
79
    EventStoreConnection,
80
    EventStoreTransactionConnection
81
{
82
    private readonly string $connectionName;
83

84
    private readonly ConnectionSettings $settings;
85

86
    private readonly ?ClusterSettings $clusterSettings;
87

88
    private readonly EndPointDiscoverer $endPointDiscoverer;
89

90
    private readonly EventStoreConnectionLogicHandler $handler;
91

92
    public function __construct(
93
        ConnectionSettings $settings,
94
        ?ClusterSettings $clusterSettings,
95
        EndPointDiscoverer $endPointDiscoverer,
96
        ?string $connectionName = null
97
    ) {
98
        $this->settings = $settings;
420✔
99
        $this->clusterSettings = $clusterSettings;
420✔
100
        $this->connectionName = $connectionName ?? Guid::generateAsHex();
420✔
101
        $this->endPointDiscoverer = $endPointDiscoverer;
420✔
102
        $this->handler = new EventStoreConnectionLogicHandler($this, $settings);
420✔
103
    }
104

105
    public function connectionSettings(): ConnectionSettings
106
    {
107
        return $this->settings;
12✔
108
    }
109

110
    public function clusterSettings(): ?ClusterSettings
111
    {
112
        return $this->clusterSettings;
4✔
113
    }
114

115
    public function connectionName(): string
116
    {
117
        return $this->connectionName;
420✔
118
    }
119

120
    public function connect(): void
121
    {
122
        $deferred = new DeferredFuture();
405✔
123
        $this->handler->enqueueMessage(new StartConnectionMessage($deferred, $this->endPointDiscoverer));
405✔
124

125
        $deferred->getFuture()->await();
405✔
126
    }
127

128
    public function close(): void
129
    {
130
        $this->handler->enqueueMessage(new CloseConnectionMessage('Connection close requested by client'));
408✔
131
    }
132

133
    public function deleteStream(
134
        string $stream,
135
        int $expectedVersion,
136
        bool $hardDelete = false,
137
        ?UserCredentials $userCredentials = null
138
    ): DeleteResult {
139
        if (empty($stream)) {
92✔
140
            throw new InvalidArgumentException('Stream cannot be empty');
×
141
        }
142

143
        $deferred = new DeferredFuture();
92✔
144

145
        $this->enqueueOperation(new DeleteStreamOperation(
92✔
146
            $this->settings->log(),
92✔
147
            $deferred,
92✔
148
            $this->settings->requireMaster(),
92✔
149
            $stream,
92✔
150
            $expectedVersion,
92✔
151
            $hardDelete,
92✔
152
            $userCredentials
92✔
153
        ));
92✔
154

155
        return $deferred->getFuture()->await();
92✔
156
    }
157

158
    public function appendToStream(
159
        string $stream,
160
        int $expectedVersion,
161
        array $events = [],
162
        ?UserCredentials $userCredentials = null
163
    ): WriteResult {
164
        if (empty($stream)) {
324✔
165
            throw new InvalidArgumentException('Stream cannot be empty');
1✔
166
        }
167

168
        $deferred = new DeferredFuture();
323✔
169

170
        $this->enqueueOperation(new AppendToStreamOperation(
323✔
171
            $this->settings->log(),
323✔
172
            $deferred,
323✔
173
            $this->settings->requireMaster(),
323✔
174
            $stream,
323✔
175
            $expectedVersion,
323✔
176
            $events,
323✔
177
            $userCredentials
323✔
178
        ));
323✔
179

180
        return $deferred->getFuture()->await();
323✔
181
    }
182

183
    /** @inheritdoc */
184
    public function conditionalAppendToStream(
185
        string $stream,
186
        int $expectedVersion,
187
        array $events = [],
188
        ?UserCredentials $userCredentials = null
189
    ): ConditionalWriteResult {
190
        if (empty($stream)) {
3✔
191
            throw new InvalidArgumentException('Stream cannot be empty');
×
192
        }
193

194
        $deferred = new DeferredFuture();
3✔
195

196
        $this->enqueueOperation(new ConditionalAppendToStreamOperation(
3✔
197
            $this->settings->log(),
3✔
198
            $deferred,
3✔
199
            $this->settings->requireMaster(),
3✔
200
            $stream,
3✔
201
            $expectedVersion,
3✔
202
            $events,
3✔
203
            $userCredentials
3✔
204
        ));
3✔
205

206
        return $deferred->getFuture()->await();
3✔
207
    }
208

209
    /** @inheritdoc */
210
    public function readEvent(
211
        string $stream,
212
        int $eventNumber,
213
        bool $resolveLinkTos = true,
214
        ?UserCredentials $userCredentials = null
215
    ): EventReadResult {
216
        if (empty($stream)) {
98✔
217
            throw new InvalidArgumentException('Stream cannot be empty');
1✔
218
        }
219

220
        if ($eventNumber < -1) {
97✔
221
            throw new OutOfRangeException('Event number is out of range');
1✔
222
        }
223

224
        $deferred = new DeferredFuture();
96✔
225

226
        $this->enqueueOperation(new ReadEventOperation(
96✔
227
            $this->settings->log(),
96✔
228
            $deferred,
96✔
229
            $this->settings->requireMaster(),
96✔
230
            $stream,
96✔
231
            $eventNumber,
96✔
232
            $resolveLinkTos,
96✔
233
            $userCredentials
96✔
234
        ));
96✔
235

236
        return $deferred->getFuture()->await();
96✔
237
    }
238

239
    /** @inheritdoc */
240
    public function readStreamEventsForward(
241
        string $stream,
242
        int $start,
243
        int $count,
244
        bool $resolveLinkTos = true,
245
        ?UserCredentials $userCredentials = null
246
    ): StreamEventsSlice {
247
        if (empty($stream)) {
100✔
248
            throw new InvalidArgumentException('Stream cannot be empty');
×
249
        }
250

251
        if ($start < 0) {
100✔
252
            throw new InvalidArgumentException('Start must be positive');
1✔
253
        }
254

255
        if ($count < 1) {
99✔
256
            throw new InvalidArgumentException('Count must be positive');
1✔
257
        }
258

259
        if ($count > Consts::MaxReadSize) {
98✔
260
            throw new InvalidArgumentException(\sprintf(
1✔
261
                'Count should be less than %s. For larger reads you should page.',
1✔
262
                Consts::MaxReadSize
1✔
263
            ));
1✔
264
        }
265

266
        $deferred = new DeferredFuture();
97✔
267

268
        $this->enqueueOperation(new ReadStreamEventsForwardOperation(
97✔
269
            $this->settings->log(),
97✔
270
            $deferred,
97✔
271
            $this->settings->requireMaster(),
97✔
272
            $stream,
97✔
273
            $start,
97✔
274
            $count,
97✔
275
            $resolveLinkTos,
97✔
276
            $userCredentials
97✔
277
        ));
97✔
278

279
        return $deferred->getFuture()->await();
97✔
280
    }
281

282
    /** @inheritdoc */
283
    public function readStreamEventsBackward(
284
        string $stream,
285
        int $start,
286
        int $count,
287
        bool $resolveLinkTos = true,
288
        ?UserCredentials $userCredentials = null
289
    ): StreamEventsSlice {
290
        if (empty($stream)) {
50✔
291
            throw new InvalidArgumentException('Stream cannot be empty');
×
292
        }
293

294
        if ($count < 1) {
50✔
295
            throw new InvalidArgumentException('Count must be positive');
1✔
296
        }
297

298
        if ($count > Consts::MaxReadSize) {
49✔
299
            throw new InvalidArgumentException(\sprintf(
1✔
300
                'Count should be less than %s. For larger reads you should page.',
1✔
301
                Consts::MaxReadSize
1✔
302
            ));
1✔
303
        }
304

305
        $deferred = new DeferredFuture();
48✔
306

307
        $this->enqueueOperation(new ReadStreamEventsBackwardOperation(
48✔
308
            $this->settings->log(),
48✔
309
            $deferred,
48✔
310
            $this->settings->requireMaster(),
48✔
311
            $stream,
48✔
312
            $start,
48✔
313
            $count,
48✔
314
            $resolveLinkTos,
48✔
315
            $userCredentials
48✔
316
        ));
48✔
317

318
        return $deferred->getFuture()->await();
48✔
319
    }
320

321
    /** @inheritdoc */
322
    public function readAllEventsForward(
323
        Position $position,
324
        int $count,
325
        bool $resolveLinkTos = true,
326
        ?UserCredentials $userCredentials = null
327
    ): AllEventsSlice {
328
        if ($count < 1) {
20✔
329
            throw new InvalidArgumentException('Count must be positive');
×
330
        }
331

332
        if ($count > Consts::MaxReadSize) {
20✔
333
            throw new InvalidArgumentException(\sprintf(
1✔
334
                'Count should be less than %s. For larger reads you should page.',
1✔
335
                Consts::MaxReadSize
1✔
336
            ));
1✔
337
        }
338

339
        $deferred = new DeferredFuture();
19✔
340

341
        $this->enqueueOperation(new ReadAllEventsForwardOperation(
19✔
342
            $this->settings->log(),
19✔
343
            $deferred,
19✔
344
            $this->settings->requireMaster(),
19✔
345
            $position,
19✔
346
            $count,
19✔
347
            $resolveLinkTos,
19✔
348
            $userCredentials ?? $this->settings->defaultUserCredentials()
19✔
349
        ));
19✔
350

351
        return $deferred->getFuture()->await();
19✔
352
    }
353

354
    public function readAllEventsBackward(
355
        Position $position,
356
        int $count,
357
        bool $resolveLinkTos = true,
358
        ?UserCredentials $userCredentials = null
359
    ): AllEventsSlice {
360
        if ($count < 1) {
18✔
361
            throw new InvalidArgumentException('Count must be positive');
×
362
        }
363

364
        if ($count > Consts::MaxReadSize) {
18✔
365
            throw new InvalidArgumentException(\sprintf(
1✔
366
                'Count should be less than %s. For larger reads you should page.',
1✔
367
                Consts::MaxReadSize
1✔
368
            ));
1✔
369
        }
370

371
        $deferred = new DeferredFuture();
18✔
372

373
        $this->enqueueOperation(new ReadAllEventsBackwardOperation(
18✔
374
            $this->settings->log(),
18✔
375
            $deferred,
18✔
376
            $this->settings->requireMaster(),
18✔
377
            $position,
18✔
378
            $count,
18✔
379
            $resolveLinkTos,
18✔
380
            $userCredentials
18✔
381
        ));
18✔
382

383
        return $deferred->getFuture()->await();
18✔
384
    }
385

386
    public function setStreamMetadata(
387
        string $stream,
388
        int $expectedMetaStreamVersion,
389
        ?StreamMetadata $metadata = null,
390
        ?UserCredentials $userCredentials = null
391
    ): WriteResult {
392
        $string = $metadata ? Json::encode($metadata) : '';
200✔
393

394
        return $this->setRawStreamMetadata(
200✔
395
            $stream,
200✔
396
            $expectedMetaStreamVersion,
200✔
397
            $string,
200✔
398
            $userCredentials
200✔
399
        );
200✔
400
    }
401

402
    public function setRawStreamMetadata(
403
        string $stream,
404
        int $expectedMetaStreamVersion,
405
        string $metadata = '',
406
        ?UserCredentials $userCredentials = null
407
    ): WriteResult {
408
        if (empty($stream)) {
204✔
409
            throw new InvalidArgumentException('Stream cannot be empty');
×
410
        }
411

412
        if (SystemStreams::isMetastream($stream)) {
204✔
413
            throw new InvalidOperationException(\sprintf(
×
414
                'Setting metadata for metastream \'%s\' is not supported',
×
415
                $stream
×
416
            ));
×
417
        }
418

419
        $deferred = new DeferredFuture();
204✔
420

421
        $metaEvent = new EventData(
204✔
422
            null,
204✔
423
            SystemEventTypes::StreamMetadata->value,
204✔
424
            true,
204✔
425
            $metadata
204✔
426
        );
204✔
427

428
        $this->enqueueOperation(new AppendToStreamOperation(
204✔
429
            $this->settings->log(),
204✔
430
            $deferred,
204✔
431
            $this->settings->requireMaster(),
204✔
432
            SystemStreams::metastreamOf($stream),
204✔
433
            $expectedMetaStreamVersion,
204✔
434
            [$metaEvent],
204✔
435
            $userCredentials
204✔
436
        ));
204✔
437

438
        return $deferred->getFuture()->await();
204✔
439
    }
440

441
    /** @inheritdoc */
442
    public function getStreamMetadata(string $stream, ?UserCredentials $userCredentials = null): StreamMetadataResult
443
    {
444
        $result = $this->getRawStreamMetadata($stream, $userCredentials);
19✔
445

446
        return new StreamMetadataResult(
19✔
447
            $result->stream(),
19✔
448
            $result->isStreamDeleted(),
19✔
449
            $result->metastreamVersion(),
19✔
450
            $result->streamMetadata() === ''
19✔
451
                ? new StreamMetadata()
3✔
452
                : StreamMetadata::createFromArray(Json::decode($result->streamMetadata()))
19✔
453
        );
19✔
454
    }
455

456
    /** @inheritdoc */
457
    public function getRawStreamMetadata(string $stream, ?UserCredentials $userCredentials = null): RawStreamMetadataResult
458
    {
459
        if (empty($stream)) {
56✔
460
            throw new InvalidArgumentException('Stream cannot be empty');
×
461
        }
462

463
        $eventReadResult = $this->readEvent(
56✔
464
            SystemStreams::metastreamOf($stream),
56✔
465
            -1,
56✔
466
            false,
56✔
467
            $userCredentials
56✔
468
        );
56✔
469

470
        switch ($eventReadResult->status()) {
53✔
471
            case EventReadStatus::Success:
53✔
472
                $event = $eventReadResult->event();
48✔
473

474
                if (null === $event) {
48✔
475
                    throw new UnexpectedValueException('Event is null while operation result is Success');
×
476
                }
477

478
                $event = $event->originalEvent();
48✔
479

480
                if (null === $event) {
48✔
481
                    return new RawStreamMetadataResult(
9✔
482
                        $stream,
9✔
483
                        false,
9✔
484
                        -1,
9✔
485
                        ''
9✔
486
                    );
9✔
487
                }
488

489
                return new RawStreamMetadataResult(
39✔
490
                    $stream,
39✔
491
                    false,
39✔
492
                    $event->eventNumber(),
39✔
493
                    $event->data()
39✔
494
                );
39✔
495

496
                break;
497
            case EventReadStatus::NotFound:
5✔
498
            case EventReadStatus::NoStream:
5✔
499
                return new RawStreamMetadataResult($stream, false, -1, '');
2✔
500
            case EventReadStatus::StreamDeleted:
3✔
501
                return new RawStreamMetadataResult($stream, true, \PHP_INT_MAX, '');
3✔
502
        }
503
    }
504

505
    /** @inheritdoc */
506
    public function setSystemSettings(SystemSettings $settings, ?UserCredentials $userCredentials = null): WriteResult
507
    {
508
        return $this->appendToStream(
135✔
509
            SystemStreams::SettingsStream,
135✔
510
            ExpectedVersion::Any,
135✔
511
            [new EventData(null, SystemEventTypes::Settings->value, true, Json::encode($settings))],
135✔
512
            $userCredentials
135✔
513
        );
135✔
514
    }
515

516
    public function createPersistentSubscription(
517
        string $stream,
518
        string $groupName,
519
        PersistentSubscriptionSettings $settings,
520
        ?UserCredentials $userCredentials = null
521
    ): PersistentSubscriptionCreateResult {
522
        if (empty($stream)) {
44✔
523
            throw new InvalidArgumentException('Stream cannot be empty');
×
524
        }
525

526
        if (empty($groupName)) {
44✔
527
            throw new InvalidArgumentException('Group cannot be empty');
×
528
        }
529

530
        $deferred = new DeferredFuture();
44✔
531

532
        $this->enqueueOperation(new CreatePersistentSubscriptionOperation(
44✔
533
            $this->settings->log(),
44✔
534
            $deferred,
44✔
535
            $stream,
44✔
536
            $groupName,
44✔
537
            $settings,
44✔
538
            $userCredentials
44✔
539
        ));
44✔
540

541
        return $deferred->getFuture()->await();
44✔
542
    }
543

544
    /** @inheritdoc */
545
    public function updatePersistentSubscription(
546
        string $stream,
547
        string $groupName,
548
        PersistentSubscriptionSettings $settings,
549
        ?UserCredentials $userCredentials = null
550
    ): PersistentSubscriptionUpdateResult {
551
        if (empty($stream)) {
5✔
552
            throw new InvalidArgumentException('Stream cannot be empty');
×
553
        }
554

555
        if (empty($groupName)) {
5✔
556
            throw new InvalidArgumentException('Group cannot be empty');
×
557
        }
558

559
        $deferred = new DeferredFuture();
5✔
560

561
        $this->enqueueOperation(new UpdatePersistentSubscriptionOperation(
5✔
562
            $this->settings->log(),
5✔
563
            $deferred,
5✔
564
            $stream,
5✔
565
            $groupName,
5✔
566
            $settings,
5✔
567
            $userCredentials
5✔
568
        ));
5✔
569

570
        return $deferred->getFuture()->await();
5✔
571
    }
572

573
    /** @inheritdoc */
574
    public function deletePersistentSubscription(
575
        string $stream,
576
        string $groupName,
577
        ?UserCredentials $userCredentials = null
578
    ): PersistentSubscriptionDeleteResult {
579
        if (empty($stream)) {
5✔
580
            throw new InvalidArgumentException('Stream cannot be empty');
×
581
        }
582

583
        if (empty($groupName)) {
5✔
584
            throw new InvalidArgumentException('Group cannot be empty');
×
585
        }
586

587
        $deferred = new DeferredFuture();
5✔
588

589
        $this->enqueueOperation(new DeletePersistentSubscriptionOperation(
5✔
590
            $this->settings->log(),
5✔
591
            $deferred,
5✔
592
            $stream,
5✔
593
            $groupName,
5✔
594
            $userCredentials
5✔
595
        ));
5✔
596

597
        return $deferred->getFuture()->await();
5✔
598
    }
599

600
    /** @inheritdoc */
601
    public function subscribeToStream(
602
        string $stream,
603
        bool $resolveLinkTos,
604
        Closure $eventAppeared,
605
        ?Closure $subscriptionDropped = null,
606
        ?UserCredentials $userCredentials = null
607
    ): EventStoreSubscription {
608
        if (empty($stream)) {
43✔
609
            throw new InvalidArgumentException('Stream cannot be empty');
×
610
        }
611

612
        $deferred = new DeferredFuture();
43✔
613

614
        $this->handler->enqueueMessage(new StartSubscriptionMessage(
43✔
615
            $deferred,
43✔
616
            $stream,
43✔
617
            $resolveLinkTos,
43✔
618
            $userCredentials,
43✔
619
            $eventAppeared,
43✔
620
            $subscriptionDropped,
43✔
621
            $this->settings->maxRetries(),
43✔
622
            $this->settings->operationTimeout()
43✔
623
        ));
43✔
624

625
        return $deferred->getFuture()->await();
43✔
626
    }
627

628
    public function subscribeToStreamFrom(
629
        string $stream,
630
        ?int $lastCheckpoint,
631
        ?CatchUpSubscriptionSettings $settings,
632
        Closure $eventAppeared,
633
        ?Closure $liveProcessingStarted = null,
634
        ?Closure $subscriptionDropped = null,
635
        ?UserCredentials $userCredentials = null
636
    ): EventStoreStreamCatchUpSubscription {
637
        if (empty($stream)) {
9✔
638
            throw new InvalidArgumentException('Stream cannot be empty');
×
639
        }
640

641
        if (null === $settings) {
9✔
642
            $settings = CatchUpSubscriptionSettings::default();
×
643
        }
644

645
        if ($this->settings->verboseLogging()) {
9✔
646
            $settings = $settings->enableVerboseLogging();
×
647
        }
648

649
        $subscription = new \Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription(
9✔
650
            $this,
9✔
651
            $this->settings->log(),
9✔
652
            $stream,
9✔
653
            $lastCheckpoint,
9✔
654
            $userCredentials,
9✔
655
            $eventAppeared,
9✔
656
            $liveProcessingStarted,
9✔
657
            $subscriptionDropped,
9✔
658
            $settings
9✔
659
        );
9✔
660

661
        $subscription->start();
9✔
662

663
        return $subscription;
9✔
664
    }
665

666
    /** @inheritdoc */
667
    public function subscribeToAll(
668
        bool $resolveLinkTos,
669
        Closure $eventAppeared,
670
        ?Closure $subscriptionDropped = null,
671
        ?UserCredentials $userCredentials = null
672
    ): EventStoreSubscription {
673
        $deferred = new DeferredFuture();
13✔
674

675
        $this->handler->enqueueMessage(new StartSubscriptionMessage(
13✔
676
            $deferred,
13✔
677
            '',
13✔
678
            $resolveLinkTos,
13✔
679
            $userCredentials,
13✔
680
            $eventAppeared,
13✔
681
            $subscriptionDropped,
13✔
682
            $this->settings->maxRetries(),
13✔
683
            $this->settings->operationTimeout()
13✔
684
        ));
13✔
685

686
        return $deferred->getFuture()->await();
13✔
687
    }
688

689
    /** @inheritdoc */
690
    public function subscribeToAllFrom(
691
        ?Position $lastCheckpoint,
692
        ?CatchUpSubscriptionSettings $settings,
693
        Closure $eventAppeared,
694
        ?Closure $liveProcessingStarted = null,
695
        ?Closure $subscriptionDropped = null,
696
        ?UserCredentials $userCredentials = null
697
    ): EventStoreAllCatchUpSubscription {
698
        if (null === $settings) {
4✔
699
            $settings = CatchUpSubscriptionSettings::default();
×
700
        }
701

702
        if ($this->settings->verboseLogging()) {
4✔
703
            $settings = $settings->enableVerboseLogging();
×
704
        }
705

706
        $subscription = new \Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription(
4✔
707
            $this,
4✔
708
            $this->settings->log(),
4✔
709
            $lastCheckpoint,
4✔
710
            $userCredentials,
4✔
711
            $eventAppeared,
4✔
712
            $liveProcessingStarted,
4✔
713
            $subscriptionDropped,
4✔
714
            $settings
4✔
715
        );
4✔
716

717
        $subscription->start();
4✔
718

719
        return $subscription;
4✔
720
    }
721

722
    /** @inheritdoc */
723
    public function connectToPersistentSubscription(
724
        string $stream,
725
        string $groupName,
726
        Closure $eventAppeared,
727
        ?Closure $subscriptionDropped = null,
728
        int $bufferSize = 10,
729
        bool $autoAck = true,
730
        ?UserCredentials $userCredentials = null
731
    ): EventStorePersistentSubscription {
732
        if (empty($stream)) {
34✔
733
            throw new InvalidArgumentException('Stream cannot be empty');
×
734
        }
735

736
        if (empty($groupName)) {
34✔
737
            throw new InvalidArgumentException('Group cannot be empty');
×
738
        }
739

740
        $subscription = new \Prooph\EventStoreClient\Internal\EventStorePersistentSubscription(
34✔
741
            $groupName,
34✔
742
            $stream,
34✔
743
            $eventAppeared,
34✔
744
            $subscriptionDropped,
34✔
745
            $userCredentials,
34✔
746
            $this->settings->log(),
34✔
747
            $this->settings->verboseLogging(),
34✔
748
            $this->settings,
34✔
749
            $this->handler,
34✔
750
            $bufferSize,
34✔
751
            $autoAck
34✔
752
        );
34✔
753

754
        $subscription->start();
34✔
755

756
        return $subscription;
32✔
757
    }
758

759
    /** @inheritdoc */
760
    public function startTransaction(
761
        string $stream,
762
        int $expectedVersion,
763
        ?UserCredentials $userCredentials = null
764
    ): EventStoreTransaction {
765
        if (empty($stream)) {
40✔
766
            throw new InvalidArgumentException('Stream cannot be empty');
×
767
        }
768

769
        $deferred = new DeferredFuture();
40✔
770

771
        $this->enqueueOperation(new StartTransactionOperation(
40✔
772
            $this->settings->log(),
40✔
773
            $deferred,
40✔
774
            $this->settings->requireMaster(),
40✔
775
            $stream,
40✔
776
            $expectedVersion,
40✔
777
            $this,
40✔
778
            $userCredentials
40✔
779
        ));
40✔
780

781
        return $deferred->getFuture()->await();
40✔
782
    }
783

784
    /** @inheritdoc */
785
    public function continueTransaction(
786
        int $transactionId,
787
        ?UserCredentials $userCredentials = null
788
    ): EventStoreTransaction {
789
        if ($transactionId < 0) {
14✔
790
            throw new InvalidArgumentException('Invalid transaction id');
×
791
        }
792

793
        return new EventStoreTransaction($transactionId, $userCredentials, $this);
14✔
794
    }
795

796
    /** @inheritdoc */
797
    public function transactionalWrite(
798
        EventStoreTransaction $transaction,
799
        array $events,
800
        ?UserCredentials $userCredentials
801
    ): void {
802
        $deferred = new DeferredFuture();
34✔
803

804
        $this->enqueueOperation(new TransactionalWriteOperation(
34✔
805
            $this->settings->log(),
34✔
806
            $deferred,
34✔
807
            $this->settings->requireMaster(),
34✔
808
            $transaction->transactionId(),
34✔
809
            $events,
34✔
810
            $userCredentials
34✔
811
        ));
34✔
812

813
        $deferred->getFuture()->await();
34✔
814
    }
815

816
    /** @inheritdoc */
817
    public function commitTransaction(
818
        EventStoreTransaction $transaction,
819
        ?UserCredentials $userCredentials
820
    ): WriteResult {
821
        $deferred = new DeferredFuture();
39✔
822

823
        $this->enqueueOperation(new CommitTransactionOperation(
39✔
824
            $this->settings->log(),
39✔
825
            $deferred,
39✔
826
            $this->settings->requireMaster(),
39✔
827
            $transaction->transactionId(),
39✔
828
            $userCredentials
39✔
829
        ));
39✔
830

831
        return $deferred->getFuture()->await();
39✔
832
    }
833

834
    /** @inheritdoc */
835
    public function onConnected(Closure $handler): ListenerHandler
836
    {
837
        return $this->handler->onConnected($handler);
13✔
838
    }
839

840
    /** @inheritdoc */
841
    public function onDisconnected(Closure $handler): ListenerHandler
842
    {
843
        return $this->handler->onDisconnected($handler);
×
844
    }
845

846
    /** @inheritdoc */
847
    public function onReconnecting(Closure $handler): ListenerHandler
848
    {
849
        return $this->handler->onReconnecting($handler);
×
850
    }
851

852
    /** @inheritdoc */
853
    public function onClosed(Closure $handler): ListenerHandler
854
    {
855
        return $this->handler->onClosed($handler);
×
856
    }
857

858
    /** @inheritdoc */
859
    public function onErrorOccurred(Closure $handler): ListenerHandler
860
    {
861
        return $this->handler->onErrorOccurred($handler);
×
862
    }
863

864
    /** @inheritdoc */
865
    public function onAuthenticationFailed(Closure $handler): ListenerHandler
866
    {
867
        return $this->handler->onAuthenticationFailed($handler);
×
868
    }
869

870
    public function detach(ListenerHandler $handler): void
871
    {
872
        $this->handler->detach($handler);
10✔
873
    }
874

875
    private function enqueueOperation(ClientOperation $operation): void
876
    {
877
        if ($this->handler->totalOperationCount() >= $this->settings->maxQueueSize()) {
395✔
878
            throw MaxQueueSizeLimitReached::with($this->connectionName, $this->settings->maxQueueSize());
×
879
        }
880

881
        $this->handler->enqueueMessage(new StartOperationMessage(
395✔
882
            $operation,
395✔
883
            $this->settings->maxRetries(),
395✔
884
            $this->settings->operationTimeout()
395✔
885
        ));
395✔
886
    }
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc