• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

daycry / queues / 9645444372

30 Apr 2024 01:44PM UTC coverage: 80.0% (-0.8%) from 80.811%
9645444372

push

github

daycry
Fixes: Queue worker exceptions

4 of 8 new or added lines in 1 file covered. (50.0%)

300 of 375 relevant lines covered (80.0%)

7.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.97
/src/Commands/QueueWorkerCommand.php
1
<?php
2

3
namespace Daycry\Queues\Commands;
4

5
use CodeIgniter\CLI\BaseCommand;
6
use CodeIgniter\CLI\CLI;
7
use CodeIgniter\HTTP\Response;
8
use Config\Services;
9
use Daycry\Queues\Exceptions\QueueException;
10
use Daycry\Exceptions\Interfaces\BaseExceptionInterface;
11
use Daycry\Queues\Job;
12
use Daycry\Queues\Libraries\Utils;
13
use Exception;
14

15
class QueueWorkerCommand extends BaseCommand
16
{
17
    protected $group = 'Queues';
18

19
    protected $name = 'queues:worker';
20

21
    protected $description = 'Start queue worker.';
22

23
    protected $usage = 'queue:run <queue> [Options]';
24

25
    protected $arguments = ['queue' => 'The queue name.'];
26

27
    protected $options = [ '--oneTime' => 'Only executes one time.' ];
28

29
    protected bool $locked = false;
30

31
    protected function earlyChecks(Job $job): void
32
    {
33
    }
15✔
34

35
    protected function lateChecks(Job $job): void
36
    {
37
    }
15✔
38

39
    protected function earlyCallbackChecks(Job $job): void
40
    {
41
    }
3✔
42

43
    protected function lateCallbackChecks(Job $job): void
44
    {
45
    }
3✔
46

47
    public function run(array $params)
48
    {
49
        $queue = $params[0] ?? CLI::getOption('queue');
15✔
50

51
        //CLI::write('Queue "'. $queue .'" started successfully.', 'green');
52

53
        $oneTime = false;
15✔
54
        if (array_key_exists('oneTime', $params) || CLI::getOption('oneTime')) {
15✔
55
            $oneTime = true;
15✔
56
        }
57

58
        // @codeCoverageIgnoreStart
59
        if (empty($queue)) {
60
            $queue = CLI::prompt(lang('Queue.insertQueue'));
61
        }
62
        // @codeCoverageIgnoreEnd
63

64
        while(true) {
15✔
65
            $queues = Utils::parseConfigFile(service('settings')->get('Queue.queues'));
15✔
66

67
            $response = [];
15✔
68

69
            Services::resetSingle('request');
15✔
70
            Services::resetSingle('response');
15✔
71

72
            try {
73
                $workers = service('settings')->get('Queue.workers');
15✔
74
                $worker = service('settings')->get('Queue.worker');
15✔
75

76
                if(!array_key_exists($worker, $workers)) {
15✔
77
                    throw QueueException::forInvalidWorker($worker);
×
78
                }
79

80
                $worker = new $workers[$worker]();
15✔
81

82
                $job = $worker->watch($queue);
15✔
83

84
                if(isset($job)) {
15✔
85
                    $this->locked = true;
15✔
86

87
                    $dataJob = $worker->getDataJob();
15✔
88
                    $j = new Job($dataJob);
15✔
89

90
                    $this->earlyChecks($j);
15✔
91

92
                    $result = $j->run();
15✔
93

94
                    $response['status'] = true;
15✔
95

96
                    if(!$result instanceof Response) {
15✔
97
                        $result = (Services::response(null, true))->setStatusCode(200)->setBody($result);
12✔
98
                    }
99

100
                    $response['statusCode'] = $result->getStatusCode();
15✔
101
                    $response['data'] = $result->getBody();
15✔
102

103
                    $this->lateChecks($j);
15✔
104
                }
105

106
            } catch(BaseExceptionInterface $e) {
×
107

108
                $response['statusCode'] = $e->getCode();
×
109
                $response['error'] = $e->getMessage();
×
110
                $response['status'] = false;
×
111
                $worker->removeJob($j, true);
×
112
                $this->showError($e);
×
NEW
113
            } catch(Exception $e) {
×
NEW
114
                $response['statusCode'] = $e->getCode();
×
NEW
115
                $response['error'] = $e->getMessage();
×
NEW
116
                $response['status'] = false;
×
117
            }
118

119
            if($response && isset($job)) {
15✔
120
                try {
121
                    if($response['status'] === true || $j->getAttempt() >= service('settings')->get('Queue.maxAttempts')) {
15✔
122
                        $worker->removeJob($j, false);
15✔
123
                    }
124

125
                    //callback
126
                    if($cb = $j->getCallback()) {
15✔
127
                        $cb->options->body = $response;
3✔
128
                        $c = new Job();
3✔
129
                        $c->url($cb->url, $cb->options);
3✔
130

131
                        $this->earlyCallbackChecks($c);
3✔
132
                        $r = $c->run();
3✔
133
                        $this->lateCallbackChecks($c);
15✔
134
                    }
135

136
                } catch(BaseExceptionInterface $e) {
×
137
                    $this->showError($e);
×
138
                }
139
            }
140

141
            $this->locked = false;
15✔
142
            $response = [];
15✔
143
            unset($j, $job);
15✔
144

145
            sleep(service('settings')->get('Queue.waitingTimeBetweenJobs'));
15✔
146

147
            if ($oneTime) {
15✔
148
                return;
15✔
149
            }
150
        }
151
    }
152
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc