• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

api-platform / core / 9710836697

28 Jun 2024 09:35AM UTC coverage: 63.285% (+1.2%) from 62.122%
9710836697

push

github

soyuka
docs: changelog v3.3.7

11104 of 17546 relevant lines covered (63.29%)

52.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.21
/src/Doctrine/EventListener/PublishMercureUpdatesListener.php
1
<?php
2

3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <dunglas@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace ApiPlatform\Doctrine\EventListener;
15

16
use ApiPlatform\Api\IriConverterInterface as LegacyIriConverterInterface;
17
use ApiPlatform\Api\ResourceClassResolverInterface as LegacyResourceClassResolverInterface;
18
use ApiPlatform\Exception\InvalidArgumentException;
19
use ApiPlatform\Exception\OperationNotFoundException;
20
use ApiPlatform\Exception\RuntimeException;
21
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
22
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
23
use ApiPlatform\Metadata\HttpOperation;
24
use ApiPlatform\Metadata\IriConverterInterface;
25
use ApiPlatform\Metadata\Operation;
26
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
27
use ApiPlatform\Metadata\ResourceClassResolverInterface;
28
use ApiPlatform\Metadata\UrlGeneratorInterface;
29
use ApiPlatform\Metadata\Util\ResourceClassInfoTrait;
30
use ApiPlatform\Symfony\Messenger\DispatchTrait;
31
use Doctrine\Common\EventArgs;
32
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
33
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
34
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
35
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
36
use Symfony\Component\HttpFoundation\JsonResponse;
37
use Symfony\Component\Mercure\HubRegistry;
38
use Symfony\Component\Mercure\Update;
39
use Symfony\Component\Messenger\MessageBusInterface;
40
use Symfony\Component\Serializer\SerializerInterface;
41

42
/**
43
 * Publishes resource updates to the Mercure hub.
44
 *
45
 * @author Kévin Dunglas <dunglas@gmail.com>
46
 */
47
final class PublishMercureUpdatesListener
{
    use DispatchTrait;
    use ResourceClassInfoTrait;

    /**
     * Option names accepted in the "mercure" attribute of a resource.
     *
     * The names are stored as keys so that membership can be checked with isset().
     */
    private const ALLOWED_KEYS = [
        'topics' => true,
        'data' => true,
        'private' => true,
        'id' => true,
        'type' => true,
        'retry' => true,
        'normalization_context' => true,
        'hub' => true,
        'enable_async_update' => true,
    ];

    private readonly ?ExpressionLanguage $expressionLanguage;

    /**
     * Objects scheduled for insertion, each mapped to its Mercure options.
     */
    private \SplObjectStorage $createdObjects;

    /**
     * Objects scheduled for update, each mapped to its Mercure options.
     */
    private \SplObjectStorage $updatedObjects;

    /**
     * Snapshots (stdClass with "id", "iri" and "type") of objects scheduled for deletion,
     * each mapped to its Mercure options.
     */
    private \SplObjectStorage $deletedObjects;

    /**
     * Wires the collaborators and registers the "escape", "get_operation" and "iri"
     * functions on the expression language when one is available.
     *
     * @param array<string, string[]|string> $formats
     *
     * @throws InvalidArgumentException when neither a message bus nor a hub registry is provided
     */
    public function __construct(
        LegacyResourceClassResolverInterface|ResourceClassResolverInterface $resourceClassResolver,
        private readonly LegacyIriConverterInterface|IriConverterInterface $iriConverter,
        ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory,
        private readonly SerializerInterface $serializer,
        private readonly array $formats,
        ?MessageBusInterface $messageBus = null,
        private readonly ?HubRegistry $hubRegistry = null,
        private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null,
        private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null,
        ?ExpressionLanguage $expressionLanguage = null,
        private bool $includeType = false,
    ) {
        if (null === $messageBus && null === $hubRegistry) {
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
        }

        $this->resourceClassResolver = $resourceClassResolver;

        $this->resourceMetadataFactory = $resourceMetadataFactory;
        $this->messageBus = $messageBus;
        // Fall back to a fresh ExpressionLanguage instance when the component is installed but none was injected.
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
        $this->reset();

        if ($this->expressionLanguage) {
            // Expose PHP's rawurlencode() to expressions under the name "escape".
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
            $this->expressionLanguage->addFunction($rawurlencode);

            $this->expressionLanguage->addFunction(
                new ExpressionFunction(
                    'get_operation',
                    static fn (string $apiResource, string $name): string => sprintf('getOperation(%s, %s)', $apiResource, $name),
                    static fn (array $arguments, $apiResource, string $name): Operation => $resourceMetadataFactory->create($resourceClassResolver->getResourceClass($apiResource))->getOperation($name)
                )
            );
            $this->expressionLanguage->addFunction(
                new ExpressionFunction(
                    'iri',
                    static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, ?string $operation = null): string => sprintf('iri(%s, %d, %s)', $apiResource, $referenceType, $operation),
                    static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, $operation = null): string => $iriConverter->getIriFromResource($apiResource, $referenceType, $operation)
                )
            );
        }

        if (false === $this->includeType) {
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
        }
    }

    /**
     * Collects created, updated and deleted objects.
     *
     * Only Doctrine ORM and MongoDB ODM flush events are handled; any other event is ignored.
     */
    public function onFlush(EventArgs $eventArgs): void
    {
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
            // @phpstan-ignore-next-line
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
        } else {
            return;
        }

        // The unit of work accessors are named differently for the ORM ("Entity") and the ODM ("Document").
        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'createdObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'updatedObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'deletedObjects');
        }
    }

    /**
     * Publishes updates for changes collected on flush, and resets the store.
     *
     * The store is reset even when publishing throws, so a failed flush does not leak
     * pending objects into the next one.
     */
    public function postFlush(): void
    {
        try {
            foreach ($this->createdObjects as $object) {
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
            }

            foreach ($this->updatedObjects as $object) {
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
            }

            foreach ($this->deletedObjects as $object) {
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
            }
        } finally {
            $this->reset();
        }
    }

    /**
     * Replaces the three pending-object stores with empty ones.
     */
    private function reset(): void
    {
        $this->createdObjects = new \SplObjectStorage();
        $this->updatedObjects = new \SplObjectStorage();
        $this->deletedObjects = new \SplObjectStorage();
    }

    /**
     * Validates the Mercure options of the given object and queues it in the given store.
     *
     * Objects that are not API resources, whose operation cannot be found, or whose
     * "mercure" attribute resolves to false are skipped.
     *
     * @param string $property name of the target store: "createdObjects", "updatedObjects" or "deletedObjects"
     *
     * @throws RuntimeException         when the "mercure" attribute is an expression and the Expression Language component is missing
     * @throws InvalidArgumentException when the "mercure" attribute has an unsupported type or contains an unknown option
     */
    private function storeObjectToPublish(object $object, string $property): void
    {
        if (null === $resourceClass = $this->getResourceClass($object)) {
            return;
        }

        try {
            // The operation lookup is performed inside the try block so that an
            // OperationNotFoundException raised while resolving it is handled as well.
            $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
            $options = $operation->getMercure() ?? false;
        } catch (OperationNotFoundException) {
            return;
        }

        // A string value is an expression that must evaluate to the actual options.
        if (\is_string($options)) {
            if (null === $this->expressionLanguage) {
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
            }

            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
        }

        if (false === $options) {
            return;
        }

        if (true === $options) {
            $options = [];
        }

        if (!\is_array($options)) {
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
        }

        foreach ($options as $key => $value) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                // ALLOWED_KEYS is a name => true map: list its keys, not its values.
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
            }
        }

        $options['enable_async_update'] ??= true;

        if ('deletedObjects' === $property) {
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
            if (null === $types) {
                $types = [$operation->getShortName()];
            }

            // We need to evaluate it here, because in publishUpdate() the resource would be already deleted
            $this->evaluateTopics($options, $object);

            // Store a snapshot of the identifiers instead of the object itself.
            $this->deletedObjects[(object) [
                'id' => $this->iriConverter->getIriFromResource($object),
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
                'type' => 1 === \count($types) ? $types[0] : $types,
            ]] = $options;

            return;
        }

        $this->{$property}[$object] = $options;
    }

    /**
     * Builds the Mercure update(s) for one queued object and sends them.
     *
     * Updates are dispatched on the message bus when asynchronous updates are enabled and
     * a bus is available; otherwise they are published directly on the hub.
     *
     * @param object $object  the resource, or the stdClass snapshot stored for a deleted resource
     * @param array  $options the validated Mercure options stored alongside the object
     * @param string $type    "create", "update" or "delete"
     *
     * @throws RuntimeException when a synchronous publication is required but no hub registry was provided
     */
    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) {
            // By convention, if the object has been deleted, we send only its IRI and its type.
            // This may change in the future, because it's not JSON Merge Patch compliant,
            // and I'm not fond of this approach.
            $iri = $options['topics'] ?? $object->iri;
            /** @var string $data */
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
        } else {
            $resourceClass = $this->getObjectClass($object);
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];

            // We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
            $this->evaluateTopics($options, $object);

            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
            // The first configured format is used for serialization.
            $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
        }

        $updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));

        foreach ($updates as $update) {
            if ($options['enable_async_update'] && $this->messageBus) {
                $this->dispatch($update);
                continue;
            }

            // The constructor only guarantees that a bus OR a registry exists, so the registry
            // may be missing when asynchronous updates are disabled for this resource.
            if (null === $this->hubRegistry) {
                throw new RuntimeException('The Mercure update cannot be published synchronously because no hub registry was provided. Inject a hub registry or leave the "enable_async_update" option enabled.');
            }

            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
        }
    }

    /**
     * Resolves the "topics" option in place, evaluating every string topic prefixed with "@=" as an expression.
     *
     * Topics that are not strings, or that do not carry the prefix, are kept untouched.
     * The option is always normalized to an array when it is set and not empty.
     *
     * @throws \LogicException when an expression topic is used without the Expression Language component
     */
    private function evaluateTopics(array &$options, object $object): void
    {
        if (!($options['topics'] ?? false)) {
            return;
        }

        $topics = [];
        foreach ((array) $options['topics'] as $topic) {
            if (!\is_string($topic) || !str_starts_with($topic, '@=')) {
                $topics[] = $topic;
                continue;
            }

            if (null === $this->expressionLanguage) {
                throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
            }

            // Strip the "@=" marker before evaluating the expression.
            $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
        }

        $options['topics'] = $topics;
    }

    /**
     * Builds one update per GraphQL subscription payload of the given object.
     *
     * Nothing is returned unless the change is an "update" and both the subscription
     * manager and the subscription IRI generator are available.
     *
     * @return Update[]
     */
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
    {
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
            return [];
        }

        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);

        $updates = [];
        foreach ($payloads as [$subscriptionId, $data]) {
            $updates[] = $this->buildUpdate(
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
                (string) (new JsonResponse($data))->getContent(),
                $options
            );
        }

        return $updates;
    }

    /**
     * Creates a Mercure update from the given topic(s), payload and the "private", "id", "type" and "retry" options.
     *
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc