• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 24850441053

23 Apr 2026 05:54PM UTC coverage: 52.404% (-1.5%) from 53.938%
24850441053

push

github

daycry
Fixes

104 of 219 new or added lines in 42 files covered. (47.49%)

14 existing lines in 9 files now uncovered.

1210 of 2309 relevant lines covered (52.4%)

4.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.17
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Pheanstalk\Pheanstalk;
20
use Pheanstalk\Values\Job;
21
use Pheanstalk\Values\TubeName;
22
use Throwable;
23

24
/**
25
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
26
 *
27
 * Contract notes:
28
 *  - enqueue(): returns Beanstalk internal integer job id as string.
29
 *  - watch(): reserves one job (up to 3 reserve attempts, each blocking up to 5s) and returns JobEnvelope or null.
30
 *  - removeJob(): deletes the current reserved job; if $recreate true the original Job instance is re-enqueued.
31
 *
32
 * Caveats:
33
 *  - The dependency must exist or construction will fail.
34
 *  - No explicit release/bury support yet; extend removeJob() for richer semantics later.
35
 */
36
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    /** Seconds a single reserve call blocks while waiting for a ready job. */
    private const RESERVE_TIMEOUT_SECONDS = 5;

    /** Maximum number of reserve calls made by one watch() invocation. */
    private const RESERVE_MAX_ATTEMPTS = 3;

    /** Pause after the first empty reserve, in microseconds; doubled after every further miss. */
    private const RESERVE_BACKOFF_START_US = 100000;

    /** Pause after re-enqueueing a job in removeJob(), in microseconds. */
    private const REQUEUE_SETTLE_US = 200000;

    private int $priority           = 0;
    private int $ttr                = 3600; // time-to-run fallback, in seconds
    private ?Pheanstalk $connection = null;

    /** Job reserved by the most recent watch() call; cleared by removeJob(). */
    private ?Job $job = null;

    /**
     * Opens the beanstalkd connection.
     *
     * Reads host/port from config('Jobs')->beanstalk and falls back to
     * 127.0.0.1:11300 when the setting or one of its keys is missing.
     */
    public function __construct()
    {
        $cfg              = config('Jobs')->beanstalk ?? ['host' => '127.0.0.1', 'port' => 11300];
        $this->connection = Pheanstalk::create($cfg['host'] ?? '127.0.0.1', (int) ($cfg['port'] ?? 11300));
    }

    /**
     * Serializes $data and puts it on the tube named by $data->queue ('default' when unset).
     *
     * The put uses the configured priority, the delay computed by the inherited
     * calculateDelay(), and a TTR resolved from $data->timeout (see resolveTtr()).
     *
     * @return string Job id reported by beanstalkd.
     */
    public function enqueue(object $data): string
    {
        $tube = new TubeName($data->queue ?? 'default');
        $this->connection->useTube($tube);

        $delay   = $this->calculateDelay($data);
        $payload = $this->getSerializer()->serialize($data);
        $ttr     = $this->resolveTtr($data->timeout ?? null);

        // Cast for consistency with watch(), which also exposes the id as a string.
        return (string) $this->connection->put($payload, $this->priority, $delay->seconds, $ttr)->getId();
    }

    /**
     * Reserves the next job from $queue and wraps it in a JobEnvelope.
     *
     * Makes up to RESERVE_MAX_ATTEMPTS reserve calls of RESERVE_TIMEOUT_SECONDS each,
     * sleeping 100ms and then 200ms between them, so one call can block for roughly
     * 15.3 seconds before giving up.
     *
     * @return mixed JobEnvelope for the reserved job, or null when no job became
     *               available or its payload could not be deserialized.
     */
    public function watch(string $queue): mixed
    {
        $tube = new TubeName($queue);

        // Watch the target tube first, then ignore every other one: beanstalkd
        // refuses to ignore the last watched tube (NotIgnoredException).
        $this->connection->watch($tube);

        foreach ($this->connection->listTubesWatched() as $watched) {
            if ((string) $watched !== $queue) {
                $this->connection->ignore($watched);
            }
        }

        $backoff = self::RESERVE_BACKOFF_START_US;

        for ($attempt = 1; $attempt <= self::RESERVE_MAX_ATTEMPTS; $attempt++) {
            $this->job = $this->connection->reserveWithTimeout(self::RESERVE_TIMEOUT_SECONDS);
            /** @phpstan-ignore class.notFound */
            if ($this->job instanceof Job) {
                break;
            }
            // No pause after the final attempt; we are about to return anyway.
            if ($attempt < self::RESERVE_MAX_ATTEMPTS) {
                usleep($backoff);
                $backoff *= 2;
            }
        }

        /** @phpstan-ignore class.notFound */
        if (! $this->job instanceof Job) {
            return null;
        }

        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');
        if (! $decoded) {
            // An undecodable payload would otherwise stay reserved until its TTR
            // expires and then be handed out again, failing forever. Park it.
            $this->buryReservedJob();

            return null;
        }

        return JobEnvelope::fromBackend(
            backend: 'beanstalk',
            id: (string) $this->job->getId(),
            queue: $queue,
            payload: $decoded,
            extraMeta: [
                'ttr'      => $this->ttr,
                'priority' => $this->priority,
            ],
            raw: $this->job,
        );
    }

    /**
     * Deletes the job reserved by the last watch() call and optionally re-enqueues $job.
     *
     * @param QueuesJob $job      Job to put back on its queue when $recreate is true.
     * @param bool      $recreate When true, $job is serialized and put on its own tube
     *                            ('default' when it has none) with zero delay.
     *
     * @return bool Always true; a failed delete is ignored.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        /** @phpstan-ignore class.notFound */
        if ($this->job instanceof Job) {
            try {
                $this->connection->delete($this->job);
            } catch (Throwable) {
                // Deliberately best effort: a failed delete must not stop the worker.
            }
        }

        // Forget the handle now so it is not left dangling if the re-enqueue below throws.
        $this->job = null;

        if ($recreate) {
            // Put straight onto this backend rather than going through a generic push,
            // which might route to a different worker.
            $tube    = new TubeName($job->getQueue() ?? 'default');
            $payload = $this->getSerializer()->serialize($job->toObject());
            $this->connection->useTube($tube);
            $this->connection->put($payload, $this->priority, 0, $this->resolveTtr($job->getTimeout()));

            // Short settle pause before the next reserve; added for high-latency CI runs.
            usleep(self::REQUEUE_SETTLE_US);
        }

        return true;
    }

    /**
     * Sets the priority used for subsequent puts.
     */
    public function setPriority(int $priority): self
    {
        $this->priority = $priority;

        return $this;
    }

    /**
     * Sets the fallback time-to-run (seconds) used when neither the job nor the config supplies one.
     */
    public function setTtr(int $ttr): self
    {
        $this->ttr = $ttr;

        return $this;
    }

    /**
     * Returns the priority used for puts.
     */
    public function getPriority(): int
    {
        return $this->priority;
    }

    /**
     * Returns the fallback time-to-run in seconds.
     */
    public function getTtr(): int
    {
        return $this->ttr;
    }

    /**
     * Picks the TTR for a put: the job's own timeout, else config('Jobs')->jobTimeout,
     * else this queue's fallback TTR.
     *
     * @param mixed $timeout Per-job timeout, or null when the job does not define one.
     */
    private function resolveTtr(mixed $timeout): int
    {
        return (int) ($timeout ?? config('Jobs')->jobTimeout ?? $this->ttr);
    }

    /**
     * Buries the currently reserved job (best effort) and forgets the handle.
     */
    private function buryReservedJob(): void
    {
        /** @phpstan-ignore class.notFound */
        if ($this->job instanceof Job) {
            try {
                $this->connection->bury($this->job);
            } catch (Throwable) {
                // Best effort, matching removeJob(): the caller still gets null either way.
            }
        }

        $this->job = null;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc