• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / jobs / 21145580946

19 Jan 2026 04:55PM UTC coverage: 58.025% (-4.5%) from 62.567%
21145580946

push

github

daycry
Add security, performance, and health features; simplify architecture

Introduces shell command whitelisting, smart token detection, per-queue rate limiting, dead letter queue, job timeout protection, config caching, and a health monitoring command. Refactors architecture by consolidating traits, removing the CompletionStrategy pattern, and unifying retry policies under RetryPolicyFixed. Updates documentation and tests to reflect new features and architectural changes, ensuring backward compatibility and improved reliability.

143 of 340 new or added lines in 20 files covered. (42.06%)

1 existing line in 1 file now uncovered.

1193 of 2056 relevant lines covered (58.03%)

4.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/src/Queues/BeanstalkQueue.php
1
<?php
2

3
declare(strict_types=1);
4

5
/**
6
 * This file is part of Daycry Queues.
7
 *
8
 * (c) Daycry <daycry9@proton.me>
9
 *
10
 * For the full copyright and license information, please view
11
 * the LICENSE file that was distributed with this source code.
12
 */
13

14
namespace Daycry\Jobs\Queues;
15

16
use Daycry\Jobs\Interfaces\QueueInterface;
17
use Daycry\Jobs\Interfaces\WorkerInterface;
18
use Daycry\Jobs\Job as QueuesJob;
19
use Pheanstalk\Pheanstalk;
20
use Pheanstalk\Values\Job;
21
use Pheanstalk\Values\TubeName;
22
use Throwable;
23

24
/**
25
 * Beanstalkd queue implementation (via pheanstalk/pheanstalk).
26
 *
27
 * Contract notes:
28
 *  - enqueue(): returns Beanstalk internal integer job id as string.
29
 *  - watch(): reserves one job with timeout (1s poll) and returns JobEnvelope or null.
30
 *  - removeJob(): deletes the current reserved job; if $recreate true the original Job instance is re-enqueued.
31
 *
32
 * Caveats:
33
 *  - The dependency must exist or construction will fail.
34
 *  - No explicit release/bury support yet; extend removeJob() for richer semantics later.
35
 */
36
class BeanstalkQueue extends BaseQueue implements QueueInterface, WorkerInterface
37
{
38
    private int $priority           = 0;
39
    private int $ttr                = 3600; // time-to-run
40
    private ?Pheanstalk $connection = null;
41
    private ?Job $job               = null;
42

43
    public function __construct()
44
    {
45
        // Expect config('Jobs')->beanstalk or similar (adapt if needed)
46
        $cfg              = config('Jobs')->beanstalk ?? ['host' => '127.0.0.1', 'port' => 11300];
2✔
47
        $this->connection = Pheanstalk::create($cfg['host'] ?? '127.0.0.1', (int) ($cfg['port'] ?? 11300));
2✔
48
    }
49

50
    public function enqueue(object $data): string
51
    {
52
        $queue = $data->queue ?? 'default';
2✔
53
        $tube  = new TubeName($queue);
2✔
54
        $this->connection->useTube($tube);
2✔
55
        $delay   = $this->calculateDelay($data);
2✔
56
        $payload = $this->getSerializer()->serialize($data);
2✔
57

58
        return $this->connection->put($payload, $this->priority, $delay->seconds, $this->ttr)->getId();
2✔
59
    }
60

61
    public function watch(string $queue)
62
    {
63
        $tube = new TubeName($queue);
1✔
64
        $this->connection->watch($tube);
1✔
65
        // Reserve with timeout to avoid blocking forever
66
        $this->job = $this->connection->reserveWithTimeout(1); // 1 second poll
1✔
67
        if (! $this->job) {
1✔
68
            return null;
×
69
        }
70
        $decoded = $this->getSerializer()->deserialize($this->job->getData() ?: '{}');
1✔
71
        if (! $decoded) {
1✔
72
            return null;
×
73
        }
74

75
        return JobEnvelope::fromBackend(
1✔
76
            backend: 'beanstalk',
1✔
77
            id: (string) $this->job->getId(),
1✔
78
            queue: $queue,
1✔
79
            payload: $decoded,
1✔
80
            extraMeta: [
1✔
81
                'ttr'      => $this->ttr,
1✔
82
                'priority' => $this->priority,
1✔
83
            ],
1✔
84
            raw: $this->job,
1✔
85
        );
1✔
86
    }
87

88
    public function removeJob(QueuesJob $job, bool $recreate = false): bool
89
    {
90
        if ($this->job !== null) {
1✔
91
            try {
92
                $this->connection->delete($this->job);
1✔
93
            } catch (Throwable) { // ignore
×
94
            }
95
        }
96

97
        if ($recreate) {
1✔
98
            // Re-enqueue directly to this beanstalk backend instead of using push()
99
            // which might use a different worker from QueueManager
UNCOV
100
            $this->enqueue($job->toObject());
×
101
        }
102
        $this->job = null;
1✔
103

104
        return true;
1✔
105
    }
106

107
    public function setPriority(int $priority): self
108
    {
109
        $this->priority = $priority;
1✔
110

111
        return $this;
1✔
112
    }
113

114
    public function setTtr(int $ttr): self
115
    {
116
        $this->ttr = $ttr;
1✔
117

118
        return $this;
1✔
119
    }
120

121
    public function getPriority(): int
122
    {
123
        return $this->priority;
1✔
124
    }
125

126
    public function getTtr(): int
127
    {
128
        return $this->ttr;
1✔
129
    }
130
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc