• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

api-platform / core / 6416706730

05 Oct 2023 08:40AM UTC coverage: 58.801% (+0.007%) from 58.794%
6416706730

push

github

web-flow
Merge pull request #5817 from kamilrzany/patch-2

Update PublishMercureUpdatesListener.php

16 of 16 new or added lines in 1 file covered. (100.0%)

10864 of 18476 relevant lines covered (58.8%)

9.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.98
/src/Doctrine/EventListener/PublishMercureUpdatesListener.php
1
<?php
2

3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <dunglas@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace ApiPlatform\Doctrine\EventListener;
15

16
use ApiPlatform\Api\IriConverterInterface;
17
use ApiPlatform\Api\ResourceClassResolverInterface;
18
use ApiPlatform\Api\UrlGeneratorInterface;
19
use ApiPlatform\Exception\InvalidArgumentException;
20
use ApiPlatform\Exception\OperationNotFoundException;
21
use ApiPlatform\Exception\RuntimeException;
22
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
23
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
24
use ApiPlatform\Metadata\HttpOperation;
25
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
26
use ApiPlatform\Metadata\Util\ResourceClassInfoTrait;
27
use ApiPlatform\Symfony\Messenger\DispatchTrait;
28
use Doctrine\Common\EventArgs;
29
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
30
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
31
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
32
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
33
use Symfony\Component\HttpFoundation\JsonResponse;
34
use Symfony\Component\Mercure\HubRegistry;
35
use Symfony\Component\Mercure\Update;
36
use Symfony\Component\Messenger\MessageBusInterface;
37
use Symfony\Component\Serializer\SerializerInterface;
38

39
/**
40
 * Publishes resources updates to the Mercure hub.
41
 *
42
 * @author Kévin Dunglas <dunglas@gmail.com>
43
 */
44
final class PublishMercureUpdatesListener
45
{
46
    use DispatchTrait;
47
    use ResourceClassInfoTrait;
48
    private const ALLOWED_KEYS = [
49
        'topics' => true,
50
        'data' => true,
51
        'private' => true,
52
        'id' => true,
53
        'type' => true,
54
        'retry' => true,
55
        'normalization_context' => true,
56
        'hub' => true,
57
        'enable_async_update' => true,
58
    ];
59
    private readonly ?ExpressionLanguage $expressionLanguage;
60
    private \SplObjectStorage $createdObjects;
61
    private \SplObjectStorage $updatedObjects;
62
    private \SplObjectStorage $deletedObjects;
63

64
    /**
65
     * @param array<string, string[]|string> $formats
66
     */
67
    public function __construct(ResourceClassResolverInterface $resourceClassResolver, private readonly IriConverterInterface $iriConverter, ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory, private readonly SerializerInterface $serializer, private readonly array $formats, MessageBusInterface $messageBus = null, private readonly ?HubRegistry $hubRegistry = null, private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null, private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null, ExpressionLanguage $expressionLanguage = null, private bool $includeType = false)
68
    {
69
        if (null === $messageBus && null === $hubRegistry) {
16✔
70
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
×
71
        }
72

73
        $this->resourceClassResolver = $resourceClassResolver;
16✔
74

75
        $this->resourceMetadataFactory = $resourceMetadataFactory;
16✔
76
        $this->messageBus = $messageBus;
16✔
77
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
16✔
78
        $this->reset();
16✔
79

80
        if ($this->expressionLanguage) {
16✔
81
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
16✔
82
            $this->expressionLanguage->addFunction($rawurlencode);
16✔
83

84
            $this->expressionLanguage->addFunction(
16✔
85
                new ExpressionFunction('iri', static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => sprintf('iri(%s, %d)', $apiResource, $referenceType), static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => $iriConverter->getIriFromResource($apiResource, $referenceType))
16✔
86
            );
16✔
87
        }
88

89
        if (false === $this->includeType) {
16✔
90
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
×
91
        }
92
    }
93

94
    /**
95
     * Collects created, updated and deleted objects.
96
     */
97
    public function onFlush(EventArgs $eventArgs): void
98
    {
99
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
16✔
100
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
16✔
101
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
×
102
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
×
103
        } else {
104
            return;
×
105
        }
106

107
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
16✔
108
        foreach ($uow->{$methodName}() as $object) {
16✔
109
            $this->storeObjectToPublish($object, 'createdObjects');
14✔
110
        }
111

112
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
16✔
113
        foreach ($uow->{$methodName}() as $object) {
16✔
114
            $this->storeObjectToPublish($object, 'updatedObjects');
4✔
115
        }
116

117
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
16✔
118
        foreach ($uow->{$methodName}() as $object) {
16✔
119
            $this->storeObjectToPublish($object, 'deletedObjects');
2✔
120
        }
121
    }
122

123
    /**
124
     * Publishes updates for changes collected on flush, and resets the store.
125
     */
126
    public function postFlush(): void
127
    {
128
        try {
129
            foreach ($this->createdObjects as $object) {
16✔
130
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
2✔
131
            }
132

133
            foreach ($this->updatedObjects as $object) {
16✔
134
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
4✔
135
            }
136

137
            foreach ($this->deletedObjects as $object) {
16✔
138
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
2✔
139
            }
140
        } finally {
141
            $this->reset();
16✔
142
        }
143
    }
144

145
    private function reset(): void
146
    {
147
        $this->createdObjects = new \SplObjectStorage();
16✔
148
        $this->updatedObjects = new \SplObjectStorage();
16✔
149
        $this->deletedObjects = new \SplObjectStorage();
16✔
150
    }
151

152
    private function storeObjectToPublish(object $object, string $property): void
153
    {
154
        if (null === $resourceClass = $this->getResourceClass($object)) {
16✔
155
            return;
2✔
156
        }
157

158
        $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
16✔
159
        try {
160
            $options = $operation->getMercure() ?? false;
16✔
161
        } catch (OperationNotFoundException) {
×
162
            return;
×
163
        }
164

165
        if (\is_string($options)) {
16✔
166
            if (null === $this->expressionLanguage) {
×
167
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
×
168
            }
169

170
            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
×
171
        }
172

173
        if (false === $options) {
16✔
174
            return;
14✔
175
        }
176

177
        if (true === $options) {
4✔
178
            $options = [];
×
179
        }
180

181
        if (!\is_array($options)) {
4✔
182
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
×
183
        }
184

185
        foreach ($options as $key => $value) {
4✔
186
            if (!isset(self::ALLOWED_KEYS[$key])) {
4✔
187
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', self::ALLOWED_KEYS)));
×
188
            }
189
        }
190

191
        $options['enable_async_update'] ??= true;
4✔
192

193
        if ('deletedObjects' === $property) {
4✔
194
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
2✔
195
            if (null === $types) {
2✔
196
                $types = [$operation->getShortName()];
2✔
197
            }
198

199
            // We need to evaluate it here, because in publishUpdate() the resource would be already deleted
200
            $this->evaluateTopics($options, $object);
2✔
201

202
            $this->deletedObjects[(object) [
2✔
203
                'id' => $this->iriConverter->getIriFromResource($object),
2✔
204
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
2✔
205
                'type' => 1 === \count($types) ? $types[0] : $types,
2✔
206
            ]] = $options;
2✔
207

208
            return;
2✔
209
        }
210

211
        $this->{$property}[$object] = $options;
4✔
212
    }
213

214
    private function publishUpdate(object $object, array $options, string $type): void
215
    {
216
        if ($object instanceof \stdClass) {
4✔
217
            // By convention, if the object has been deleted, we send only its IRI and its type.
218
            // This may change in the feature, because it's not JSON Merge Patch compliant,
219
            // and I'm not a fond of this approach.
220
            $iri = $options['topics'] ?? $object->iri;
2✔
221
            /** @var string $data */
222
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
2✔
223
        } else {
224
            $resourceClass = $this->getObjectClass($object);
4✔
225
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];
4✔
226

227
            // We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
228
            $this->evaluateTopics($options, $object);
4✔
229

230
            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
4✔
231
            $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
4✔
232
        }
233

234
        $updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));
4✔
235

236
        foreach ($updates as $update) {
4✔
237
            if ($options['enable_async_update'] && $this->messageBus) {
4✔
238
                $this->dispatch($update);
×
239
                continue;
×
240
            }
241

242
            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
4✔
243
        }
244
    }
245

246
    private function evaluateTopics(array &$options, object $object): void
247
    {
248
        if (!($options['topics'] ?? false)) {
4✔
249
            return;
4✔
250
        }
251

252
        $topics = [];
2✔
253
        foreach ((array) $options['topics'] as $topic) {
2✔
254
            if (!\is_string($topic)) {
2✔
255
                $topics[] = $topic;
×
256
                continue;
×
257
            }
258

259
            if (!str_starts_with($topic, '@=')) {
2✔
260
                $topics[] = $topic;
2✔
261
                continue;
2✔
262
            }
263

264
            if (null === $this->expressionLanguage) {
×
265
                throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
×
266
            }
267

268
            $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
×
269
        }
270

271
        $options['topics'] = $topics;
2✔
272
    }
273

274
    /**
275
     * @return Update[]
276
     */
277
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
278
    {
279
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
4✔
280
            return [];
2✔
281
        }
282

283
        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);
2✔
284

285
        $updates = [];
2✔
286
        foreach ($payloads as [$subscriptionId, $data]) {
2✔
287
            $updates[] = $this->buildUpdate(
2✔
288
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
2✔
289
                (string) (new JsonResponse($data))->getContent(),
2✔
290
                $options
2✔
291
            );
2✔
292
        }
293

294
        return $updates;
2✔
295
    }
296

297
    /**
298
     * @param string|string[] $iri
299
     */
300
    private function buildUpdate(string|array $iri, string $data, array $options): Update
301
    {
302
        return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
4✔
303
    }
304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc