• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21210552994

21 Jan 2026 12:53PM UTC coverage: 59.413%. Remained the same
21210552994

push

github

daycry
Improve BeanstalkQueue job reservation reliability

Increased reserve timeout to 5 seconds and implemented exponential backoff for retries to better handle requeue delays and network latency. Also increased the post-put sleep to 200ms to improve reliability in CI environments with higher latency.

6 of 10 new or added lines in 1 file covered. (60.0%)

8 existing lines in 1 file now uncovered.

1234 of 2077 relevant lines covered (59.41%)

4.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.95
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Pheanstalk\Pheanstalk;
20
use Pheanstalk\Values\Job;
21
use Pheanstalk\Values\TubeName;
22
use Throwable;
23

24
/**
25
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
26
 *
27
 * Contract notes:
28
 *  - enqueue(): returns Beanstalk internal integer job id as string.
29
 *  - watch(): reserves one job with timeout (1s poll) and returns JobEnvelope or null.
30
 *  - removeJob(): deletes the current reserved job; if $recreate true the original Job instance is re-enqueued.
31
 *
32
 * Caveats:
33
 *  - The dependency must exist or construction will fail.
34
 *  - No explicit release/bury support yet; extend removeJob() for richer semantics later.
35
 */
36
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
37
{
38
    private int $priority           = 0;
39
    private int $ttr                = 3600; // time-to-run
40
    private ?Pheanstalk $connection = null;
41
    private ?Job $job               = null;
42

43
    public function __construct()
44
    {
45
        // Expect config('Jobs')->beanstalk or similar (adapt if needed)
46
        $cfg              = config('Jobs')->beanstalk ?? ['host' => '127.0.0.1', 'port' => 11300];
2✔
47
        $this->connection = Pheanstalk::create($cfg['host'] ?? '127.0.0.1', (int) ($cfg['port'] ?? 11300));
2✔
48
    }
49

50
    public function enqueue(object $data): string
51
    {
52
        $queue = $data->queue ?? 'default';
2✔
53
        $tube  = new TubeName($queue);
2✔
54
        $this->connection->useTube($tube);
2✔
55
        $delay   = $this->calculateDelay($data);
2✔
56
        $payload = $this->getSerializer()->serialize($data);
2✔
57

58
        return $this->connection->put($payload, $this->priority, $delay->seconds, $this->ttr)->getId();
2✔
59
    }
60

61
    public function watch(string $queue)
62
    {
63
        $tube = new TubeName($queue);
1✔
64

65
        // Watch the new tube first, then ignore others to avoid NotIgnoredException
66
        // (Beanstalk requires at least one tube to be watched)
67
        $this->connection->watch($tube);
1✔
68

69
        foreach ($this->connection->listTubesWatched() as $watched) {
1✔
70
            if ((string) $watched !== $queue) {
1✔
71
                $this->connection->ignore($watched);
1✔
72
            }
73
        }
74

75
        // Ensure we're watching the correct tube after ignoring others
76
        $this->connection->watch($tube);
1✔
77

78
        // Reserve with timeout - use 5 seconds to allow for requeue delays and network latency
79
        // With exponential backoff for retries in case of requeued jobs
80
        $maxAttempts = 3;
1✔
81
        $delay       = 100000; // Start with 100ms
1✔
82

83
        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
1✔
84
            $this->job = $this->connection->reserveWithTimeout(5);
1✔
85
            if ($this->job !== null) {
1✔
86
                break;
1✔
87
            }
NEW
88
            if ($attempt < $maxAttempts) {
×
NEW
89
                usleep($delay);
×
NEW
90
                $delay *= 2; // Exponential backoff
×
91
            }
92
        }
93

94
        if (! $this->job) {
1✔
UNCOV
95
            return null;
×
96
        }
97
        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');
1✔
98
        if (! $decoded) {
1✔
UNCOV
99
            return null;
×
100
        }
101

102
        return JobEnvelope::fromBackend(
1✔
103
            backend: 'beanstalk',
1✔
104
            id: (string) $this->job->getId(),
1✔
105
            queue: $queue,
1✔
106
            payload: $decoded,
1✔
107
            extraMeta: [
1✔
108
                'ttr'      => $this->ttr,
1✔
109
                'priority' => $this->priority,
1✔
110
            ],
1✔
111
            raw: $this->job,
1✔
112
        );
1✔
113
    }
114

115
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
116
    {
117
        if ($this->job !== null) {
1✔
118
            try {
119
                $this->connection->delete($this->job);
1✔
UNCOV
120
            } catch (Throwable) { // ignore
×
121
            }
122
        }
123

124
        if ($recreate) {
1✔
125
            // Re-enqueue directly to this beanstalk backend instead of using push()
126
            // which might use a different worker from QueueManager
127
            // Force immediate availability by setting delay to 0
UNCOV
128
            $queue   = $job->getQueue() ?? 'default';
×
UNCOV
129
            $tube    = new TubeName($queue);
×
UNCOV
130
            $payload = $this->getSerializer()->serialize($job->toObject());
×
UNCOV
131
            $this->connection->useTube($tube);
×
UNCOV
132
            $this->connection->put($payload, $this->priority, 0, $this->ttr); // delay = 0
×
133

134
            // Sleep to ensure beanstalkd has processed the job before next reserve
135
            // Increased delay for GitHub Actions CI environments where latency is higher
NEW
136
            usleep(200000); // 200ms
×
137
        }
138
        $this->job = null;
1✔
139

140
        return true;
1✔
141
    }
142

143
    public function setPriority(int $priority): self
144
    {
145
        $this->priority = $priority;
1✔
146

147
        return $this;
1✔
148
    }
149

150
    public function setTtr(int $ttr): self
151
    {
152
        $this->ttr = $ttr;
1✔
153

154
        return $this;
1✔
155
    }
156

157
    public function getPriority(): int
158
    {
159
        return $this->priority;
1✔
160
    }
161

162
    public function getTtr(): int
163
    {
164
        return $this->ttr;
1✔
165
    }
166
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc