• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tochka-developers / queue-promises / 9935584685

15 Jul 2024 07:44AM UTC coverage: 47.259% (-18.3%) from 65.524%
9935584685

push

github

darkdarin
fix: tests

4 of 4 new or added lines in 2 files covered. (100.0%)

431 existing lines in 40 files now uncovered.

612 of 1295 relevant lines covered (47.26%)

2.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Core/PromiseWatcher.php
1
<?php
2

3
namespace Tochka\Promises\Core;
4

5
use Carbon\Carbon;
6
use Illuminate\Support\Facades\DB;
7
use Tochka\Promises\Core\Support\ConditionTransitionHandlerInterface;
8
use Tochka\Promises\Core\Support\DaemonWorker;
9
use Tochka\Promises\Enums\StateEnum;
10
use Tochka\Promises\Models\Promise;
11
use Tochka\Promises\Models\PromiseJob;
12

13
class PromiseWatcher implements PromiseWatcherInterface
14
{
15
    use DaemonWorker;
16

17
    /**
18
     * @psalm-suppress PossiblyUnusedMethod
19
     */
20
    public function __construct(
21
        private readonly ConditionTransitionHandlerInterface $conditionTransitionHandler,
22
        int $sleepTime,
23
        private readonly string $promisesTable,
24
        private readonly string $promiseJobsTable,
25
        private readonly int $promiseChunkSize = 100,
26
    ) {
UNCOV
27
        $this->sleepTime = $sleepTime;
×
UNCOV
28
        $this->lastIteration = Carbon::minValue();
×
29
    }
30

31
    public function watch(?callable $shouldQuitCallback = null, ?callable $shouldPausedCallback = null): void
32
    {
33
        if ($shouldQuitCallback === null) {
×
34
            $shouldQuitCallback = fn(): bool => false;
×
35
        }
36

UNCOV
37
        if ($shouldPausedCallback === null) {
×
UNCOV
38
            $shouldPausedCallback = fn(): bool => false;
×
39
        }
40

41
        $this->daemon(function () use ($shouldQuitCallback, $shouldPausedCallback) {
×
42
            $this->watchIteration($shouldQuitCallback, $shouldPausedCallback);
×
43
        }, $shouldQuitCallback, $shouldPausedCallback);
×
44
    }
45

46
    public function watchIteration(callable $shouldQuitCallback, callable $shouldPausedCallback): void
47
    {
48
        while (!$shouldQuitCallback() && !$shouldPausedCallback()) {
×
49
            $promises = DB::table($this->promisesTable)
×
50
                ->whereIn('state', [StateEnum::WAITING, StateEnum::RUNNING])
×
51
                ->where('watch_at', '<', Carbon::now())
×
52
                ->limit($this->promiseChunkSize)
×
53
                ->pluck('id')
×
54
                ->all();
×
55

56
            if (empty($promises)) {
×
57
                return;
×
58
            }
59

60
            $this->handlePromiseChunks($promises, $shouldQuitCallback, $shouldPausedCallback);
×
61

62
            $this->sleep(0.05);
×
63
        }
64
    }
65

66
    /**
67
     * @param array<int> $promiseIds
68
     * @param callable(): bool $shouldQuitCallback
69
     * @param callable(): bool $shouldPausedCallback
70
     */
71
    private function handlePromiseChunks(
72
        array $promiseIds,
73
        callable $shouldQuitCallback,
74
        callable $shouldPausedCallback,
75
    ): void {
76
        foreach ($promiseIds as $promise) {
×
UNCOV
77
            if ($shouldQuitCallback() || $shouldPausedCallback()) {
×
78
                return;
×
79
            }
80

81
            try {
UNCOV
82
                $this->checkPromiseConditions($promise);
×
UNCOV
83
            } catch (\Throwable $e) {
×
84
                report($e);
×
85
            }
86
        }
87
    }
88

89
    public function checkPromiseConditions(int $promiseId): void
90
    {
UNCOV
91
        $basePromise = DB::transaction(
×
92
            function () use ($promiseId) {
×
93
                /** @var Promise|null $lockedPromise */
94
                $lockedPromise = Promise::lockForUpdate()->find($promiseId);
×
UNCOV
95
                if ($lockedPromise === null) {
×
UNCOV
96
                    return null;
×
97
                }
98

99
                $basePromise = $lockedPromise->getBasePromise();
×
UNCOV
100
                if ($basePromise->getTimeoutAt() <= Carbon::now()) {
×
101
                    $basePromise->setState(StateEnum::TIMEOUT());
×
102
                } else {
103
                    $this->conditionTransitionHandler->checkConditionAndApplyTransition(
×
104
                        $basePromise,
×
105
                        $basePromise,
×
UNCOV
106
                        $basePromise,
×
UNCOV
107
                    );
×
108
                }
109

110
                $nextWatch = Carbon::now()->addSeconds(watcher_watch_timeout());
×
UNCOV
111
                if ($nextWatch > $basePromise->getTimeoutAt()) {
×
112
                    $nextWatch = $basePromise->getTimeoutAt();
×
113
                }
UNCOV
114
                if ($nextWatch > Carbon::now()) {
×
UNCOV
115
                    $basePromise->setWatchAt($nextWatch);
×
116
                }
117

118
                Promise::saveBasePromise($basePromise);
×
119

120
                return $basePromise;
×
121
            },
×
UNCOV
122
            3,
×
123
        );
×
124

UNCOV
125
        if ($basePromise === null) {
×
UNCOV
126
            return;
×
127
        }
128

129
        $jobsIds = DB::table($this->promiseJobsTable)
×
130
            ->select(['id'])
×
131
            ->where('promise_id', $promiseId)
×
UNCOV
132
            ->pluck('id')
×
133
            ->all();
×
134

UNCOV
135
        foreach ($jobsIds as $jobId) {
×
UNCOV
136
            $this->checkJobConditions($jobId, $basePromise);
×
137
        }
138
    }
139

140
    public function checkJobConditions(int $jobId, BasePromise $basePromise): void
141
    {
UNCOV
142
        DB::transaction(
×
143
            function () use ($jobId, $basePromise) {
×
144
                /** @var PromiseJob|null $lockedJob */
145
                $lockedJob = PromiseJob::query()->lockForUpdate()->find($jobId);
×
UNCOV
146
                if ($lockedJob === null) {
×
147
                    return;
×
148
                }
149
                $baseJob = $lockedJob->getBaseJob();
×
150

UNCOV
151
                if ($this->conditionTransitionHandler->checkConditionAndApplyTransition(
×
152
                    $baseJob,
×
153
                    $baseJob,
×
154
                    $basePromise,
×
UNCOV
155
                )) {
×
UNCOV
156
                    PromiseJob::saveBaseJob($baseJob);
×
157
                }
UNCOV
158
            },
×
UNCOV
159
            3,
×
UNCOV
160
        );
×
161
    }
162
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc