• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.19
/src/ClientOperations/UpdatePersistentSubscriptionOperation.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\ClientOperations;
15

16
use Amp\DeferredFuture;
17
use Google\Protobuf\Internal\Message;
18
use Prooph\EventStore\Common\SystemConsumerStrategy;
19
use Prooph\EventStore\Exception\AccessDenied;
20
use Prooph\EventStore\Exception\InvalidOperationException;
21
use Prooph\EventStore\Exception\UnexpectedOperationResult;
22
use Prooph\EventStore\PersistentSubscriptionSettings;
23
use Prooph\EventStore\PersistentSubscriptionUpdateResult;
24
use Prooph\EventStore\PersistentSubscriptionUpdateStatus;
25
use Prooph\EventStore\UserCredentials;
26
use Prooph\EventStoreClient\Messages\ClientMessages\UpdatePersistentSubscription;
27
use Prooph\EventStoreClient\Messages\ClientMessages\UpdatePersistentSubscriptionCompleted;
28
use Prooph\EventStoreClient\Messages\ClientMessages\UpdatePersistentSubscriptionCompleted\UpdatePersistentSubscriptionResult;
29
use Prooph\EventStoreClient\SystemData\InspectionDecision;
30
use Prooph\EventStoreClient\SystemData\InspectionResult;
31
use Prooph\EventStoreClient\SystemData\TcpCommand;
32
use Psr\Log\LoggerInterface as Logger;
33

34
/**
 * Operation that builds an UpdatePersistentSubscription request for a
 * subscription group on a stream and inspects the matching
 * UpdatePersistentSubscriptionCompleted response.
 *
 * @internal
 * @extends AbstractOperation<UpdatePersistentSubscriptionCompleted, PersistentSubscriptionUpdateResult>
 */
class UpdatePersistentSubscriptionOperation extends AbstractOperation
{
    /**
     * @param Logger $logger Forwarded to the parent operation.
     * @param DeferredFuture $deferred Forwarded to the parent operation.
     * @param string $stream Stream id written into the request.
     * @param string $groupName Subscription group name written into the request.
     * @param PersistentSubscriptionSettings $settings Source of every setting copied into the request.
     * @param UserCredentials|null $userCredentials Forwarded to the parent operation.
     */
    public function __construct(
        Logger $logger,
        DeferredFuture $deferred,
        private readonly string $stream,
        private readonly string $groupName,
        private readonly PersistentSubscriptionSettings $settings,
        ?UserCredentials $userCredentials
    ) {
        // Request/response commands and the response class are fixed for this operation.
        parent::__construct(
            $logger,
            $deferred,
            $userCredentials,
            TcpCommand::UpdatePersistentSubscription,
            TcpCommand::UpdatePersistentSubscriptionCompleted,
            UpdatePersistentSubscriptionCompleted::class
        );
    }

    /**
     * Builds the request message from the group name, the stream id and the settings.
     */
    protected function createRequestDto(): Message
    {
        $settings = $this->settings;
        $strategy = $settings->namedConsumerStrategy();

        $dto = new UpdatePersistentSubscription();

        // Identity of the subscription being updated.
        $dto->setSubscriptionGroupName($this->groupName);
        $dto->setEventStreamId($this->stream);

        // Reading and buffering behaviour.
        $dto->setResolveLinkTos($settings->resolveLinkTos());
        $dto->setStartFrom($settings->startFrom());
        $dto->setMessageTimeoutMilliseconds($settings->messageTimeoutMilliseconds());
        $dto->setRecordStatistics($settings->extraStatistics());
        $dto->setLiveBufferSize($settings->liveBufferSize());
        $dto->setReadBatchSize($settings->readBatchSize());
        $dto->setBufferSize($settings->bufferSize());
        $dto->setMaxRetryCount($settings->maxRetryCount());

        // The strategy is sent twice: as a round-robin flag and by name.
        $dto->setPreferRoundRobin($strategy === SystemConsumerStrategy::RoundRobin);

        // Checkpointing and subscriber limits.
        $dto->setCheckpointAfterTime($settings->checkPointAfterMilliseconds());
        $dto->setCheckpointMaxCount($settings->maxCheckPointCount());
        $dto->setCheckpointMinCount($settings->minCheckPointCount());
        $dto->setSubscriberMaxCount($settings->maxSubscriberCount());
        $dto->setNamedConsumerStrategy($strategy->name);

        return $dto;
    }

    /**
     * Completes or fails the operation according to the result carried by the response.
     *
     * Every handled result ends the operation; the result name is used as the
     * description of the returned inspection result.
     *
     * @param UpdatePersistentSubscriptionCompleted $response
     *
     * @throws UnexpectedOperationResult when the result is none of the handled values
     */
    protected function inspectResponse(Message $response): InspectionResult
    {
        switch ($response->getResult()) {
            case UpdatePersistentSubscriptionResult::Success:
                $this->succeed($response);
                $description = 'Success';

                break;
            case UpdatePersistentSubscriptionResult::Fail:
                $failure = new InvalidOperationException(\sprintf(
                    'Subscription group \'%s\' on stream \'%s\' failed \'%s\'',
                    $this->groupName,
                    $this->stream,
                    $response->getReason()
                ));
                $this->fail($failure);
                $description = 'Fail';

                break;
            case UpdatePersistentSubscriptionResult::AccessDenied:
                $denied = AccessDenied::toStream($this->stream);
                $this->fail($denied);
                $description = 'AccessDenied';

                break;
            case UpdatePersistentSubscriptionResult::DoesNotExist:
                $missing = new InvalidOperationException(\sprintf(
                    'Subscription group \'%s\' on stream \'%s\' does not exist',
                    $this->groupName,
                    $this->stream
                ));
                $this->fail($missing);
                $description = 'DoesNotExist';

                break;
            default:
                throw new UnexpectedOperationResult();
        }

        return new InspectionResult(InspectionDecision::EndOperation, $description);
    }

    /**
     * Produces the update result; the response content is not read here and
     * the status is always Success.
     *
     * @param UpdatePersistentSubscriptionCompleted $response
     */
    protected function transformResponse(Message $response): PersistentSubscriptionUpdateResult
    {
        $status = PersistentSubscriptionUpdateStatus::Success;

        return new PersistentSubscriptionUpdateResult($status);
    }

    /**
     * Fixed name of this operation.
     */
    public function name(): string
    {
        return 'UpdatePersistentSubscription';
    }

    /**
     * Human-readable summary containing the stream id and the group name.
     */
    public function __toString(): string
    {
        return \sprintf(
            'Stream: %s, Group Name: %s',
            $this->stream,
            $this->groupName
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc