daycry / jobs / 26886467550

03 Jun 2026 01:01PM UTC coverage: 88.948% (+14.0%) from 74.974%
push

github

web-flow
v3.0: single clean architecture (remove V1, lease-based queues, secure-by-default)

Complete v3.0 rewrite into a single, clean architecture. The v1 API and the V2\ scaffolding
are removed (no facade, no dual code); the package passes PHPStan level 6 + strict-rules +
codeigniter with NO baseline.

- Definition: Jobs::define()->...->dispatch() fluent builder -> immutable JobDefinition.
- Handlers decoupled from the god-object (JobHandlerInterface / AbstractJobHandler / TypedJobHandler + JobContext).
- One QueueBackend contract (enqueue/fetch(lease)/ack/nack(delay)/abandon/reapExpired) with 5 backends:
  Sync, Database, Redis, Beanstalk, ServiceBus.
- Runtime: one attempt per fetch; real interrupting Timeout; opt-in idempotency; single-instance lock.
- Worker/Cron: jobs:queue:work, jobs:queue:reap, jobs:cronjob:run, jobs:queue:purge.
- Secure-by-default: HMAC-signed envelopes, per-queue handler allowlist, ShellHandler deny-by-default,
  EventHandler allowlist, UrlHandler anti-SSRF.

Resolves audit findings #1,#2,#3,#4,#5,#6,#7,#8,#10,#12,#13,#17,#18,#19,#20,#22.
Tests: 359 (Beanstalk live); line coverage 88.9%; PHPStan/Psalm/Rector/cs green on PHP 8.2-8.5.

BREAKING CHANGE: v1 API removed. See docs/MIGRATION-v1-to-v3.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

983 of 1103 new or added lines in 43 files covered. (89.12%)

15 existing lines in 3 files now uncovered.

1497 of 1683 relevant lines covered (88.95%)

7.55 hits per line

Source File
/src/Queues/Backends/DatabaseBackend.php (94.87% covered)
Coverage by line: every executable line in enqueue() and fetch() was hit 5 times, except the early `return null;` in fetch() (1 hit). ack(): 2 hits; nack(): 1 hit; reapExpired(): 2 hits. Two lines flagged NEW are uncovered: the `DateTime::createFromInterface(...)` branch in enqueue() and the `markStatus(..., 'failed')` call in abandon().

<?php

declare(strict_types=1);

/**
 * This file is part of Daycry Queues.
 *
 * (c) Daycry <daycry9@proton.me>
 *
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

namespace Daycry\Jobs\Queues\Backends;

use DateTime;
use DateTimeImmutable;
use DateTimeZone;
use Daycry\Jobs\Definition\JobDefinition;
use Daycry\Jobs\Entities\Queue as QueueEntity;
use Daycry\Jobs\Models\QueueModel;
use Daycry\Jobs\Queues\EnvelopeFactory;
use Daycry\Jobs\Queues\JobEnvelope;
use Daycry\Jobs\Queues\JobLease;
use Daycry\Jobs\Queues\QueueBackend;
use stdClass;

/**
 * Database-backed queue implementing the v3 {@see QueueBackend} contract.
 *
 * Reuses the proven atomic claim of {@see QueueModel::reserveJob()} (FOR UPDATE SKIP LOCKED
 * with optimistic fallback) and adds the v3 lease semantics:
 *  - fetch() stamps reserved_at + a per-worker owner token and returns a JobLease.
 *  - nack() requeues IN PLACE (same row/id, attempts+1, available_at = now + delay), so no orphan rows.
 *  - reapExpired() returns rows whose lease exceeded the visibility timeout to 'pending'.
 *
 * Delivery is at-least-once; handlers should be idempotent.
 */
final class DatabaseBackend implements QueueBackend
{
    private const BACKEND = 'database';

    public function enqueue(JobDefinition $definition): string
    {
        $identifier = bin2hex(random_bytes(8));
        $wire       = EnvelopeFactory::toWire($definition, $identifier);
        $tz         = new DateTimeZone(config('App')->appTimezone);
        $schedule   = $definition->scheduledAt instanceof DateTimeImmutable
            ? DateTime::createFromInterface($definition->scheduledAt)
            : new DateTime('now', $tz);

        $entity              = new QueueEntity();
        $entity->identifier  = $identifier;
        $entity->queue       = $definition->queue ?? 'default';
        $entity->payload     = json_encode($wire);
        $entity->priority    = $definition->priority;
        $entity->schedule    = $schedule->format('Y-m-d H:i:s');
        $entity->status      = 'pending';
        $entity->max_retries = $definition->maxRetries;
        $entity->attempts    = 0;

        (new QueueModel())->insert($entity);

        return $identifier;
    }

    public function fetch(string $queue): ?JobLease
    {
        $visibilityTimeout = (int) (config('Jobs')->databaseVisibilityTimeout ?? 300);
        $owner             = bin2hex(random_bytes(16));

        $row = (new QueueModel())->reserveJob($queue, $owner);
        if (! $row instanceof QueueEntity) {
            return null;
        }

        $decoded = json_decode($row->payload ?? '{}');
        $payload = $decoded instanceof stdClass ? $decoded : new stdClass();

        $envelope = new JobEnvelope(
            id: (string) $row->identifier,
            queue: (string) $row->queue,
            payload: $payload,
            name: isset($payload->name) && is_string($payload->name) ? $payload->name : null,
            attempts: (int) $row->attempts,
            priority: isset($payload->priority) ? (int) $payload->priority : null,
            meta: ['backend' => self::BACKEND, 'entity_id' => $row->id, 'status' => $row->status],
            raw: $row,
        );

        return JobLease::withRelativeExpiry($envelope, (string) $row->id, $owner, $visibilityTimeout, self::BACKEND);
    }

    public function ack(JobLease $lease): bool
    {
        return (new QueueModel())->markStatus((int) $lease->token, 'completed');
    }

    public function nack(JobLease $lease, ?int $delaySeconds = null): bool
    {
        return (new QueueModel())->requeueInPlace((int) $lease->token, max(0, $delaySeconds ?? 0));
    }

    public function abandon(JobLease $lease): bool
    {
        // No row-level native DLQ: mark 'failed' so the record is retained for audit but never
        // re-fetched. The runtime forwards the payload to the configured DeadLetterQueue (if any)
        // before calling abandon().
        return (new QueueModel())->markStatus((int) $lease->token, 'failed');
    }

    public function reapExpired(string $queue, int $visibilityTimeout): int
    {
        return (new QueueModel())->reapStuck($queue, $visibilityTimeout);
    }
}
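
The lease semantics in the class docblock (fetch stamps reserved_at and an owner token, nack requeues in place, reapExpired returns timed-out rows to 'pending') can be tried in isolation with a small in-memory stand-in. This is only a sketch under stated assumptions, not the package's implementation: the class name `InMemoryLeaseQueue`, the `'in_progress'` status, the array-shaped lease, and the explicit `$now` argument are all hypothetical, and whether the real `reapStuck()` changes the attempt count is not shown in the source (this sketch leaves it unchanged).

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory model of the lease lifecycle; not part of Daycry Jobs.
 * Time is passed in as a Unix timestamp so the behaviour is deterministic.
 */
final class InMemoryLeaseQueue
{
    /** @var array<int, array<string, mixed>> rows keyed by id */
    private array $rows = [];

    private int $nextId = 1;

    public function enqueue(string $payload, int $now): int
    {
        $id              = $this->nextId++;
        $this->rows[$id] = [
            'payload'      => $payload,
            'status'       => 'pending',
            'attempts'     => 0,
            'available_at' => $now,
            'reserved_at'  => null,
            'owner'        => null,
        ];

        return $id;
    }

    /** Claim the first pending row that is already available, or return null. */
    public function fetch(int $now): ?array
    {
        foreach ($this->rows as $id => $row) {
            if ($row['status'] === 'pending' && $row['available_at'] <= $now) {
                $owner                          = bin2hex(random_bytes(16));
                $this->rows[$id]['status']      = 'in_progress'; // assumed status name
                $this->rows[$id]['reserved_at'] = $now;
                $this->rows[$id]['owner']       = $owner;

                return ['id' => $id, 'owner' => $owner, 'attempts' => $row['attempts'], 'payload' => $row['payload']];
            }
        }

        return null;
    }

    public function ack(int $id): void
    {
        $this->rows[$id]['status'] = 'completed';
    }

    /** Requeue in place: same id, attempts + 1, available again after the delay. */
    public function nack(int $id, int $now, int $delaySeconds = 0): void
    {
        $this->rows[$id]['status'] = 'pending';
        $this->rows[$id]['attempts']++;
        $this->rows[$id]['available_at'] = $now + max(0, $delaySeconds);
        $this->rows[$id]['reserved_at']  = null;
        $this->rows[$id]['owner']        = null;
    }

    /** Return leases older than the visibility timeout to 'pending'; returns the count. */
    public function reapExpired(int $now, int $visibilityTimeout): int
    {
        $reaped = 0;

        foreach ($this->rows as $id => $row) {
            if ($row['status'] === 'in_progress' && $now - (int) $row['reserved_at'] > $visibilityTimeout) {
                $this->rows[$id]['status']      = 'pending';
                $this->rows[$id]['reserved_at'] = null;
                $this->rows[$id]['owner']       = null;
                $reaped++;
            }
        }

        return $reaped;
    }
}
```

Because a reaped job becomes fetchable again while its original worker may still be running, the same job can be delivered twice, which is why the docblock asks for idempotent handlers.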