• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21442227581

28 Jan 2026 02:30PM UTC coverage: 56.301% (-3.1%) from 59.413%
21442227581

push

github

daycry
Improve queue handling and job execution logic

Refactor queue worker to support background execution and improve job fetching logic. Update JobLifecycleCoordinator to prioritize job-specific and default timeouts without a global cap. Replace custom UUID generation with service-based UUID v7 in JobLogger. Ensure queue scheduling uses application timezone for consistency.

13 of 24 new or added lines in 5 files covered. (54.17%)

63 existing lines in 4 files now uncovered.

1175 of 2087 relevant lines covered (56.3%)

4.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.53
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Pheanstalk\Pheanstalk;
20
use Pheanstalk\Values\Job;
21
use Pheanstalk\Values\TubeName;
22
use Throwable;
23

24
/**
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
 *
 * Contract notes:
 *  - enqueue(): returns the Beanstalk job id as a string.
 *  - watch(): reserves one job, trying up to RESERVE_ATTEMPTS times with a
 *    RESERVE_TIMEOUT_SECONDS reserve each (short growing pause between tries),
 *    and returns the envelope built by JobEnvelope::fromBackend() or null.
 *  - removeJob(): deletes the current reserved job; if $recreate is true the original Job instance is re-enqueued.
 *
 * Caveats:
 *  - The dependency must exist or construction will fail.
 *  - No explicit release support yet; extend removeJob() for richer semantics later.
 *  - A reserved job whose payload cannot be deserialized is buried (best effort)
 *    so it cannot block the head of the tube on every subsequent watch().
 */
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
{
    private const DEFAULT_HOST = '127.0.0.1';
    private const DEFAULT_PORT = 11300;

    /** Seconds a single reserve call blocks while waiting for a ready job. */
    private const RESERVE_TIMEOUT_SECONDS = 5;

    /** Maximum number of reserve calls made by one watch() invocation. */
    private const RESERVE_ATTEMPTS = 3;

    /** Pause before the second reserve attempt, in microseconds; doubled after each empty attempt. */
    private const INITIAL_BACKOFF_MICROSECONDS = 100000;

    /** Pause after re-enqueueing in removeJob(), in microseconds. */
    private const REQUEUE_SETTLE_MICROSECONDS = 200000;

    private int $priority           = 0;
    private int $ttr                = 3600; // time-to-run
    private ?Pheanstalk $connection = null;

    /** Job currently reserved by watch(); cleared by removeJob(). */
    private ?Job $job = null;

    /**
     * Opens the Beanstalkd connection.
     *
     * Reads host/port from config('Jobs')->beanstalk and falls back to
     * 127.0.0.1:11300 when the setting or either key is missing.
     */
    public function __construct()
    {
        // Expect config('Jobs')->beanstalk or similar (adapt if needed)
        $cfg = config('Jobs')->beanstalk ?? ['host' => self::DEFAULT_HOST, 'port' => self::DEFAULT_PORT];

        $this->connection = Pheanstalk::create(
            $cfg['host'] ?? self::DEFAULT_HOST,
            (int) ($cfg['port'] ?? self::DEFAULT_PORT),
        );
    }

    /**
     * Serializes $data and puts it on the tube named by $data->queue.
     *
     * @param object $data Job data; its `queue` property selects the tube ('default' when unset).
     *
     * @return string Beanstalk job id.
     */
    public function enqueue(object $data): string
    {
        $queue = $data->queue ?? 'default';
        $this->connection->useTube(new TubeName($queue));

        $delay   = $this->calculateDelay($data);
        $payload = $this->getSerializer()->serialize($data);

        $created = $this->connection->put($payload, $this->priority, $delay->seconds, $this->ttr);

        // Cast explicitly, as watch() does: the declared return type is string
        // and strict_types would reject a non-string id.
        return (string) $created->getId();
    }

    /**
     * Reserves the next job from $queue and wraps it in a JobEnvelope.
     *
     * The reserved job is remembered so that removeJob() can delete it later.
     *
     * @param string $queue Tube name to consume from.
     *
     * @return mixed Result of JobEnvelope::fromBackend(), or null when no job
     *               was reserved or its payload could not be deserialized.
     */
    public function watch(string $queue)
    {
        $tube = new TubeName($queue);

        // Watch the new tube first, then ignore others to avoid NotIgnoredException
        // (Beanstalk requires at least one tube to be watched)
        $this->connection->watch($tube);

        foreach ($this->connection->listTubesWatched() as $watched) {
            if ((string) $watched !== $queue) {
                $this->connection->ignore($watched);
            }
        }

        // Ensure we're watching the correct tube after ignoring others
        $this->connection->watch($tube);

        // Reserve with a timeout long enough to allow for requeue delays and
        // network latency, pausing with exponential backoff between empty attempts.
        $backoff = self::INITIAL_BACKOFF_MICROSECONDS;

        for ($attempt = 1; $attempt <= self::RESERVE_ATTEMPTS; $attempt++) {
            $this->job = $this->connection->reserveWithTimeout(self::RESERVE_TIMEOUT_SECONDS);

            if ($this->job !== null) {
                break;
            }

            if ($attempt < self::RESERVE_ATTEMPTS) {
                usleep($backoff);
                $backoff *= 2; // Exponential backoff
            }
        }

        if ($this->job === null) {
            return null;
        }

        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');

        if (! $decoded) {
            // An undecodable payload would otherwise stay reserved, return to the
            // ready queue on TTR expiry or connection close, and be picked up
            // again indefinitely. Bury it so the rest of the tube can be consumed.
            try {
                $this->connection->bury($this->job, $this->priority);
            } catch (Throwable) {
                // Best effort: if burying fails the job simply stays reserved,
                // which is what happened before this safeguard existed.
            }
            $this->job = null;

            return null;
        }

        return JobEnvelope::fromBackend(
            backend: 'beanstalk',
            id: (string) $this->job->getId(),
            queue: $queue,
            payload: $decoded,
            extraMeta: [
                'ttr'      => $this->ttr,
                'priority' => $this->priority,
            ],
            raw: $this->job,
        );
    }

    /**
     * Deletes the job reserved by the last watch() and optionally re-enqueues $job.
     *
     * @param QueuesJob $job      Job to re-enqueue when $recreate is true.
     * @param bool      $recreate Whether to put $job back on its tube with no delay.
     *
     * @return bool Always true.
     */
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
    {
        if ($this->job !== null) {
            try {
                $this->connection->delete($this->job);
            } catch (Throwable) {
                // Deliberately ignored: a failed delete must not prevent the
                // optional re-enqueue below or the reset of the reserved handle.
            }
        }

        if ($recreate) {
            // Re-enqueue directly to this beanstalk backend instead of using push()
            // which might use a different worker from QueueManager
            // Force immediate availability by setting delay to 0
            $queue   = $job->getQueue() ?? 'default';
            $payload = $this->getSerializer()->serialize($job->toObject());

            $this->connection->useTube(new TubeName($queue));
            $this->connection->put($payload, $this->priority, 0, $this->ttr); // delay = 0

            // Sleep to ensure beanstalkd has processed the job before next reserve
            // Increased delay for GitHub Actions CI environments where latency is higher
            usleep(self::REQUEUE_SETTLE_MICROSECONDS);
        }

        $this->job = null;

        return true;
    }

    /**
     * Sets the priority passed to put() for subsequently enqueued jobs.
     */
    public function setPriority(int $priority): self
    {
        $this->priority = $priority;

        return $this;
    }

    /**
     * Sets the time-to-run (seconds) passed to put() for subsequently enqueued jobs.
     */
    public function setTtr(int $ttr): self
    {
        $this->ttr = $ttr;

        return $this;
    }

    /**
     * Returns the priority currently used when enqueueing.
     */
    public function getPriority(): int
    {
        return $this->priority;
    }

    /**
     * Returns the time-to-run (seconds) currently used when enqueueing.
     */
    public function getTtr(): int
    {
        return $this->ttr;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc