• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

api-platform / core / 7142557150

08 Dec 2023 02:28PM UTC coverage: 36.003% (-1.4%) from 37.36%
7142557150

push

github

web-flow
fix(jsonld): remove link to ApiDocumentation when doc is disabled (#6029)

0 of 1 new or added line in 1 file covered. (0.0%)

2297 existing lines in 182 files now uncovered.

9992 of 27753 relevant lines covered (36.0%)

147.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.07
/src/Doctrine/EventListener/PublishMercureUpdatesListener.php
1
<?php
2

3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <dunglas@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace ApiPlatform\Doctrine\EventListener;
15

16
use ApiPlatform\Api\IriConverterInterface as LegacyIriConverterInterface;
17
use ApiPlatform\Api\ResourceClassResolverInterface as LegacyResourceClassResolverInterface;
18
use ApiPlatform\Exception\InvalidArgumentException;
19
use ApiPlatform\Exception\OperationNotFoundException;
20
use ApiPlatform\Exception\RuntimeException;
21
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
22
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
23
use ApiPlatform\Metadata\HttpOperation;
24
use ApiPlatform\Metadata\IriConverterInterface;
25
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
26
use ApiPlatform\Metadata\ResourceClassResolverInterface;
27
use ApiPlatform\Metadata\UrlGeneratorInterface;
28
use ApiPlatform\Metadata\Util\ResourceClassInfoTrait;
29
use ApiPlatform\Symfony\Messenger\DispatchTrait;
30
use Doctrine\Common\EventArgs;
31
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
32
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
33
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
34
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
35
use Symfony\Component\HttpFoundation\JsonResponse;
36
use Symfony\Component\Mercure\HubRegistry;
37
use Symfony\Component\Mercure\Update;
38
use Symfony\Component\Messenger\MessageBusInterface;
39
use Symfony\Component\Serializer\SerializerInterface;
40

41
/**
42
 * Publishes resources updates to the Mercure hub.
43
 *
44
 * @author Kévin Dunglas <dunglas@gmail.com>
45
 */
46
final class PublishMercureUpdatesListener
47
{
48
    use DispatchTrait;
49
    use ResourceClassInfoTrait;
50
    private const ALLOWED_KEYS = [
51
        'topics' => true,
52
        'data' => true,
53
        'private' => true,
54
        'id' => true,
55
        'type' => true,
56
        'retry' => true,
57
        'normalization_context' => true,
58
        'hub' => true,
59
        'enable_async_update' => true,
60
    ];
61
    private readonly ?ExpressionLanguage $expressionLanguage;
62
    private \SplObjectStorage $createdObjects;
63
    private \SplObjectStorage $updatedObjects;
64
    private \SplObjectStorage $deletedObjects;
65

66
    /**
67
     * @param array<string, string[]|string> $formats
68
     */
69
    public function __construct(LegacyResourceClassResolverInterface|ResourceClassResolverInterface $resourceClassResolver, private readonly LegacyIriConverterInterface|IriConverterInterface $iriConverter, ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory, private readonly SerializerInterface $serializer, private readonly array $formats, MessageBusInterface $messageBus = null, private readonly ?HubRegistry $hubRegistry = null, private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null, private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null, ExpressionLanguage $expressionLanguage = null, private bool $includeType = false)
70
    {
71
        if (null === $messageBus && null === $hubRegistry) {
1,287✔
72
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
×
73
        }
74

75
        $this->resourceClassResolver = $resourceClassResolver;
1,287✔
76

77
        $this->resourceMetadataFactory = $resourceMetadataFactory;
1,287✔
78
        $this->messageBus = $messageBus;
1,287✔
79
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
1,287✔
80
        $this->reset();
1,287✔
81

82
        if ($this->expressionLanguage) {
1,287✔
83
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
1,287✔
84
            $this->expressionLanguage->addFunction($rawurlencode);
1,287✔
85

86
            $this->expressionLanguage->addFunction(
1,287✔
87
                new ExpressionFunction('iri', static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => sprintf('iri(%s, %d)', $apiResource, $referenceType), static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => $iriConverter->getIriFromResource($apiResource, $referenceType))
1,287✔
88
            );
1,287✔
89
        }
90

91
        if (false === $this->includeType) {
1,287✔
92
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
×
93
        }
94
    }
95

96
    /**
97
     * Collects created, updated and deleted objects.
98
     */
99
    public function onFlush(EventArgs $eventArgs): void
100
    {
101
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
1,287✔
102
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
1,287✔
103
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
×
104
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
×
105
        } else {
106
            return;
×
107
        }
108

109
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
1,287✔
110
        foreach ($uow->{$methodName}() as $object) {
1,287✔
111
            $this->storeObjectToPublish($object, 'createdObjects');
1,137✔
112
        }
113

114
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
1,287✔
115
        foreach ($uow->{$methodName}() as $object) {
1,287✔
116
            $this->storeObjectToPublish($object, 'updatedObjects');
168✔
117
        }
118

119
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
1,287✔
120
        foreach ($uow->{$methodName}() as $object) {
1,287✔
121
            $this->storeObjectToPublish($object, 'deletedObjects');
42✔
122
        }
123
    }
124

125
    /**
126
     * Publishes updates for changes collected on flush, and resets the store.
127
     */
128
    public function postFlush(): void
129
    {
130
        try {
131
            foreach ($this->createdObjects as $object) {
1,287✔
132
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
12✔
133
            }
134

135
            foreach ($this->updatedObjects as $object) {
1,287✔
136
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
3✔
137
            }
138

139
            foreach ($this->deletedObjects as $object) {
1,287✔
UNCOV
140
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
×
141
            }
142
        } finally {
143
            $this->reset();
1,287✔
144
        }
145
    }
146

147
    private function reset(): void
148
    {
149
        $this->createdObjects = new \SplObjectStorage();
1,287✔
150
        $this->updatedObjects = new \SplObjectStorage();
1,287✔
151
        $this->deletedObjects = new \SplObjectStorage();
1,287✔
152
    }
153

154
    private function storeObjectToPublish(object $object, string $property): void
155
    {
156
        if (null === $resourceClass = $this->getResourceClass($object)) {
1,269✔
157
            return;
42✔
158
        }
159

160
        $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
1,248✔
161
        try {
162
            $options = $operation->getMercure() ?? false;
1,248✔
163
        } catch (OperationNotFoundException) {
×
164
            return;
×
165
        }
166

167
        if (\is_string($options)) {
1,248✔
168
            if (null === $this->expressionLanguage) {
×
169
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
×
170
            }
171

172
            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
×
173
        }
174

175
        if (false === $options) {
1,248✔
176
            return;
1,242✔
177
        }
178

179
        if (true === $options) {
15✔
180
            $options = [];
12✔
181
        }
182

183
        if (!\is_array($options)) {
15✔
184
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
×
185
        }
186

187
        foreach ($options as $key => $value) {
15✔
188
            if (!isset(self::ALLOWED_KEYS[$key])) {
3✔
189
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', self::ALLOWED_KEYS)));
×
190
            }
191
        }
192

193
        $options['enable_async_update'] ??= true;
15✔
194

195
        if ('deletedObjects' === $property) {
15✔
UNCOV
196
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
×
UNCOV
197
            if (null === $types) {
×
UNCOV
198
                $types = [$operation->getShortName()];
×
199
            }
200

201
            // We need to evaluate it here, because in publishUpdate() the resource would be already deleted
UNCOV
202
            $this->evaluateTopics($options, $object);
×
203

UNCOV
204
            $this->deletedObjects[(object) [
×
UNCOV
205
                'id' => $this->iriConverter->getIriFromResource($object),
×
UNCOV
206
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
×
UNCOV
207
                'type' => 1 === \count($types) ? $types[0] : $types,
×
UNCOV
208
            ]] = $options;
×
209

UNCOV
210
            return;
×
211
        }
212

213
        $this->{$property}[$object] = $options;
15✔
214
    }
215

216
    private function publishUpdate(object $object, array $options, string $type): void
217
    {
218
        if ($object instanceof \stdClass) {
15✔
219
            // By convention, if the object has been deleted, we send only its IRI and its type.
220
            // This may change in the feature, because it's not JSON Merge Patch compliant,
221
            // and I'm not a fond of this approach.
UNCOV
222
            $iri = $options['topics'] ?? $object->iri;
×
223
            /** @var string $data */
UNCOV
224
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
×
225
        } else {
226
            $resourceClass = $this->getObjectClass($object);
15✔
227
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];
15✔
228

229
            // We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
230
            $this->evaluateTopics($options, $object);
15✔
231

232
            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
15✔
233
            $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
15✔
234
        }
235

236
        $updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));
15✔
237

238
        foreach ($updates as $update) {
15✔
239
            if ($options['enable_async_update'] && $this->messageBus) {
15✔
240
                $this->dispatch($update);
15✔
241
                continue;
15✔
242
            }
243

UNCOV
244
            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
×
245
        }
246
    }
247

248
    private function evaluateTopics(array &$options, object $object): void
249
    {
250
        if (!($options['topics'] ?? false)) {
15✔
251
            return;
12✔
252
        }
253

254
        $topics = [];
3✔
255
        foreach ((array) $options['topics'] as $topic) {
3✔
256
            if (!\is_string($topic)) {
3✔
257
                $topics[] = $topic;
×
258
                continue;
×
259
            }
260

261
            if (!str_starts_with($topic, '@=')) {
3✔
UNCOV
262
                $topics[] = $topic;
×
UNCOV
263
                continue;
×
264
            }
265

266
            if (null === $this->expressionLanguage) {
3✔
267
                throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
×
268
            }
269

270
            $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
3✔
271
        }
272

273
        $options['topics'] = $topics;
3✔
274
    }
275

276
    /**
277
     * @return Update[]
278
     */
279
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
280
    {
281
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
15✔
282
            return [];
12✔
283
        }
284

285
        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);
3✔
286

287
        $updates = [];
3✔
288
        foreach ($payloads as [$subscriptionId, $data]) {
3✔
289
            $updates[] = $this->buildUpdate(
3✔
290
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
3✔
291
                (string) (new JsonResponse($data))->getContent(),
3✔
292
                $options
3✔
293
            );
3✔
294
        }
295

296
        return $updates;
3✔
297
    }
298

299
    /**
300
     * @param string|string[] $iri
301
     */
302
    private function buildUpdate(string|array $iri, string $data, array $options): Update
303
    {
304
        return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
15✔
305
    }
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc