• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OSGeo / gdal / 17013199325

16 Aug 2025 09:41PM UTC coverage: 71.156% (-0.006%) from 71.162%
17013199325

Pull #12926

github

web-flow
Merge 13958d155 into 68df0689f
Pull Request #12926: Move shapelib file into ogr/ogrsf_frmts/shape/shapelib/

16 of 16 new or added lines in 1 file covered. (100.0%)

112 existing lines in 41 files now uncovered.

578448 of 812932 relevant lines covered (71.16%)

283574.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.31
/port/cpl_worker_thread_pool.cpp
1
/**********************************************************************
2
 *
3
 * Project:  CPL - Common Portability Library
4
 * Purpose:  CPL worker thread pool
5
 * Author:   Even Rouault, <even dot rouault at spatialys dot com>
6
 *
7
 **********************************************************************
8
 * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9
 *
10
 * SPDX-License-Identifier: MIT
11
 ****************************************************************************/
12

13
#include "cpl_port.h"
14
#include "cpl_worker_thread_pool.h"
15

16
#include <cstddef>
17
#include <memory>
18

19
#include "cpl_conv.h"
20
#include "cpl_error.h"
21
#include "cpl_vsi.h"
22

23
static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
24

25
/************************************************************************/
26
/*                         CPLWorkerThreadPool()                        */
27
/************************************************************************/
28

29
/** Instantiate a new pool of worker threads.
30
 *
31
 * The pool is in an uninitialized state after this call. The Setup() method
32
 * must be called.
33
 */
34
CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
813✔
35
{
36
}
813✔
37

38
/** Instantiate a new pool of worker threads.
39
 *
40
 * \param nThreads  Number of threads in the pool.
41
 */
42
CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
239✔
43
{
44
    Setup(nThreads, nullptr, nullptr);
239✔
45
}
239✔
46

47
/************************************************************************/
48
/*                          ~CPLWorkerThreadPool()                      */
49
/************************************************************************/
50

51
/** Destroys a pool of worker threads.
52
 *
53
 * Any still pending job will be completed before the destructor returns.
54
 */
55
CPLWorkerThreadPool::~CPLWorkerThreadPool()
1,047✔
56
{
57
    WaitCompletion();
1,047✔
58

59
    {
60
        std::lock_guard<std::mutex> oGuard(m_mutex);
1,047✔
61
        eState = CPLWTS_STOP;
1,047✔
62
    }
63

64
    for (auto &wt : aWT)
3,555✔
65
    {
66
        {
67
            std::lock_guard<std::mutex> oGuard(wt->m_mutex);
5,016✔
68
            wt->m_cv.notify_one();
2,508✔
69
        }
70
        CPLJoinThread(wt->hThread);
2,508✔
71
    }
72

73
    CPLListDestroy(psWaitingWorkerThreadsList);
1,047✔
74
}
1,047✔
75

76
/************************************************************************/
77
/*                        GetThreadCount()                              */
78
/************************************************************************/
79

80
int CPLWorkerThreadPool::GetThreadCount() const
1,244✔
81
{
82
    std::unique_lock<std::mutex> oGuard(m_mutex);
1,244✔
83
    return m_nMaxThreads;
2,488✔
84
}
85

86
/************************************************************************/
87
/*                       WorkerThreadFunction()                         */
88
/************************************************************************/
89

90
void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
3,548✔
91
{
92
    CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
3,548✔
93
    CPLWorkerThreadPool *poTP = psWT->poTP;
3,548✔
94

95
    threadLocalCurrentThreadPool = poTP;
3,548✔
96

97
    if (psWT->pfnInitFunc)
3,548✔
98
        psWT->pfnInitFunc(psWT->pInitData);
×
99

100
    while (true)
101
    {
102
        std::function<void()> task = poTP->GetNextJob(psWT);
66,997✔
103
        if (!task)
65,974✔
104
            break;
2,508✔
105

106
        task();
63,341✔
107
#if DEBUG_VERBOSE
108
        CPLDebug("JOB", "%p finished a job", psWT);
109
#endif
110
        poTP->DeclareJobFinished();
63,499✔
111
    }
63,449✔
112
}
2,519✔
113

114
/************************************************************************/
115
/*                             SubmitJob()                              */
116
/************************************************************************/
117

118
/** Queue a new job.
119
 *
120
 * @param pfnFunc Function to run for the job.
121
 * @param pData User data to pass to the job function.
122
 * @return true in case of success.
123
 */
124
bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
7,359✔
125
{
126
    return SubmitJob([=] { pfnFunc(pData); });
14,695✔
127
}
128

129
/** Queue a new job.
130
 *
131
 * @param task  Void function to execute.
132
 * @return true in case of success.
133
 */
134
bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
73,344✔
135
{
136
#ifdef DEBUG
137
    {
138
        std::unique_lock<std::mutex> oGuard(m_mutex);
146,597✔
139
        CPLAssert(m_nMaxThreads > 0);
73,253✔
140
    }
141
#endif
142

143
    bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
73,192✔
144
    if (threadLocalCurrentThreadPool == this)
73,192✔
145
    {
146
        // If there are waiting threads or we have not started all allowed
147
        // threads, we can submit this job asynchronously
148
        {
149
            std::unique_lock<std::mutex> oGuard(m_mutex);
114,371✔
150
            if (nWaitingWorkerThreads > 0 ||
68,663✔
151
                static_cast<int>(aWT.size()) < m_nMaxThreads)
11,441✔
152
            {
153
                bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
45,753✔
154
                nWaitingWorkerThreads--;
45,753✔
155
            }
156
        }
157
        if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
57,129✔
158
        {
159
            // otherwise there is a risk of deadlock, so execute synchronously.
160
            task();
11,440✔
161
            return true;
11,422✔
162
        }
163
    }
164

165
    std::unique_lock<std::mutex> oGuard(m_mutex);
61,732✔
166

167
    if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
61,939✔
168
        nWaitingWorkerThreads++;
45,797✔
169

170
    if (static_cast<int>(aWT.size()) < m_nMaxThreads)
61,939✔
171
    {
172
        // CPLDebug("CPL", "Starting new thread...");
173
        auto wt = std::make_unique<CPLWorkerThread>();
2,208✔
174
        wt->poTP = this;
1,104✔
175
        //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
176
        //  tied to the submitted job. The submitted job still needs to run, even if
177
        //  this fails. If we can't create a thread, should the entire pool become invalid?
178
        wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
1,104✔
179
        /**
180
        if (!wt->hThread)
181
        {
182
            VSIFree(psJob);
183
            VSIFree(psItem);
184
            return false;
185
        }
186
        **/
187
        if (wt->hThread)
1,104✔
188
            aWT.emplace_back(std::move(wt));
1,104✔
189
    }
190

191
    jobQueue.emplace(task);
61,875✔
192
    nPendingJobs++;
61,825✔
193

194
    if (psWaitingWorkerThreadsList)
61,825✔
195
    {
196
        CPLWorkerThread *psWorkerThread =
55,341✔
197
            static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
55,341✔
198

199
        CPLAssert(psWorkerThread->bMarkedAsWaiting);
55,341✔
200
        psWorkerThread->bMarkedAsWaiting = false;
55,341✔
201

202
        CPLList *psNext = psWaitingWorkerThreadsList->psNext;
55,341✔
203
        CPLList *psToFree = psWaitingWorkerThreadsList;
55,341✔
204
        psWaitingWorkerThreadsList = psNext;
55,341✔
205
        nWaitingWorkerThreads--;
55,341✔
206

207
#if DEBUG_VERBOSE
208
        CPLDebug("JOB", "Waking up %p", psWorkerThread);
209
#endif
210

211
#ifdef __COVERITY__
212
        CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
213
#else
214
        {
215
            std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
110,932✔
216
            // coverity[uninit_use_in_call]
217
            oGuard.unlock();
55,573✔
218
            psWorkerThread->m_cv.notify_one();
55,613✔
219
        }
220
#endif
221

222
        CPLFree(psToFree);
55,680✔
223
    }
224

225
    // coverity[double_unlock]
226
    return true;
62,184✔
227
}
228

229
/************************************************************************/
230
/*                             SubmitJobs()                              */
231
/************************************************************************/
232

233
/** Queue several jobs
234
 *
235
 * @param pfnFunc Function to run for the job.
236
 * @param apData User data instances to pass to the job function.
237
 * @return true in case of success.
238
 */
239
bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
170✔
240
                                     const std::vector<void *> &apData)
241
{
242
    if (apData.empty())
170✔
243
        return false;
×
244

245
#ifdef DEBUG
246
    {
247
        std::unique_lock<std::mutex> oGuard(m_mutex);
340✔
248
        CPLAssert(m_nMaxThreads > 0);
170✔
249
    }
250
#endif
251

252
    if (threadLocalCurrentThreadPool == this)
170✔
253
    {
254
        // If SubmitJob() is called from a worker thread of this queue,
255
        // then synchronously run the task to avoid deadlock.
256
        for (void *pData : apData)
×
257
            pfnFunc(pData);
×
258
        return true;
×
259
    }
260

261
    std::unique_lock<std::mutex> oGuard(m_mutex);
340✔
262

263
    for (void *pData : apData)
1,604✔
264
    {
265
        if (static_cast<int>(aWT.size()) < m_nMaxThreads)
1,434✔
266
        {
267
            std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
×
268
            wt->poTP = this;
×
269
            wt->hThread =
×
270
                CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
×
271
            if (wt->hThread == nullptr)
×
272
            {
273
                if (aWT.empty())
×
274
                    return false;
×
275
            }
276
            else
277
            {
278
                aWT.emplace_back(std::move(wt));
×
279
            }
280
        }
281

282
        jobQueue.emplace([=] { pfnFunc(pData); });
2,861✔
283
        nPendingJobs++;
1,434✔
284
    }
285

286
    for (size_t i = 0; i < apData.size(); i++)
603✔
287
    {
288
        if (psWaitingWorkerThreadsList)
436✔
289
        {
290
            CPLWorkerThread *psWorkerThread;
291

292
            psWorkerThread = static_cast<CPLWorkerThread *>(
433✔
293
                psWaitingWorkerThreadsList->pData);
433✔
294

295
            CPLAssert(psWorkerThread->bMarkedAsWaiting);
433✔
296
            psWorkerThread->bMarkedAsWaiting = false;
433✔
297

298
            CPLList *psNext = psWaitingWorkerThreadsList->psNext;
433✔
299
            CPLList *psToFree = psWaitingWorkerThreadsList;
433✔
300
            psWaitingWorkerThreadsList = psNext;
433✔
301
            nWaitingWorkerThreads--;
433✔
302

303
#if DEBUG_VERBOSE
304
            CPLDebug("JOB", "Waking up %p", psWorkerThread);
305
#endif
306
            {
307
                std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
866✔
308
                // coverity[uninit_use_in_call]
309
                oGuard.unlock();
433✔
310
                psWorkerThread->m_cv.notify_one();
433✔
311
            }
312

313
            CPLFree(psToFree);
433✔
314
            oGuard.lock();
433✔
315
        }
316
        else
317
        {
318
            break;
3✔
319
        }
320
    }
321

322
    return true;
170✔
323
}
324

325
/************************************************************************/
326
/*                            WaitCompletion()                          */
327
/************************************************************************/
328

329
/** Wait for completion of part or whole jobs.
330
 *
331
 * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
332
 *                          in the queue after this method has completed. Might
333
 * be 0 to wait for all jobs.
334
 */
335
void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
5,771✔
336
{
337
    if (nMaxRemainingJobs < 0)
5,771✔
338
        nMaxRemainingJobs = 0;
×
339
    std::unique_lock<std::mutex> oGuard(m_mutex);
11,542✔
340
    m_cv.wait(oGuard, [this, nMaxRemainingJobs]
5,771✔
341
              { return nPendingJobs <= nMaxRemainingJobs; });
7,248✔
342
}
5,771✔
343

344
/************************************************************************/
345
/*                            WaitEvent()                               */
346
/************************************************************************/
347

348
/** Wait for completion of at least one job, if there are any remaining,
349
 * or for WakeUpWaitEvent() to have been called.
350
 */
351
void CPLWorkerThreadPool::WaitEvent()
1,469✔
352
{
353
    // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
354
    // a notification occurs, jobs could be submitted which would increase
355
    // nPendingJobs, so a job completion may looks like a spurious wakeup.
356
    std::unique_lock<std::mutex> oGuard(m_mutex);
1,469✔
357
    if (nPendingJobs == 0)
1,469✔
358
        return;
39✔
359
    const int nPendingJobsBefore = nPendingJobs;
1,430✔
360
    m_cv.wait(oGuard, [this, nPendingJobsBefore]
1,430✔
361
              { return nPendingJobs < nPendingJobsBefore || m_bNotifyEvent; });
3,047✔
362
    m_bNotifyEvent = false;
1,430✔
363
}
364

365
/************************************************************************/
366
/*                          WakeUpWaitEvent()                           */
367
/************************************************************************/
368

369
/** Wake-up WaitEvent().
370
 *
371
 * This method is thread-safe.
372
 *
373
 * @since GDAL 3.12
374
 */
375
void CPLWorkerThreadPool::WakeUpWaitEvent()
267✔
376
{
377
    std::unique_lock<std::mutex> oGuard(m_mutex);
534✔
378
    m_bNotifyEvent = true;
267✔
379
    m_cv.notify_one();
267✔
380
}
267✔
381

382
/************************************************************************/
383
/*                                Setup()                               */
384
/************************************************************************/
385

386
/** Setup the pool.
387
 *
388
 * @param nThreads Number of threads to launch
389
 * @param pfnInitFunc Initialization function to run in each thread. May be NULL
390
 * @param pasInitData Array of initialization data. Its length must be nThreads,
391
 *                    or it should be NULL.
392
 * @return true if initialization was successful.
393
 */
394
bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
619✔
395
                                void **pasInitData)
396
{
397
    return Setup(nThreads, pfnInitFunc, pasInitData, true);
619✔
398
}
399

400
/** Setup the pool.
401
 *
402
 * @param nThreads Number of threads to launch
403
 * @param pfnInitFunc Initialization function to run in each thread. May be NULL
404
 * @param pasInitData Array of initialization data. Its length must be nThreads,
405
 *                    or it should be NULL.
406
 * @param bWaitallStarted Whether to wait for all threads to be fully started.
407
 * @return true if initialization was successful.
408
 */
409
bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
645✔
410
                                void **pasInitData, bool bWaitallStarted)
411
{
412
    CPLAssert(nThreads > 0);
645✔
413

414
    if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
1,290✔
415
        pasInitData == nullptr && !bWaitallStarted)
1,290✔
416
    {
417
        std::lock_guard<std::mutex> oGuard(m_mutex);
25✔
418
        if (nThreads > m_nMaxThreads)
25✔
419
            m_nMaxThreads = nThreads;
25✔
420
        return true;
25✔
421
    }
422

423
    bool bRet = true;
620✔
424
    for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
3,063✔
425
    {
426
        auto wt = std::make_unique<CPLWorkerThread>();
2,443✔
427
        wt->pfnInitFunc = pfnInitFunc;
2,444✔
428
        wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
2,444✔
429
        wt->poTP = this;
2,443✔
430
        wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
2,443✔
431
        if (wt->hThread == nullptr)
2,444✔
432
        {
433
            nThreads = i;
×
434
            bRet = false;
×
435
            break;
×
436
        }
437
        aWT.emplace_back(std::move(wt));
2,444✔
438
    }
439

440
    {
441
        std::lock_guard<std::mutex> oGuard(m_mutex);
1,240✔
442
        if (nThreads > m_nMaxThreads)
620✔
443
            m_nMaxThreads = nThreads;
620✔
444
    }
445

446
    if (bWaitallStarted)
620✔
447
    {
448
        // Wait all threads to be started
449
        std::unique_lock<std::mutex> oGuard(m_mutex);
1,240✔
450
        while (nWaitingWorkerThreads < nThreads)
1,416✔
451
        {
452
            m_cv.wait(oGuard);
796✔
453
        }
454
    }
455

456
    if (eState == CPLWTS_ERROR)
620✔
457
        bRet = false;
×
458

459
    return bRet;
620✔
460
}
461

462
/************************************************************************/
463
/*                          DeclareJobFinished()                        */
464
/************************************************************************/
465

466
void CPLWorkerThreadPool::DeclareJobFinished()
63,456✔
467
{
468
    std::lock_guard<std::mutex> oGuard(m_mutex);
126,946✔
469
    nPendingJobs--;
63,494✔
470
    m_cv.notify_one();
63,494✔
471
}
63,505✔
472

473
/************************************************************************/
474
/*                             GetNextJob()                             */
475
/************************************************************************/
476

477
std::function<void()>
478
CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
67,057✔
479
{
480
    std::unique_lock<std::mutex> oGuard(m_mutex);
67,057✔
481
    while (true)
482
    {
483
        if (eState == CPLWTS_STOP)
125,596✔
484
            return std::function<void()>();
65,990✔
485

486
        if (jobQueue.size())
123,088✔
487
        {
488
#if DEBUG_VERBOSE
489
            CPLDebug("JOB", "%p got a job", psWorkerThread);
490
#endif
491
            auto task = std::move(jobQueue.front());
126,889✔
492
            jobQueue.pop();
63,449✔
493
            return task;
63,343✔
494
        }
495

496
        if (!psWorkerThread->bMarkedAsWaiting)
59,679✔
497
        {
498
            psWorkerThread->bMarkedAsWaiting = true;
59,705✔
499
            nWaitingWorkerThreads++;
59,705✔
500

501
            CPLList *psItem =
502
                static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
59,705✔
503
            if (psItem == nullptr)
59,719✔
504
            {
505
                eState = CPLWTS_ERROR;
×
506
                m_cv.notify_one();
×
507

508
                return nullptr;
×
509
            }
510

511
            psItem->pData = psWorkerThread;
59,719✔
512
            psItem->psNext = psWaitingWorkerThreadsList;
59,719✔
513
            psWaitingWorkerThreadsList = psItem;
59,719✔
514

515
#if DEBUG_VERBOSE
516
            CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
517
                      nWaitingWorkerThreads);
518
#endif
519
        }
520

521
        m_cv.notify_one();
59,693✔
522

523
#if DEBUG_VERBOSE
524
        CPLDebug("JOB", "%p sleeping", psWorkerThread);
525
#endif
526

527
#ifdef __COVERITY__
528
        CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
529
#else
530
        std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
118,293✔
531
        // coverity[uninit_use_in_call]
532
        oGuard.unlock();
59,696✔
533
        // coverity[wait_not_in_locked_loop]
534
        psWorkerThread->m_cv.wait(oGuardThisThread);
59,704✔
535
        // coverity[lock_order]
536
        oGuard.lock();
58,528✔
537
#endif
538
    }
58,543✔
539
}
540

541
/************************************************************************/
542
/*                         CreateJobQueue()                             */
543
/************************************************************************/
544

545
/** Create a new job queue based on this worker thread pool.
546
 *
547
 * The worker thread pool must remain alive while the returned object is
548
 * itself alive.
549
 *
550
 * @since GDAL 3.2
551
 */
552
std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
29,734✔
553
{
554
    return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
29,734✔
555
}
556

557
/************************************************************************/
558
/*                            CPLJobQueue()                             */
559
/************************************************************************/
560

561
//! @cond Doxygen_Suppress
562
CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
29,707✔
563
{
564
}
29,671✔
565

566
//! @endcond
567

568
/************************************************************************/
569
/*                           ~CPLJobQueue()                             */
570
/************************************************************************/
571

572
CPLJobQueue::~CPLJobQueue()
29,814✔
573
{
574
    WaitCompletion();
29,821✔
575
}
29,814✔
576

577
/************************************************************************/
578
/*                          DeclareJobFinished()                        */
579
/************************************************************************/
580

581
void CPLJobQueue::DeclareJobFinished()
65,985✔
582
{
583
    std::lock_guard<std::mutex> oGuard(m_mutex);
132,094✔
584
    m_nPendingJobs--;
65,975✔
585
    m_cv.notify_one();
65,975✔
586
}
66,060✔
587

588
/************************************************************************/
589
/*                             SubmitJob()                              */
590
/************************************************************************/
591

592
/** Queue a new job.
593
 *
594
 * @param pfnFunc Function to run for the job.
595
 * @param pData User data to pass to the job function.
596
 * @return true in case of success.
597
 */
598
bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
6,765✔
599
{
600
    return SubmitJob([=] { pfnFunc(pData); });
13,526✔
601
}
602

603
/** Queue a new job.
604
 *
605
 * @param task  Task to execute.
606
 * @return true in case of success.
607
 */
608
bool CPLJobQueue::SubmitJob(std::function<void()> task)
65,949✔
609
{
610
    {
611
        std::lock_guard<std::mutex> oGuard(m_mutex);
65,949✔
612
        m_nPendingJobs++;
65,846✔
613
    }
614

615
    // coverity[uninit_member,copy_constructor_call]
616
    const auto lambda = [this, capturedTask = std::move(task)]
197,652✔
617
    {
618
        capturedTask();
65,828✔
619
        DeclareJobFinished();
65,944✔
620
    };
65,880✔
621
    // cppcheck-suppress knownConditionTrueFalse
622
    return m_poPool->SubmitJob(std::move(lambda));
131,722✔
623
}
624

625
/************************************************************************/
626
/*                            WaitCompletion()                          */
627
/************************************************************************/
628

629
/** Wait for completion of part or whole jobs.
630
 *
631
 * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
632
 *                          in the queue after this method has completed. Might
633
 * be 0 to wait for all jobs.
634
 */
635
void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
59,450✔
636
{
637
    std::unique_lock<std::mutex> oGuard(m_mutex);
118,870✔
638
    m_cv.wait(oGuard, [this, nMaxRemainingJobs]
59,416✔
639
              { return m_nPendingJobs <= nMaxRemainingJobs; });
85,839✔
640
}
59,449✔
641

642
/************************************************************************/
643
/*                             WaitEvent()                              */
644
/************************************************************************/
645

646
/** Wait for completion for at least one job.
647
 *
648
 * @return true if there are remaining jobs.
649
 */
650
bool CPLJobQueue::WaitEvent()
255✔
651
{
652
    // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
653
    // a notification occurs, jobs could be submitted which would increase
654
    // nPendingJobs, so a job completion may looks like a spurious wakeup.
655
    std::unique_lock<std::mutex> oGuard(m_mutex);
510✔
656
    if (m_nPendingJobs == 0)
255✔
UNCOV
657
        return false;
×
658

659
    const int nPendingJobsBefore = m_nPendingJobs;
255✔
660
    m_cv.wait(oGuard, [this, nPendingJobsBefore]
255✔
661
              { return m_nPendingJobs < nPendingJobsBefore; });
510✔
662
    return m_nPendingJobs > 0;
255✔
663
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc