• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24568210769

05 Apr 2026 08:48AM UTC coverage: 53.938% (-2.2%) from 56.164%
24568210769

push

github

daycry
Optimize

50 of 192 new or added lines in 14 files covered. (26.04%)

3 existing lines in 3 files now uncovered.

1219 of 2260 relevant lines covered (53.94%)

4.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.17
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Pheanstalk\Pheanstalk;
20
use Pheanstalk\Values\Job;
21
use Pheanstalk\Values\TubeName;
22
use Throwable;
23

24
/**
25
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
26
 *
27
 * Contract notes:
28
 *  - enqueue(): returns Beanstalk internal integer job id as string.
29
 *  - watch(): makes up to three 5-second reserve attempts and returns JobEnvelope, or null when no job arrives or the payload cannot be decoded.
30
 *  - removeJob(): deletes the current reserved job; if $recreate true the original Job instance is re-enqueued.
31
 *
32
 * Caveats:
33
 *  - The dependency must exist or construction will fail.
34
 *  - No explicit release/bury support yet; extend removeJob() for richer semantics later.
35
 */
36
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
37
{
38
    private int $priority           = 0;
39
    private int $ttr                = 3600; // time-to-run
40
    private ?Pheanstalk $connection = null;
41
    private ?Job $job               = null;
42

43
    /**
     * Creates the Pheanstalk client used by this queue.
     *
     * Host and port are read from config('Jobs')->beanstalk; any value that is
     * missing falls back to 127.0.0.1 and 11300 respectively.
     */
    public function __construct()
    {
        $settings = config('Jobs')->beanstalk ?? [];
        $host     = $settings['host'] ?? '127.0.0.1';
        $port     = (int) ($settings['port'] ?? 11300);

        $this->connection = Pheanstalk::create($host, $port);
    }
49

50
    /**
     * Serializes a job payload and puts it on its Beanstalkd tube.
     *
     * The tube name comes from $data->queue ("default" when unset) and the delay
     * from calculateDelay(). The TTR is resolved from $data->timeout, then
     * config('Jobs')->jobTimeout, then this queue's own TTR.
     *
     * @param object $data Job payload; its optional queue and timeout properties are read here.
     *
     * @return string Identifier reported by Beanstalkd for the new job.
     */
    public function enqueue(object $data): string
    {
        $tube = new TubeName($data->queue ?? 'default');
        $this->connection->useTube($tube);

        $delay   = $this->calculateDelay($data);
        $payload = $this->getSerializer()->serialize($data);

        $requestedTtr = (int) ($data->timeout ?? config('Jobs')->jobTimeout ?? $this->ttr);

        // Beanstalkd silently raises a TTR of 0 to 1 second, which would hand the job
        // to another worker while it is still running. A non-positive timeout is
        // therefore treated as "not set" and the queue's own TTR is used instead.
        $effectiveTtr = $requestedTtr > 0 ? $requestedTtr : max(1, $this->ttr);

        return $this->connection->put($payload, $this->priority, $delay->seconds, $effectiveTtr)->getId();
    }
62

63
    /**
     * Reserves the next job from the given tube and wraps it in a JobEnvelope.
     *
     * Up to three reserve attempts of 5 seconds each are made, with pauses of
     * 100ms and then 200ms between them. The reserved job is kept on this
     * instance so that removeJob() can delete it later.
     *
     * A job whose payload cannot be deserialized is buried (best effort) and
     * forgotten; leaving it reserved would only let it return to the ready
     * queue once its TTR expires and fail in the same way again.
     *
     * @param string $queue Name of the tube to consume from.
     *
     * @return JobEnvelope|null The reserved job, or null when nothing was reserved
     *                          or the payload could not be decoded.
     */
    public function watch(string $queue)
    {
        $tube = new TubeName($queue);

        // Watch the requested tube before ignoring the rest: Beanstalkd refuses to
        // ignore the last watched tube (NotIgnoredException).
        $this->connection->watch($tube);

        foreach ($this->connection->listTubesWatched() as $watched) {
            if ((string) $watched !== $queue) {
                $this->connection->ignore($watched);
            }
        }

        // Defensive re-watch; the tube is normally still watched at this point.
        $this->connection->watch($tube);

        $maxAttempts = 3;
        $pause       = 100000; // microseconds (100ms), doubled after every empty attempt

        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            // Each attempt waits up to 5 seconds for a ready job.
            $this->job = $this->connection->reserveWithTimeout(5);

            if ($this->job !== null) {
                break;
            }

            if ($attempt < $maxAttempts) {
                usleep($pause);
                $pause *= 2;
            }
        }

        if ($this->job === null) {
            return null;
        }

        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');

        if (! $decoded) {
            // Undecodable payload: drop our handle and bury the job so it neither
            // stays reserved until its TTR expires nor gets redelivered forever.
            $undecodable = $this->job;
            $this->job   = null;

            try {
                $this->connection->bury($undecodable);
            } catch (Throwable) {
                // Best effort, same policy as the delete in removeJob().
            }

            return null;
        }

        return JobEnvelope::fromBackend(
            backend: 'beanstalk',
            id: (string) $this->job->getId(),
            queue: $queue,
            payload: $decoded,
            extraMeta: [
                // These are this queue's configured defaults, not values read back from the job.
                'ttr'      => $this->ttr,
                'priority' => $this->priority,
            ],
            raw: $this->job,
        );
    }
116

117
    /**
     * Deletes the currently reserved Beanstalkd job and optionally re-enqueues it.
     *
     * @param QueuesJob $job      Job definition used to rebuild the payload when $recreate is true.
     * @param bool      $recreate When true, the job is put back on its tube with no delay.
     *
     * @return bool Always true.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        if ($this->job !== null) {
            try {
                $this->connection->delete($this->job);
            } catch (Throwable) {
                // Deliberately ignored: deletion is best effort and must not block
                // the optional re-enqueue below.
            }
        }

        if ($recreate) {
            // Put the job straight back on this Beanstalkd connection rather than going
            // through push(), which might route it to a different worker.
            $tube    = new TubeName($job->getQueue() ?? 'default');
            $payload = $this->getSerializer()->serialize($job->toObject());
            $this->connection->useTube($tube);

            $requestedTtr = (int) ($job->getTimeout() ?? config('Jobs')->jobTimeout ?? $this->ttr);

            // Beanstalkd silently raises a TTR of 0 to 1 second; treat a non-positive
            // timeout as "not set" and use the queue's own TTR instead.
            $effectiveTtr = $requestedTtr > 0 ? $requestedTtr : max(1, $this->ttr);

            // Delay 0 makes the job available immediately.
            $this->connection->put($payload, $this->priority, 0, $effectiveTtr);

            // Give Beanstalkd 200ms to register the job before the next reserve;
            // sized for high-latency CI environments.
            usleep(200000);
        }

        $this->job = null;

        return true;
    }
145

146
    /**
     * Sets the priority passed to Beanstalkd for jobs put by this queue.
     *
     * @param int $priority Priority value handed to put().
     *
     * @return self This instance, for chaining.
     */
    public function setPriority(int $priority): self
    {
        $this->priority = $priority;

        return $this;
    }
152

153
    /**
     * Sets the fallback time-to-run used when neither the job nor the config supplies a timeout.
     *
     * @param int $ttr Time-to-run in seconds.
     *
     * @return self This instance, for chaining.
     */
    public function setTtr(int $ttr): self
    {
        $this->ttr = $ttr;

        return $this;
    }
159

160
    /**
     * Returns the priority currently configured on this queue.
     */
    public function getPriority(): int
    {
        return $this->priority;
    }
164

165
    /**
     * Returns the fallback time-to-run currently configured on this queue.
     */
    public function getTtr(): int
    {
        return $this->ttr;
    }
169
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc