• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

api-platform / core / 3713134090

pending completion
3713134090

Pull #5254

github

GitHub
Merge b2ec54b3c into ac711530f
Pull Request #5254: [OpenApi] Add ApiResource::openapi and deprecate openapiContext

197 of 197 new or added lines in 5 files covered. (100.0%)

10372 of 12438 relevant lines covered (83.39%)

11.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.19
/src/Doctrine/EventListener/PublishMercureUpdatesListener.php
1
<?php
2

3
/*
4
 * This file is part of the API Platform project.
5
 *
6
 * (c) Kévin Dunglas <dunglas@gmail.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11

12
declare(strict_types=1);
13

14
namespace ApiPlatform\Doctrine\EventListener;
15

16
use ApiPlatform\Api\IriConverterInterface;
17
use ApiPlatform\Api\ResourceClassResolverInterface;
18
use ApiPlatform\Api\UrlGeneratorInterface;
19
use ApiPlatform\Exception\InvalidArgumentException;
20
use ApiPlatform\Exception\OperationNotFoundException;
21
use ApiPlatform\Exception\RuntimeException;
22
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
23
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
24
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
25
use ApiPlatform\Symfony\Messenger\DispatchTrait;
26
use ApiPlatform\Util\ResourceClassInfoTrait;
27
use Doctrine\Common\EventArgs;
28
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
29
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
30
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
31
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
32
use Symfony\Component\HttpFoundation\JsonResponse;
33
use Symfony\Component\Mercure\HubRegistry;
34
use Symfony\Component\Mercure\Update;
35
use Symfony\Component\Messenger\MessageBusInterface;
36
use Symfony\Component\Serializer\SerializerInterface;
37

38
/**
39
 * Publishes resources updates to the Mercure hub.
40
 *
41
 * @author Kévin Dunglas <dunglas@gmail.com>
42
 */
43
final class PublishMercureUpdatesListener
44
{
45
    use DispatchTrait;
46
    use ResourceClassInfoTrait;
47
    /**
     * Option names accepted in the "mercure" attribute of a resource.
     *
     * The names are stored as array keys so that validating an option is a
     * plain isset() lookup; the "true" values carry no meaning.
     */
    private const ALLOWED_KEYS = [
48
        'topics' => true,
49
        'data' => true,
50
        'private' => true,
51
        'id' => true,
52
        'type' => true,
53
        'retry' => true,
54
        'normalization_context' => true,
55
        'hub' => true,
56
        'enable_async_update' => true,
57
    ];
58
    // Evaluates string-valued "mercure" options and "@="-prefixed topics;
    // null when the Expression Language component is not available.
    private readonly ?ExpressionLanguage $expressionLanguage;
59
    // Objects scheduled for insertion, each mapped to its resolved Mercure options.
    private \SplObjectStorage $createdObjects;
60
    // Objects scheduled for update, each mapped to its resolved Mercure options.
    private \SplObjectStorage $updatedObjects;
61
    // stdClass snapshots ("id" and "iri") of objects scheduled for deletion,
    // each mapped to the resolved Mercure options of the deleted object.
    private \SplObjectStorage $deletedObjects;
62

63
    /**
     * Wires the listener and registers its custom expression functions.
     *
     * At least one transport is required: a message bus (updates are
     * dispatched as messages) or a hub registry (updates are published to a
     * hub directly).
     *
     * When an expression language is available, two functions are added to it:
     *  - "escape", backed by PHP's rawurlencode();
     *  - "iri", which resolves the IRI of a resource through the IRI converter.
     *
     * @param array<string, string[]|string> $formats the first key is used as the serialization format
     *
     * @throws InvalidArgumentException when neither a message bus nor a hub registry is provided
     */
    public function __construct(
        ResourceClassResolverInterface $resourceClassResolver,
        private readonly IriConverterInterface $iriConverter,
        ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory,
        private readonly SerializerInterface $serializer,
        private readonly array $formats,
        ?MessageBusInterface $messageBus = null,
        private readonly ?HubRegistry $hubRegistry = null,
        private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null,
        private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null,
        ?ExpressionLanguage $expressionLanguage = null,
    ) {
        if (null === $messageBus && null === $hubRegistry) {
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
        }

        // These properties are declared by the imported traits, so they cannot be promoted.
        $this->resourceClassResolver = $resourceClassResolver;
        $this->resourceMetadataFactory = $resourceMetadataFactory;
        $this->messageBus = $messageBus;

        // Fall back to a fresh instance only when the optional component is installed.
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
        $this->reset();

        if ($this->expressionLanguage) {
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
            $this->expressionLanguage->addFunction($rawurlencode);

            $this->expressionLanguage->addFunction(
                new ExpressionFunction(
                    'iri',
                    // Compiler: the textual form of the call.
                    static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => sprintf('iri(%s, %d)', $apiResource, $referenceType),
                    // Evaluator: the actual IRI lookup at evaluation time.
                    static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => $iriConverter->getIriFromResource($apiResource, $referenceType),
                )
            );
        }
    }
88

89
    /**
     * Gathers the objects scheduled for insertion, update and deletion in the
     * unit of work, so that they can be published after the flush.
     *
     * Events that are neither Doctrine ORM nor MongoDB ODM flush events are ignored.
     */
    public function onFlush(EventArgs $eventArgs): void
    {
        $isOrm = $eventArgs instanceof OrmOnFlushEventArgs;
        if (!$isOrm && !($eventArgs instanceof MongoDbOdmOnFlushEventArgs)) {
            return;
        }

        $unitOfWork = $isOrm
            ? $eventArgs->getEntityManager()->getUnitOfWork()
            : $eventArgs->getDocumentManager()->getUnitOfWork();

        // Target store => unit-of-work accessor, in the order they are collected.
        $accessors = [
            'createdObjects' => $isOrm ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions',
            'updatedObjects' => $isOrm ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates',
            'deletedObjects' => $isOrm ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions',
        ];

        foreach ($accessors as $property => $accessor) {
            foreach ($unitOfWork->{$accessor}() as $scheduledObject) {
                $this->storeObjectToPublish($scheduledObject, $property);
            }
        }
    }
117

118
    /**
     * Publishes one update per object collected during the flush (creations
     * first, then updates, then deletions) and always empties the stores
     * afterwards, even when publishing throws.
     */
    public function postFlush(): void
    {
        // Store property => update type handed to publishUpdate().
        $stores = [
            'createdObjects' => 'create',
            'updatedObjects' => 'update',
            'deletedObjects' => 'delete',
        ];

        try {
            foreach ($stores as $property => $type) {
                // The property is read on each access rather than cached up front.
                foreach ($this->{$property} as $collected) {
                    $this->publishUpdate($collected, $this->{$property}[$collected], $type);
                }
            }
        } finally {
            $this->reset();
        }
    }
139

140
    /**
     * Replaces the three object stores with fresh, empty storages.
     */
    private function reset(): void
    {
        foreach (['createdObjects', 'updatedObjects', 'deletedObjects'] as $store) {
            $this->{$store} = new \SplObjectStorage();
        }
    }
146

147
    /**
     * Resolves the Mercure options of an object and queues it for publication.
     *
     * The object is skipped when its resource class cannot be resolved, when
     * no operation is found for it, or when its "mercure" option is missing
     * or false. A string option is evaluated as an expression (with the
     * object exposed as "object"), "true" means "no specific options", and
     * topics starting with "@=" are evaluated as expressions too.
     *
     * Deleted objects are stored as a stdClass snapshot holding their IRIs
     * ("id" and absolute "iri"), computed here while the object still exists.
     *
     * @param object $object   the object scheduled in the unit of work
     * @param string $property name of the store to fill: "createdObjects", "updatedObjects" or "deletedObjects"
     *
     * @throws RuntimeException         when an expression is configured but the Expression Language component is missing
     * @throws \LogicException          when a "@=" topic is used without the Expression Language component
     * @throws InvalidArgumentException when the option has an unsupported type or contains an unknown key
     */
    private function storeObjectToPublish(object $object, string $property): void
    {
        if (null === $resourceClass = $this->getResourceClass($object)) {
            return;
        }

        try {
            $options = $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getMercure() ?? false;
        } catch (OperationNotFoundException) {
            return;
        }

        if (\is_string($options)) {
            if (null === $this->expressionLanguage) {
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
            }

            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
        }

        if (false === $options) {
            return;
        }

        if (true === $options) {
            $options = [];
        }

        if (!\is_array($options)) {
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
        }

        foreach ($options as $key => $value) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                // ALLOWED_KEYS maps names to "true": list its keys, not its values.
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
            }
        }

        $options['enable_async_update'] ??= true;

        if ($options['topics'] ?? false) {
            $topics = [];
            foreach ((array) $options['topics'] as $topic) {
                // Non-string topics and plain strings are kept untouched.
                if (!\is_string($topic) || !str_starts_with($topic, '@=')) {
                    $topics[] = $topic;
                    continue;
                }

                if (null === $this->expressionLanguage) {
                    throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
                }

                // Strip the "@=" marker and evaluate the rest as an expression.
                $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
            }

            $options['topics'] = $topics;
        }

        if ('deletedObjects' === $property) {
            $this->deletedObjects[(object) [
                'id' => $this->iriConverter->getIriFromResource($object),
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
            ]] = $options;

            return;
        }

        $this->{$property}[$object] = $options;
    }
221

222
    /**
     * Builds and sends the Mercure update(s) for one collected object.
     *
     * The main update targets the configured topics or, by default, the
     * absolute IRI of the object. For "update" events, GraphQL subscription
     * updates are appended when GraphQL subscriptions are configured.
     *
     * Each update is dispatched on the message bus when "enable_async_update"
     * is truthy and a bus is available; otherwise it is published through the
     * hub named by the "hub" option (default hub when unset).
     *
     * @param object $object  the resource, or the stdClass snapshot ("id", "iri") of a deleted resource
     * @param array  $options resolved "mercure" options of the resource
     * @param string $type    "create", "update" or "delete"
     *
     * @throws RuntimeException when an update must be published through a hub but no hub registry was provided
     * @throws \JsonException   when the payload of a deleted resource cannot be encoded
     */
    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) {
            // By convention, if the object has been deleted, we send only its IRI.
            // This may change in the future, because it's not JSON Merge Patch compliant,
            // and I'm not fond of this approach.
            $iri = $options['topics'] ?? $object->iri;
            /** @var string $data */
            $data = json_encode(['@id' => $object->id], \JSON_THROW_ON_ERROR);
        } else {
            $resourceClass = $this->getObjectClass($object);
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];

            $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
            // The first configured format is used; array_key_first() does not
            // depend on the array's internal pointer.
            $data = $options['data'] ?? $this->serializer->serialize($object, array_key_first($this->formats), $context);
        }

        $updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));

        foreach ($updates as $update) {
            if ($options['enable_async_update'] && $this->messageBus) {
                $this->dispatch($update);
                continue;
            }

            // A bus-only setup with async updates disabled has no hub to publish to:
            // fail with an explicit message instead of a call on null.
            if (null === $this->hubRegistry) {
                throw new RuntimeException('A hub registry must be provided to publish Mercure updates when no message bus is available or when the "enable_async_update" option is disabled.');
            }

            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
        }
    }
250

251
    /**
     * Builds one Mercure update per GraphQL subscription payload of the object.
     *
     * Nothing is produced unless the event is an "update" and both the
     * GraphQL subscription manager and the subscription IRI generator are set.
     *
     * @return Update[]
     */
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
    {
        $subscriptionsEnabled = 'update' === $type
            && $this->graphQlSubscriptionManager
            && $this->graphQlMercureSubscriptionIriGenerator;

        if (!$subscriptionsEnabled) {
            return [];
        }

        $subscriptionUpdates = [];
        foreach ($this->graphQlSubscriptionManager->getPushPayloads($object) as [$subscriptionId, $payload]) {
            $topic = $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId);
            // JsonResponse is used only to encode the payload as JSON.
            $body = (string) (new JsonResponse($payload))->getContent();

            $subscriptionUpdates[] = $this->buildUpdate($topic, $body, $options);
        }

        return $subscriptionUpdates;
    }
273

274
    /**
     * Creates a Mercure update from a topic, a payload and the resolved options.
     *
     * Missing options fall back to: "private" => false, "id", "type" and
     * "retry" => null.
     *
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        $isPrivate = $options['private'] ?? false;
        $eventId = $options['id'] ?? null;
        $eventType = $options['type'] ?? null;
        $retryDelay = $options['retry'] ?? null;

        return new Update($iri, $data, $isPrivate, $eventId, $eventType, $retryDelay);
    }
281
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc