• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

prooph / event-store-client / 9555551525

17 Jun 2024 10:16PM UTC coverage: 70.262% (-1.1%) from 71.395%
9555551525

push

github

prolic
update coveralls repo token

3466 of 4933 relevant lines covered (70.26%)

67.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.82
/src/Projections/ProjectionsClient.php
1
<?php
2

3
/**
4
 * This file is part of `prooph/event-store-client`.
5
 * (c) 2018-2024 Alexander Miertsch <kontakt@codeliner.ws>
6
 * (c) 2018-2024 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace Prooph\EventStoreClient\Projections;
15

16
use Amp\DeferredFuture;
17
use Amp\Http\Client\Response;
18
use Prooph\EventStore\EndPoint;
19
use Prooph\EventStore\Projections\ProjectionDetails;
20
use Prooph\EventStore\Projections\ProjectionStatistics;
21
use Prooph\EventStore\Projections\Query;
22
use Prooph\EventStore\Projections\State;
23
use Prooph\EventStore\Transport\Http\EndpointExtensions;
24
use Prooph\EventStore\Transport\Http\HttpStatusCode;
25
use Prooph\EventStore\UserCredentials;
26
use Prooph\EventStore\Util\Json;
27
use Prooph\EventStoreClient\Exception\ProjectionCommandConflict;
28
use Prooph\EventStoreClient\Exception\ProjectionCommandFailed;
29
use Prooph\EventStoreClient\Transport\Http\HttpClient;
30
use Throwable;
31
use UnexpectedValueException;
32

33
/**
 * HTTP client for the projection endpoints (`/projection/...` and
 * `/projections/...`) of an event store node.
 *
 * Every public method builds one URL, sends a single request through the
 * wrapped HttpClient and awaits the outcome before returning. A response whose
 * status differs from the expected one is raised as ProjectionCommandFailed
 * (or ProjectionCommandConflict for a POST answered with Conflict); errors
 * reported by the transport are re-thrown unchanged.
 *
 * All values substituted into a URL are passed through urlencode(), so
 * projection names and partitions may contain reserved URL characters.
 *
 * @internal
 */
class ProjectionsClient
{
    private readonly HttpClient $client;

    private readonly EndpointExtensions $httpSchema;

    /**
     * @param int  $operationTimeout      forwarded to the underlying HttpClient
     * @param bool $tlsTerminatedEndpoint forwarded to EndpointExtensions::useHttps() to pick the URL schema
     * @param bool $verifyPeer            forwarded to the underlying HttpClient
     */
    public function __construct(int $operationTimeout, bool $tlsTerminatedEndpoint, bool $verifyPeer)
    {
        $this->client = new HttpClient($operationTimeout, $verifyPeer);
        $this->httpSchema = EndpointExtensions::useHttps($tlsTerminatedEndpoint);
    }

    /**
     * Posts the `enable` command for the named projection.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Ok
     */
    public function enable(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendCommand($endPoint, $name, 'enable', $userCredentials);
    }

    /**
     * Posts the `disable` command for the named projection.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Ok
     */
    public function disable(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendCommand($endPoint, $name, 'disable', $userCredentials);
    }

    /**
     * Posts the `abort` command for the named projection.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Ok
     */
    public function abort(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendCommand($endPoint, $name, 'abort', $userCredentials);
    }

    /**
     * Posts `$query` to `/projections/onetime`; the server must answer Created.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Created
     */
    public function createOneTime(
        EndPoint $endPoint,
        string $query,
        string $type,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendPost(
            $this->projectionUrl($endPoint, '/projections/onetime?type=%s', $type),
            $query,
            $userCredentials,
            HttpStatusCode::Created
        );
    }

    /**
     * Posts `$query` to `/projections/transient` under the given name; the
     * server must answer Created.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Created
     */
    public function createTransient(
        EndPoint $endPoint,
        string $name,
        string $query,
        string $type,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendPost(
            $this->projectionUrl($endPoint, '/projections/transient?name=%s&type=%s', $name, $type),
            $query,
            $userCredentials,
            HttpStatusCode::Created
        );
    }

    /**
     * Posts `$query` to `/projections/continuous` under the given name, always
     * with `emit=1`; the server must answer Created.
     *
     * @param bool $trackEmittedStreams sent as `trackemittedstreams=0|1`
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Created
     */
    public function createContinuous(
        EndPoint $endPoint,
        string $name,
        string $query,
        bool $trackEmittedStreams,
        string $type,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendPost(
            $this->projectionUrl(
                $endPoint,
                '/projections/continuous?name=%s&type=%s&emit=1&trackemittedstreams=%d',
                $name,
                $type,
                (string) (int) $trackEmittedStreams
            ),
            $query,
            $userCredentials,
            HttpStatusCode::Created
        );
    }

    /**
     * Fetches `/projections/any` and maps every entry to ProjectionDetails.
     *
     * @return list<ProjectionDetails>
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function listAll(
        EndPoint $endPoint,
        ?UserCredentials $userCredentials = null
    ): array {
        return $this->listProjections($endPoint, '/projections/any', $userCredentials);
    }

    /**
     * Fetches `/projections/onetime` and maps every entry to ProjectionDetails.
     *
     * @return list<ProjectionDetails>
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function listOneTime(
        EndPoint $endPoint,
        ?UserCredentials $userCredentials = null
    ): array {
        return $this->listProjections($endPoint, '/projections/onetime', $userCredentials);
    }

    /**
     * Fetches `/projections/continuous` and maps every entry to ProjectionDetails.
     *
     * @return list<ProjectionDetails>
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function listContinuous(
        EndPoint $endPoint,
        ?UserCredentials $userCredentials = null
    ): array {
        return $this->listProjections($endPoint, '/projections/continuous', $userCredentials);
    }

    /**
     * Fetches `/projection/{name}` and builds ProjectionDetails from the decoded body.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getStatus(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): ProjectionDetails {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s', $name),
            $userCredentials
        );

        return $this->buildProjectionDetails(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/state` and wraps the decoded body in a State.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getState(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): State {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/state', $name),
            $userCredentials
        );

        return new State(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/state?partition={partition}` and wraps the
     * decoded body in a State.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getPartitionState(
        EndPoint $endPoint,
        string $name,
        string $partition,
        ?UserCredentials $userCredentials = null
    ): State {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/state?partition=%s', $name, $partition),
            $userCredentials
        );

        return new State(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/result` and wraps the decoded body in a State.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getResult(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): State {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/result', $name),
            $userCredentials
        );

        return new State(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/result?partition={partition}` and wraps the
     * decoded body in a State.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getPartitionResult(
        EndPoint $endPoint,
        string $name,
        string $partition,
        ?UserCredentials $userCredentials = null
    ): State {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/result?partition=%s', $name, $partition),
            $userCredentials
        );

        return new State(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/statistics` and builds ProjectionStatistics
     * from the `projections` entries of the decoded body.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getStatistics(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): ProjectionStatistics {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/statistics', $name),
            $userCredentials
        );

        return $this->buildProjectionStatistics(Json::decode($body));
    }

    /**
     * Fetches `/projection/{name}/query` and wraps the raw (undecoded) body in a Query.
     *
     * @throws UnexpectedValueException when the response body is empty
     * @throws ProjectionCommandFailed  when the server answers with another status than Ok
     */
    public function getQuery(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): Query {
        $body = $this->fetchBody(
            $this->projectionUrl($endPoint, '/projection/%s/query', $name),
            $userCredentials
        );

        return new Query($body);
    }

    /**
     * Puts `$query` to `/projection/{name}/query`.
     *
     * @param bool|null $emitEnabled when not null, appended as `?emit=0|1`;
     *                               when null, no `emit` parameter is sent
     *
     * @throws ProjectionCommandFailed when the server answers with another status than Ok
     */
    public function updateQuery(
        EndPoint $endPoint,
        string $name,
        string $query,
        ?bool $emitEnabled = null,
        ?UserCredentials $userCredentials = null
    ): void {
        $format = '/projection/%s/query';

        if (null !== $emitEnabled) {
            // The suffix is "0" or "1", so it is safe inside a sprintf format.
            $format .= '?emit=' . (int) $emitEnabled;
        }

        $this->sendPut(
            $this->projectionUrl($endPoint, $format, $name),
            $query,
            $userCredentials,
            HttpStatusCode::Ok
        );
    }

    /**
     * Posts the `reset` command for the named projection.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     * @throws ProjectionCommandFailed   when the server answers with any other status than Ok
     */
    public function reset(
        EndPoint $endPoint,
        string $name,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendCommand($endPoint, $name, 'reset', $userCredentials);
    }

    /**
     * Sends DELETE to `/projection/{name}`.
     *
     * @param bool $deleteEmittedStreams sent as `deleteEmittedStreams=0|1`
     *
     * @throws ProjectionCommandFailed when the server answers with another status than Ok
     */
    public function delete(
        EndPoint $endPoint,
        string $name,
        bool $deleteEmittedStreams,
        ?UserCredentials $userCredentials = null
    ): void {
        $this->sendDelete(
            $this->projectionUrl(
                $endPoint,
                '/projection/%s?deleteEmittedStreams=%d',
                $name,
                (string) (int) $deleteEmittedStreams
            ),
            $userCredentials,
            HttpStatusCode::Ok
        );
    }

    /**
     * Builds an absolute URL from a sprintf-style path, URL-encoding every
     * argument so reserved characters cannot alter the path or query string.
     */
    private function projectionUrl(EndPoint $endPoint, string $format, string ...$args): string
    {
        return EndpointExtensions::formatStringToHttpUrl(
            $endPoint,
            $this->httpSchema,
            $format,
            ...\array_map(\urlencode(...), $args)
        );
    }

    /**
     * Posts an empty body to `/projection/{name}/command/{command}` and expects Ok.
     */
    private function sendCommand(
        EndPoint $endPoint,
        string $name,
        string $command,
        ?UserCredentials $userCredentials
    ): void {
        $this->sendPost(
            $this->projectionUrl($endPoint, '/projection/%s/command/' . $command, $name),
            '',
            $userCredentials,
            HttpStatusCode::Ok
        );
    }

    /**
     * Shared implementation of the list* methods.
     *
     * @return list<ProjectionDetails> empty when the decoded body has no
     *                                 (or a null) `projections` entry
     *
     * @throws UnexpectedValueException when the response body is empty
     */
    private function listProjections(
        EndPoint $endPoint,
        string $path,
        ?UserCredentials $userCredentials
    ): array {
        $body = $this->sendGet(
            EndpointExtensions::rawUrlToHttpUrl($endPoint, $this->httpSchema, $path),
            $userCredentials,
            HttpStatusCode::Ok
        );

        if ('' === $body) {
            throw new UnexpectedValueException('Body cannot be empty');
        }

        // `??` also covers a missing key, which a direct read would report as a warning.
        $projections = Json::decode($body)['projections'] ?? null;

        if (null === $projections) {
            return [];
        }

        return \array_map($this->buildProjectionDetails(...), $projections);
    }

    /**
     * Performs a GET that must answer Ok and rejects an empty body.
     *
     * @throws UnexpectedValueException when the response body is empty
     */
    private function fetchBody(string $url, ?UserCredentials $userCredentials): string
    {
        $body = $this->sendGet($url, $userCredentials, HttpStatusCode::Ok);

        if ('' === $body) {
            throw new UnexpectedValueException('No content received');
        }

        return $body;
    }

    /**
     * Sends a GET request and awaits the buffered response body.
     *
     * @throws ProjectionCommandFailed when the status differs from $expectedCode
     */
    private function sendGet(
        string $url,
        ?UserCredentials $userCredentials,
        int $expectedCode
    ): string {
        $deferred = new DeferredFuture();

        $this->client->get(
            $url,
            $userCredentials,
            static function (Response $response) use ($deferred, $expectedCode, $url): void {
                if ($response->getStatus() === $expectedCode) {
                    $deferred->complete($response->getBody()->buffer());

                    return;
                }

                $deferred->error(self::commandFailed($response, 'GET', $url));
            },
            static function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        return $deferred->getFuture()->await();
    }

    /**
     * Sends a DELETE request and awaits its outcome.
     *
     * @throws ProjectionCommandFailed when the status differs from $expectedCode
     */
    private function sendDelete(
        string $url,
        ?UserCredentials $userCredentials,
        int $expectedCode
    ): void {
        $deferred = new DeferredFuture();

        $this->client->delete(
            $url,
            $userCredentials,
            static function (Response $response) use ($deferred, $expectedCode, $url): void {
                if ($response->getStatus() === $expectedCode) {
                    $deferred->complete();

                    return;
                }

                $deferred->error(self::commandFailed($response, 'DELETE', $url));
            },
            static function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        $deferred->getFuture()->await();
    }

    /**
     * Sends a PUT request with an `application/json` body and awaits its outcome.
     *
     * @throws ProjectionCommandFailed when the status differs from $expectedCode
     */
    private function sendPut(
        string $url,
        string $content,
        ?UserCredentials $userCredentials,
        int $expectedCode
    ): void {
        $deferred = new DeferredFuture();

        $this->client->put(
            $url,
            $content,
            'application/json',
            $userCredentials,
            static function (Response $response) use ($deferred, $expectedCode, $url): void {
                if ($response->getStatus() === $expectedCode) {
                    $deferred->complete();

                    return;
                }

                $deferred->error(self::commandFailed($response, 'PUT', $url));
            },
            static function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        $deferred->getFuture()->await();
    }

    /**
     * Sends a POST request with an `application/json` body and awaits its outcome.
     *
     * @throws ProjectionCommandConflict when the server answers with Conflict
     *                                   (and Conflict is not the expected status)
     * @throws ProjectionCommandFailed   when the status differs from $expectedCode otherwise
     */
    private function sendPost(
        string $url,
        string $content,
        ?UserCredentials $userCredentials,
        int $expectedCode
    ): void {
        $deferred = new DeferredFuture();

        $this->client->post(
            $url,
            $content,
            'application/json',
            $userCredentials,
            static function (Response $response) use ($deferred, $expectedCode, $url): void {
                $status = $response->getStatus();

                if ($status === $expectedCode) {
                    $deferred->complete();

                    return;
                }

                if ($status === HttpStatusCode::Conflict) {
                    $deferred->error(new ProjectionCommandConflict($status, $response->getReason()));

                    return;
                }

                $deferred->error(self::commandFailed($response, 'POST', $url));
            },
            static function (Throwable $exception) use ($deferred): void {
                $deferred->error($exception);
            }
        );

        $deferred->getFuture()->await();
    }

    /**
     * Builds the exception reported for a response with an unexpected status.
     *
     * @param string $method HTTP verb used in the message, e.g. `GET`
     */
    private static function commandFailed(Response $response, string $method, string $url): ProjectionCommandFailed
    {
        return new ProjectionCommandFailed(
            $response->getStatus(),
            \sprintf(
                'Server returned %d (%s) for %s on %s',
                $response->getStatus(),
                $response->getReason(),
                $method,
                $url
            )
        );
    }

    /**
     * Maps one decoded projection entry to a ProjectionDetails object.
     *
     * `stateReason`, `lastCheckpoint` and `checkpointStatus` may be absent and
     * are then passed as null; every other key is read unconditionally.
     */
    private function buildProjectionDetails(array $entry): ProjectionDetails
    {
        return new ProjectionDetails(
            $entry['coreProcessingTime'],
            $entry['version'],
            $entry['epoch'],
            $entry['effectiveName'],
            $entry['writesInProgress'],
            $entry['readsInProgress'],
            $entry['partitionsCached'],
            $entry['status'],
            $entry['stateReason'] ?? null,
            $entry['name'],
            $entry['mode'],
            $entry['position'],
            $entry['progress'],
            $entry['lastCheckpoint'] ?? null,
            $entry['eventsProcessedAfterRestart'],
            $entry['statusUrl'],
            $entry['stateUrl'],
            $entry['resultUrl'],
            $entry['queryUrl'],
            $entry['enableCommandUrl'],
            $entry['disableCommandUrl'],
            $entry['checkpointStatus'] ?? null,
            $entry['bufferedEvents'],
            $entry['writePendingEventsBeforeCheckpoint'],
            $entry['writePendingEventsAfterCheckpoint']
        );
    }

    /**
     * Maps the `projections` entries of a decoded statistics body to
     * ProjectionStatistics.
     */
    private function buildProjectionStatistics(array $entry): ProjectionStatistics
    {
        // array_values() keeps the result sequentially indexed regardless of the input keys.
        $projections = \array_values(
            \array_map($this->buildProjectionDetails(...), $entry['projections'])
        );

        return new ProjectionStatistics($projections);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc