• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / queues / 7948892847

04 Jan 2024 12:26PM UTC coverage: 80.811% (+0.3%) from 80.541%
7948892847

push

github

daycry
Fix: schedule propterty when worker is sync

299 of 370 relevant lines covered (80.81%)

8.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/src/Commands/QueueWorkerCommand.php
1
<?php
2

3
namespace Daycry\Queues\Commands;
4

5
use CodeIgniter\CLI\BaseCommand;
6
use CodeIgniter\CLI\CLI;
7
use CodeIgniter\HTTP\Response;
8
use Config\Services;
9
use Daycry\Queues\Exceptions\QueueException;
10
use Daycry\Queues\Interfaces\BaseExceptionInterface;
11
use Daycry\Queues\Job;
12
use Daycry\Queues\Libraries\Utils;
13

14
class QueueWorkerCommand extends BaseCommand
15
{
16
    protected $group = 'Queues';
17

18
    protected $name = 'queues:worker';
19

20
    protected $description = 'Start queue worker.';
21

22
    protected $usage = 'queue:run <queue> [Options]';
23

24
    protected $arguments = ['queue' => 'The queue name.'];
25

26
    protected $options = [ '--oneTime' => 'Only executes one time.' ];
27

28
    protected bool $locked = false;
29

30
    protected function earlyChecks(Job $job): void
31
    {
32
    }
15✔
33

34
    protected function lateChecks(Job $job): void
35
    {
36
    }
15✔
37

38
    protected function earlyCallbackChecks(Job $job): void
39
    {
40
    }
3✔
41

42
    protected function lateCallbackChecks(Job $job): void
43
    {
44
    }
3✔
45

46
    public function run(array $params)
47
    {
48
        $queue = $params[0] ?? CLI::getOption('queue');
15✔
49

50
        //CLI::write('Queue "'. $queue .'" started successfully.', 'green');
51

52
        $oneTime = false;
15✔
53
        if (array_key_exists('oneTime', $params) || CLI::getOption('oneTime')) {
15✔
54
            $oneTime = true;
15✔
55
        }
56

57
        // @codeCoverageIgnoreStart
58
        if (empty($queue)) {
59
            $queue = CLI::prompt(lang('Queue.insertQueue'));
60
        }
61
        // @codeCoverageIgnoreEnd
62

63
        while(true) {
15✔
64
            $queues = Utils::parseConfigFile(service('settings')->get('Queue.queues'));
15✔
65

66
            $response = [];
15✔
67

68
            Services::resetSingle('request');
15✔
69
            Services::resetSingle('response');
15✔
70

71
            try {
72
                $workers = service('settings')->get('Queue.workers');
15✔
73
                $worker = service('settings')->get('Queue.worker');
15✔
74

75
                if(!array_key_exists($worker, $workers)) {
15✔
76
                    throw QueueException::forInvalidWorker($worker);
×
77
                }
78

79
                $worker = new $workers[$worker]();
15✔
80

81
                $job = $worker->watch($queue);
15✔
82

83
                if($job) {
15✔
84
                    $this->locked = true;
15✔
85

86
                    $dataJob = $worker->getDataJob();
15✔
87
                    $j = new Job($dataJob);
15✔
88

89
                    $this->earlyChecks($j);
15✔
90

91
                    $result = $j->run();
15✔
92

93
                    $response['status'] = true;
15✔
94

95
                    if(!$result instanceof Response) {
15✔
96
                        $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
12✔
97
                    }
98

99
                    $response['statusCode'] = $result->getStatusCode();
15✔
100
                    $response['data'] = $result->getBody();
15✔
101

102
                    $this->lateChecks($j);
15✔
103
                }
104

105
            } catch(BaseExceptionInterface $e) {
×
106

107
                $response['statusCode'] = $e->getCode();
×
108
                $response['error'] = $e->getMessage();
×
109
                $response['status'] = false;
×
110
                $worker->removeJob($j, true);
×
111
                $this->showError($e);
×
112
            }
113

114
            if($response) {
15✔
115
                try {
116
                    if($response['status'] === true || $job->getAttempt() >= service('settings')->get('Queue.maxAttempts')) {
15✔
117
                        $worker->removeJob($j, false);
15✔
118
                    }
119

120
                    //callback
121
                    if($cb = $j->getCallback()) {
15✔
122
                        $cb->options->body = $response;
3✔
123
                        $c = new Job();
3✔
124
                        $c->url($cb->url, $cb->options);
3✔
125

126
                        $this->earlyCallbackChecks($c);
3✔
127
                        $r = $c->run();
3✔
128
                        $this->lateCallbackChecks($c);
15✔
129
                    }
130

131
                } catch(BaseExceptionInterface $e) {
×
132
                    $this->showError($e);
×
133
                }
134
            }
135

136
            $this->locked = false;
15✔
137
            $response = [];
15✔
138

139
            sleep(service('settings')->get('Queue.waitingTimeBetweenJobs'));
15✔
140

141
            if ($oneTime) {
15✔
142
                return;
15✔
143
            }
144

145
        }
146
    }
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc