• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umbrellio / php-table-sync / 4530357316

pending completion
4530357316

push

github

GitHub
 Added support for Laravel 10 (#19)

17 of 32 new or added lines in 12 files covered. (53.13%)

5 existing lines in 5 files now uncovered.

252 of 660 relevant lines covered (38.18%)

16.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Rabbit/Consumer.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Umbrellio\TableSync\Rabbit;
6

7
use Closure;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Psr\Log\LoggerInterface;
10
use Psr\Log\NullLogger;
11

12
/**
 * Subscribes to the configured queue and passes every delivered message to the
 * configured handler, acknowledging each message once the handler has returned.
 */
class Consumer
{
    /**
     * Loop flag for consume(): set to true when consuming starts and cleared by
     * terminate() so the wait loop exits on its next iteration check.
     */
    private bool $working = false;

    /**
     * @param ChannelContainer $channelContainer Provides the channel used to consume and acknowledge.
     * @param MessageBuilder   $messageBuilder   Converts a raw AMQP message into the object given to the handler.
     * @param Config\Consumer  $config           Supplies the queue, consumer tag, handler and sleep interval.
     * @param LoggerInterface  $logger           Receives an info record per handled message; defaults to a no-op logger.
     */
    public function __construct(
        private readonly ChannelContainer $channelContainer,
        private readonly MessageBuilder $messageBuilder,
        private readonly Config\Consumer $config,
        private readonly LoggerInterface $logger = new NullLogger()
    ) {
    }

    /**
     * Registers handle() as the callback for the configured queue and then loops,
     * sleeping for the configured number of microseconds before each wait() call,
     * until the channel has no callbacks left or terminate() has been called.
     */
    public function consume(): void
    {
        $channel = $this->channelContainer->getChannel();

        // NOTE(review): the four positional booleans presumably map to php-amqplib's
        // no_local, no_ack, exclusive and nowait flags — confirm against the channel
        // type returned by getChannel().
        $channel->basic_consume(
            $this->config->queue(),
            $this->config->consumerTag(),
            false,
            false,
            false,
            false,
            // First-class callable syntax; equivalent to Closure::fromCallable([$this, 'handle']).
            $this->handle(...)
        );

        $this->working = true;
        while (count($channel->callbacks) && $this->working) {
            usleep($this->config->microsecondsToSleep());
            $channel->wait();
        }
    }

    /**
     * Requests that the consume() loop stop; the flag is checked before the next iteration.
     */
    public function terminate(): void
    {
        $this->working = false;
    }

    /**
     * Callback for a single delivery: builds the received message, runs the configured
     * handler, acknowledges the delivery and logs the outcome.
     *
     * The acknowledgement is sent only after the handler returns; if the handler throws,
     * the exception propagates and no acknowledgement is sent from this method.
     */
    private function handle(AMQPMessage $amqpMessage): void
    {
        $message = $this->messageBuilder->buildReceivedMessage($amqpMessage);
        $messageId = $amqpMessage->delivery_info['delivery_tag'];

        $this->config->handler()
            ->handle($message);
        $this->channelContainer->getChannel()
            ->basic_ack($messageId);
        $this->logger->info("Message #{$messageId} correctly handled", [
            'direction' => 'receive',
            'body' => $amqpMessage->getBody(),
        ]);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc