• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

api-platform / core / 15904482964

26 Jun 2025 02:22PM UTC coverage: 21.957%. First build
15904482964

Pull #6904

github

web-flow
Merge cea37cac5 into 4ecde01e8
Pull Request #6904: feat(graphql): added support for graphql subscriptions to work for actions

55 of 252 new or added lines in 9 files covered. (21.83%)

11494 of 52347 relevant lines covered (21.96%)

21.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.66
/src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php
1
<?php
2

3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <dunglas@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace ApiPlatform\Symfony\Doctrine\EventListener;
15

16
use ApiPlatform\Doctrine\Common\Messenger\DispatchTrait;
17
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
18
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
19
use ApiPlatform\Metadata\Exception\InvalidArgumentException;
20
use ApiPlatform\Metadata\Exception\OperationNotFoundException;
21
use ApiPlatform\Metadata\Exception\RuntimeException;
22
use ApiPlatform\Metadata\HttpOperation;
23
use ApiPlatform\Metadata\IriConverterInterface;
24
use ApiPlatform\Metadata\Operation;
25
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
26
use ApiPlatform\Metadata\ResourceClassResolverInterface;
27
use ApiPlatform\Metadata\UrlGeneratorInterface;
28
use ApiPlatform\Metadata\Util\ResourceClassInfoTrait;
29
use Doctrine\Common\EventArgs;
30
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
31
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
32
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
33
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
34
use Symfony\Component\HttpFoundation\JsonResponse;
35
use Symfony\Component\Mercure\HubRegistry;
36
use Symfony\Component\Mercure\Update;
37
use Symfony\Component\Messenger\MessageBusInterface;
38
use Symfony\Component\Serializer\SerializerInterface;
39

40
/**
41
 * Publishes resources updates to the Mercure hub.
42
 *
43
 * @author Kévin Dunglas <dunglas@gmail.com>
44
 */
45
final class PublishMercureUpdatesListener
46
{
47
    use DispatchTrait;
48
    use ResourceClassInfoTrait;
49
    private const ALLOWED_KEYS = [
50
        'topics' => true,
51
        'data' => true,
52
        'private' => true,
53
        'private_fields' => true,
54
        'id' => true,
55
        'type' => true,
56
        'retry' => true,
57
        'normalization_context' => true,
58
        'hub' => true,
59
        'enable_async_update' => true,
60
    ];
61
    private readonly ?ExpressionLanguage $expressionLanguage;
62
    private \SplObjectStorage $createdObjects;
63
    private \SplObjectStorage $updatedObjects;
64
    private \SplObjectStorage $deletedObjects;
65

66
    /**
     * Wires the collaborators and, when an Expression Language instance is available, registers
     * the "escape", "get_operation" and "iri" expression functions on it.
     *
     * @param array<string, string[]|string> $formats
     *
     * @throws InvalidArgumentException when neither a message bus nor a hub registry is provided
     */
    public function __construct(
        ResourceClassResolverInterface $resourceClassResolver,
        private readonly IriConverterInterface $iriConverter,
        ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory,
        private readonly SerializerInterface $serializer,
        private readonly array $formats,
        ?MessageBusInterface $messageBus = null,
        private readonly ?HubRegistry $hubRegistry = null,
        private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null,
        private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null,
        ?ExpressionLanguage $expressionLanguage = null,
        private bool $includeType = false,
    ) {
        // At least one transport is required to send updates.
        if (null === $messageBus && null === $hubRegistry) {
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
        }

        $this->resourceClassResolver = $resourceClassResolver;
        $this->resourceMetadataFactory = $resourceMetadataFactory;
        $this->messageBus = $messageBus;
        // Fall back to a fresh instance only when the optional component is installed.
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
        $this->reset();

        if (null !== $this->expressionLanguage) {
            // Exposes PHP's rawurlencode() under the expression name "escape".
            $this->expressionLanguage->addFunction(ExpressionFunction::fromPhp('rawurlencode', 'escape'));

            $getOperationFunction = new ExpressionFunction(
                'get_operation',
                static fn (string $apiResource, string $name): string => \sprintf('getOperation(%s, %s)', $apiResource, $name),
                static fn (array $arguments, $apiResource, string $name): Operation => $resourceMetadataFactory->create($resourceClassResolver->getResourceClass($apiResource))->getOperation($name)
            );
            $this->expressionLanguage->addFunction($getOperationFunction);

            $iriFunction = new ExpressionFunction(
                'iri',
                static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, ?string $operation = null): string => \sprintf('iri(%s, %d, %s)', $apiResource, $referenceType, $operation),
                static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, $operation = null): string => $iriConverter->getIriFromResource($apiResource, $referenceType, $operation)
            );
            $this->expressionLanguage->addFunction($iriFunction);
        }

        if (!$this->includeType) {
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
        }
    }
98

99
    /**
     * Collects created, updated and deleted objects.
     *
     * Only Doctrine ORM and MongoDB ODM flush events are handled; any other event is ignored.
     */
    public function onFlush(EventArgs $eventArgs): void
    {
        $isOrm = $eventArgs instanceof OrmOnFlushEventArgs;

        if ($isOrm) {
            // @phpstan-ignore-next-line
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
        } else {
            return;
        }

        // Target store => unit-of-work getter, processed in insert, update, delete order.
        $scheduledGetters = $isOrm
            ? [
                'createdObjects' => 'getScheduledEntityInsertions',
                'updatedObjects' => 'getScheduledEntityUpdates',
                'deletedObjects' => 'getScheduledEntityDeletions',
            ]
            : [
                'createdObjects' => 'getScheduledDocumentInsertions',
                'updatedObjects' => 'getScheduledDocumentUpdates',
                'deletedObjects' => 'getScheduledDocumentDeletions',
            ];

        foreach ($scheduledGetters as $property => $getter) {
            foreach ($uow->{$getter}() as $object) {
                $this->storeObjectToPublish($object, $property);
            }
        }
    }
128

129
    /**
     * Publishes updates for changes collected on flush, and resets the store.
     *
     * Each store is iterated through a clone so that entries can be detached from the live
     * store while looping. The stores are reset even when publishing throws.
     */
    public function postFlush(): void
    {
        try {
            $createdSnapshot = clone $this->createdObjects;
            foreach ($createdSnapshot as $object) {
                if ($this->createdObjects->contains($object)) {
                    $this->createdObjects->detach($object);
                }
                $this->publishUpdate($object, $createdSnapshot[$object], 'create');
            }

            $updatedSnapshot = clone $this->updatedObjects;
            foreach ($updatedSnapshot as $object) {
                if ($this->updatedObjects->contains($object)) {
                    $this->updatedObjects->detach($object);
                }
                $this->publishUpdate($object, $updatedSnapshot[$object], 'update');
            }

            $deletedSnapshot = clone $this->deletedObjects;
            foreach ($deletedSnapshot as $object) {
                // Options are read from the snapshot only: reading them from the live store
                // before the contains() check was an unused, unguarded lookup.
                if ($this->deletedObjects->contains($object)) {
                    $this->deletedObjects->detach($object);
                }
                $this->publishUpdate($object, $deletedSnapshot[$object], 'delete');
            }
        } finally {
            $this->reset();
        }
    }
163

164
    /**
     * Replaces the three pending-object stores with fresh, empty storages.
     */
    private function reset(): void
    {
        foreach (['createdObjects', 'updatedObjects', 'deletedObjects'] as $store) {
            $this->{$store} = new \SplObjectStorage();
        }
    }
170

171
    /**
     * Resolves the Mercure options of an object scheduled for flush and queues it in the given store.
     *
     * Objects that are not API resources, have no operation, or have Mercure disabled are skipped.
     * Deleted objects are replaced by a \stdClass snapshot (id, iri, type, private data), because
     * the resource itself is gone by the time publishUpdate() runs.
     *
     * @param object $object   the object scheduled for insertion, update or deletion
     * @param string $property name of the target store: "createdObjects", "updatedObjects" or "deletedObjects"
     *
     * @throws RuntimeException         when the options are an expression but Expression Language is unavailable
     * @throws InvalidArgumentException when the options have an unsupported type or contain an unknown key
     */
    private function storeObjectToPublish(object $object, string $property): void
    {
        if (null === $resourceClass = $this->getResourceClass($object)) {
            return;
        }

        try {
            // The operation lookup must live inside the try block: it is the call that raises
            // OperationNotFoundException, which is meant to silently skip the object.
            $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
            $options = $operation->getMercure() ?? false;
        } catch (OperationNotFoundException) {
            return;
        }

        if (\is_string($options)) {
            if (null === $this->expressionLanguage) {
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
            }

            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
        }

        if (false === $options) {
            return;
        }

        if (true === $options) {
            $options = [];
        }

        if (!\is_array($options)) {
            throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
        }

        foreach ($options as $key => $value) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
            }
        }

        $options['enable_async_update'] ??= true;

        if ('deletedObjects' === $property) {
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
            if (null === $types) {
                $types = [$operation->getShortName()];
            }

            // We need to evaluate it here, because in publishUpdate() the resource would be already deleted
            $this->evaluateTopics($options, $object);

            // Read the privacy settings from the resolved options rather than the raw operation
            // attribute: the raw value may be `true` or an expression string, in which case the
            // "private"/"private_fields" settings would be silently ignored.
            $privateData = [];
            $private = $options['private'] ?? false;
            $privateFields = (array) ($options['private_fields'] ?? []);
            if ($private && $privateFields) {
                foreach ($privateFields as $privateField) {
                    if (property_exists($object, $privateField)) {
                        $privateData[$privateField] = $this->getResourceId($privateField, $object);
                    }
                }
            }

            $this->deletedObjects[(object) [
                'id' => $this->iriConverter->getIriFromResource($object),
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
                'type' => 1 === \count($types) ? $types[0] : $types,
                'private' => $privateData,
            ]] = $options;

            return;
        }

        $this->{$property}[$object] = $options;
    }
245

246
    /**
     * Builds the Mercure update for one object, plus any GraphQL subscription updates, and sends them.
     *
     * Updates are dispatched on the message bus when "enable_async_update" is truthy and a bus is
     * configured; otherwise they are published directly on the hub selected by the "hub" option.
     *
     * @param object $object  the resource, or the \stdClass snapshot stored for a deleted resource
     * @param array  $options resolved Mercure options, as stored by storeObjectToPublish()
     * @param string $type    "create", "update" or "delete"
     *
     * @throws RuntimeException when a synchronous publication is required but no hub registry is configured
     */
    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) {
            // By convention, if the object has been deleted, we send only its IRI and its type.
            // This may change in the future, because it's not JSON Merge Patch compliant,
            // and I'm not fond of this approach.
            $iri = $options['topics'] ?? $object->iri;
            /** @var string $data */
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
        } else {
            $resourceClass = $this->getObjectClass($object);
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];

            // We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
            $this->evaluateTopics($options, $object);

            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
            $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
        }

        $updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));
        foreach ($updates as $update) {
            if ($options['enable_async_update'] && $this->messageBus) {
                $this->dispatch($update);
                continue;
            }

            // The constructor accepts a message bus without a hub registry, so a resource that
            // disables async updates can reach this point with no hub available.
            if (null === $this->hubRegistry) {
                throw new RuntimeException('A hub registry must be provided to publish Mercure updates synchronously.');
            }

            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
        }
    }
276

277
    /**
     * Resolves the "topics" option in place, evaluating every topic written as an "@=" expression.
     *
     * Non-string topics and plain strings are kept untouched. Nothing happens when the option is
     * missing or empty. After the call, a non-empty "topics" option is always an array.
     *
     * @param array  $options Mercure options, modified by reference
     * @param object $object  exposed to expressions as the "object" variable
     *
     * @throws \LogicException when an "@=" topic is used without the Expression Language component
     */
    private function evaluateTopics(array &$options, object $object): void
    {
        if (empty($options['topics'])) {
            return;
        }

        $resolvedTopics = [];
        foreach ((array) $options['topics'] as $candidate) {
            $isExpression = \is_string($candidate) && str_starts_with($candidate, '@=');
            if (!$isExpression) {
                $resolvedTopics[] = $candidate;
                continue;
            }

            if (null === $this->expressionLanguage) {
                throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
            }

            // Strip the "@=" marker before evaluating.
            $expression = substr($candidate, 2);
            $resolvedTopics[] = $this->expressionLanguage->evaluate($expression, ['object' => $object]);
        }

        $options['topics'] = $resolvedTopics;
    }
304

305
    /**
     * Builds one Mercure update per GraphQL subscription payload returned for the object.
     *
     * Returns an empty list when the GraphQL subscription manager or the subscription IRI
     * generator is not configured.
     *
     * @return Update[]
     */
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
    {
        $manager = $this->graphQlSubscriptionManager;
        $iriGenerator = $this->graphQlMercureSubscriptionIriGenerator;
        if (null === $manager || null === $iriGenerator) {
            return [];
        }

        $subscriptionUpdates = [];
        foreach ($manager->getPushPayloads($object, $type) as [$subscriptionId, $payload]) {
            $topic = $iriGenerator->generateTopicIri($subscriptionId);
            // JsonResponse is used only as a JSON encoder for the payload.
            $json = (string) (new JsonResponse($payload))->getContent();
            $subscriptionUpdates[] = $this->buildUpdate($topic, $json, $options);
        }

        return $subscriptionUpdates;
    }
327

328
    /**
     * Creates a Mercure Update from the given topic(s), payload and options.
     *
     * The "private", "id", "type" and "retry" options are forwarded; "private" defaults to false
     * and the others to null.
     *
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        $private = $options['private'] ?? false;
        $id = $options['id'] ?? null;
        $eventType = $options['type'] ?? null;
        $retry = $options['retry'] ?? null;

        return new Update($iri, $data, $private, $id, $eventType, $retry);
    }
335

336
    /**
     * Returns, as a string, the identifier of the object related through a private field.
     *
     * The related object is read with the "get<Field>()" accessor and its identifier with getId().
     *
     * @param string $privateField name of the property holding the related object
     * @param object $object       the resource owning the private field
     *
     * @throws RuntimeException when the accessor is missing, the related object cannot provide an
     *                          identifier, or the identifier cannot be represented as a string
     */
    private function getResourceId(string $privateField, object $object): string
    {
        $getter = 'get'.ucfirst($privateField);
        // is_callable() rather than method_exists() so that accessors served by __call() keep working.
        if (!\is_callable([$object, $getter])) {
            throw new RuntimeException(\sprintf('Cannot read the Mercure private field "%s" of "%s": no "%s()" accessor is available.', $privateField, $object::class, $getter));
        }

        $related = $object->{$getter}();
        if (!\is_object($related) || !\is_callable([$related, 'getId'])) {
            throw new RuntimeException(\sprintf('The Mercure private field "%s" of "%s" must reference an object exposing a "getId()" method, "%s" given.', $privateField, $object::class, get_debug_type($related)));
        }

        $id = $related->getId();
        if (\is_string($id) || \is_int($id) || \is_float($id) || $id instanceof \Stringable) {
            return (string) $id;
        }

        // Under strict types, returning anything else would raise an opaque TypeError.
        throw new RuntimeException(\sprintf('The identifier of the Mercure private field "%s" of "%s" cannot be converted to a string, "%s" given.', $privateField, $object::class, get_debug_type($id)));
    }
345
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc