• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / queues / 7100624032

29 Nov 2023 11:03AM UTC coverage: 81.707% (-1.0%) from 82.683%
7100624032

push

github

web-flow
Update README.md

335 of 410 relevant lines covered (81.71%)

10.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.0
/src/Commands/QueueWorkerCommand.php
1
<?php
2

3
namespace Daycry\Queues\Commands;
4

5
use CodeIgniter\CLI\BaseCommand;
6
use CodeIgniter\CLI\CLI;
7
use CodeIgniter\HTTP\Response;
8
use Config\Services;
9
use Daycry\Queues\Exceptions\QueueException;
10
use Daycry\Queues\Interfaces\BaseExceptionInterface;
11
use Daycry\Queues\Job;
12
use Daycry\Queues\Libraries\Utils;
13

14
/**
 * CLI command that polls a queue and executes the jobs it finds.
 *
 * Each loop iteration resolves the configured worker, asks it to watch the
 * requested queue, runs the job it returns (if any), removes the job from
 * the queue when appropriate and finally fires the job's callback, if one
 * is defined. Subclasses can hook into the process through the empty
 * *Checks() methods.
 */
class QueueWorkerCommand extends BaseCommand
{
    protected $group = 'Queues';

    protected $name = 'queues:worker';

    protected $description = 'Start queue worker.';

    // Must advertise the same command name as $name.
    protected $usage = 'queues:worker <queue> [Options]';

    protected $arguments = ['queue' => 'The queue name.'];

    protected $options = ['--oneTime' => 'Only executes one time.'];

    /**
     * True while a job is being processed; reset to false at the end of
     * every loop iteration.
     */
    protected bool $locked = false;

    /**
     * Hook invoked right before a job is run. No-op by default.
     */
    protected function earlyChecks(Job $job): void
    {
    }

    /**
     * Hook invoked right after a job has run and its response has been
     * collected. No-op by default.
     */
    protected function lateChecks(Job $job): void
    {
    }

    /**
     * Hook invoked right before a callback job is run. No-op by default.
     */
    protected function earlyCallbackChecks(Job $job): void
    {
    }

    /**
     * Hook invoked right after a callback job has run. No-op by default.
     */
    protected function lateCallbackChecks(Job $job): void
    {
    }

    /**
     * Runs the worker loop.
     *
     * The queue name is taken from the first positional parameter, then from
     * the "queue" CLI option, and is prompted for when still empty. When the
     * "oneTime" parameter/option is present the loop body is executed exactly
     * once; otherwise it repeats indefinitely.
     *
     * @param array $params Positional parameters and options passed by the CLI runner.
     *
     * @return void
     */
    public function run(array $params)
    {
        $queue = $params[0] ?? CLI::getOption('queue');

        $oneTime = array_key_exists('oneTime', $params) || CLI::getOption('oneTime');

        // @codeCoverageIgnoreStart
        if (empty($queue)) {
            $queue = CLI::prompt(lang('Queue.insertQueue'));
        }
        // @codeCoverageIgnoreEnd

        while (true) {
            // Reset all per-iteration state so that a failure early in this
            // iteration can never act on the previous iteration's job/worker.
            $worker   = null;
            $job      = null;
            $j        = null;
            $response = [];

            // NOTE(review): the parsed result is not used here; the call is
            // kept in case parsing has validation side effects — confirm.
            Utils::parseConfigFile(service('settings')->get('Queue.queues'));

            Services::resetSingle('request');
            Services::resetSingle('response');

            try {
                $workers    = service('settings')->get('Queue.workers');
                $workerName = service('settings')->get('Queue.worker');

                if (! array_key_exists($workerName, $workers)) {
                    throw QueueException::forInvalidWorker($workerName);
                }

                $worker = new $workers[$workerName]();

                $job = $worker->watch($queue);

                if ($job) {
                    $this->locked = true;

                    $j = new Job($worker->getDataJob());

                    $this->earlyChecks($j);

                    $result = $j->run();

                    $response['status'] = true;

                    // Normalise plain results into a Response so status code
                    // and body can be read uniformly below.
                    if (! $result instanceof Response) {
                        $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
                    }

                    $response['statusCode'] = $result->getStatusCode();
                    $response['data']       = $result->getBody();

                    $this->lateChecks($j);
                }
            } catch (BaseExceptionInterface $e) {
                $response['statusCode'] = $e->getCode();
                $response['error']      = $e->getMessage();
                $response['status']     = false;

                // The exception may have been thrown before the worker was
                // instantiated or the job was built; only remove a job that
                // actually exists.
                if ($worker !== null && $j !== null) {
                    $worker->removeJob($j, true);
                }

                $this->showError($e);
            }

            // Post-processing only makes sense when a job was built in this
            // iteration (an early failure fills $response but leaves $j null).
            if ($response && $j !== null) {
                try {
                    // NOTE(review): getAttempt() is called on the value returned
                    // by watch(), not on the Job instance $j — confirm this is
                    // the intended object.
                    if ($response['status'] === true || $job->getAttempt() >= service('settings')->get('Queue.maxAttempts')) {
                        $worker->removeJob($j, false);
                    }

                    // Callback: send the collected response to the URL the job asked for.
                    if ($cb = $j->getCallback()) {
                        $cb->options->body = $response;

                        $c = new Job();
                        $c->url($cb->url, $cb->options);

                        $this->earlyCallbackChecks($c);
                        $c->run();
                        $this->lateCallbackChecks($c);
                    }
                } catch (BaseExceptionInterface $e) {
                    $this->showError($e);
                }
            }

            $this->locked = false;

            sleep(service('settings')->get('Queue.waitingTimeBetweenJobs'));

            if ($oneTime) {
                return;
            }
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc