• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 19498141349

19 Nov 2025 10:30AM UTC coverage: 62.5% (+0.7%) from 61.815%
19498141349

push

github

daycry
- Fixes

4 of 4 new or added lines in 4 files covered. (100.0%)

41 existing lines in 9 files now uncovered.

1145 of 1832 relevant lines covered (62.5%)

4.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.5
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Daycry\Jobs\Libraries\DateTimeHelper;
20
use Daycry\Jobs\Libraries\Priority;
21
use Pheanstalk\Pheanstalk;
22
use Pheanstalk\Values\Job;
23
use Pheanstalk\Values\TubeName;
24
use Throwable;
25

26
/**
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
 *
 * Contract notes:
 *  - enqueue(): returns Beanstalk internal integer job id as string.
 *  - watch(): reserves one job from the requested tube only, with timeout (1s poll),
 *    and returns JobEnvelope or null.
 *  - removeJob(): deletes the current reserved job; if $recreate true the original Job instance is re-enqueued.
 *
 * Caveats:
 *  - The pheanstalk/pheanstalk package must be installed or construction will fail.
 *  - No explicit release/bury support yet; extend removeJob() for richer semantics later.
 */
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    private int $priority           = 0;
    private int $ttr                = 3600; // time-to-run
    private ?Pheanstalk $connection = null;
    private ?Job $job               = null; // job currently reserved by watch(), if any

    /**
     * Creates the Pheanstalk client from config('Jobs')->beanstalk.
     *
     * Falls back to 127.0.0.1:11300 when the config entry, or one of its
     * 'host' / 'port' keys, is missing.
     */
    public function __construct()
    {
        // Expect config('Jobs')->beanstalk or similar (adapt if needed)
        $cfg              = config('Jobs')->beanstalk ?? ['host' => '127.0.0.1', 'port' => 11300];
        $this->connection = Pheanstalk::create($cfg['host'] ?? '127.0.0.1', (int) ($cfg['port'] ?? 11300));
    }

    /**
     * Serializes $data and puts it on the tube named by $data->queue
     * ('default' when that property is unset).
     *
     * The job is stored with this instance's priority and TTR; the delay is
     * taken from the `seconds` property of calculateDelay()'s result.
     *
     * @return string Job id reported by the put command.
     */
    public function enqueue(object $data): string
    {
        $queue = $data->queue ?? 'default';
        $tube  = new TubeName($queue);
        $this->connection->useTube($tube);

        $delay   = $this->calculateDelay($data);
        $payload = $this->getSerializer()->serialize($data);

        return $this->connection->put($payload, $this->priority, $delay->seconds, $this->ttr)->getId();
    }

    /**
     * Reserves at most one job from $queue, waiting up to one second.
     *
     * @return JobEnvelope|null Null when no job arrived within the timeout or
     *                          when the payload could not be deserialized.
     */
    public function watch(string $queue)
    {
        $tube = new TubeName($queue);
        $this->watchOnly($tube);

        // Reserve with timeout to avoid blocking forever
        $this->job = $this->connection->reserveWithTimeout(1); // 1 second poll
        if (! $this->job) {
            return null;
        }

        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');
        if (! $decoded) {
            // NOTE(review): the job is neither deleted nor buried on this path, so it
            // stays reserved by this connection. Decide whether it should be buried.
            return null;
        }

        return JobEnvelope::fromBackend(
            backend: 'beanstalk',
            id: (string) $this->job->getId(),
            queue: $queue,
            payload: $decoded,
            extraMeta: [
                'ttr'      => $this->ttr,
                'priority' => $this->priority,
            ],
            raw: $this->job,
        );
    }

    /**
     * Deletes the job reserved by the last watch() call, optionally re-enqueueing $job.
     *
     * Deletion is best-effort: any failure is ignored and the method still
     * returns true. The delete happens before the re-enqueue.
     *
     * @param QueuesJob $job      Job to put back on the queue when $recreate is true.
     * @param bool      $recreate Whether to enqueue $job again on this backend.
     *
     * @return bool Always true.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        if ($this->job !== null) {
            try {
                $this->connection->delete($this->job);
            } catch (Throwable) { // ignore
            }
        }

        if ($recreate) {
            // Re-enqueue directly to this beanstalk backend instead of using push()
            // which might use a different worker from QueueManager
            $this->enqueue($job->toObject());
        }

        $this->job = null;

        return true;
    }

    /**
     * Sets the priority passed to put() by later enqueue() calls.
     */
    public function setPriority(int $priority): self
    {
        $this->priority = $priority;

        return $this;
    }

    /**
     * Sets the time-to-run (TTR) passed to put() by later enqueue() calls.
     */
    public function setTtr(int $ttr): self
    {
        $this->ttr = $ttr;

        return $this;
    }

    /**
     * Returns the priority used for new jobs.
     */
    public function getPriority(): int
    {
        return $this->priority;
    }

    /**
     * Returns the time-to-run (TTR) used for new jobs.
     */
    public function getTtr(): int
    {
        return $this->ttr;
    }

    /**
     * Makes $tube the only tube this connection reserves from.
     *
     * The beanstalkd watch command only adds to the watch list, and a new
     * connection already watches "default". Without pruning, a reserve could
     * hand back a job from another tube that would then be reported under the
     * requested queue name.
     */
    private function watchOnly(TubeName $tube): void
    {
        // watch() is issued on every call, as before, and reports the watch-list size.
        $watchedCount = $this->connection->watch($tube);
        if ($watchedCount <= 1) {
            return;
        }

        $wanted = (string) $tube;

        foreach ($this->connection->listTubesWatched() as $watched) {
            if ((string) $watched !== $wanted) {
                $this->connection->ignore($watched);
            }
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc