• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Tencent / ScriptX / 7078084517

03 Dec 2023 04:33PM UTC coverage: 94.329% (+0.1%) from 94.218%
7078084517

push

github

LanderlYoung
    // fix bug where awaitTermination won't return even if all worker quit.
    // https://en.cppreference.com/w/cpp/thread/condition_variable
    // follow what the STD told us to
    // "Even if the shared variable is atomic, it must be modified while owning the mutex to
    // correctly publish the modification to the waiting thread."
    // DEEP EXPLANATION:
    // https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut

5 of 6 new or added lines in 1 file covered. (83.33%)

3 existing lines in 2 files now uncovered.

5373 of 5696 relevant lines covered (94.33%)

199231.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.69
/src/utils/MessageQueue.cc
1
/*
2
 * Tencent is pleased to support the open source community by making ScriptX available.
3
 * Copyright (C) 2021 THL A29 Limited, a Tencent company.  All rights reserved.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
#include "MessageQueue.h"
19
#include <algorithm>
20
#include <unordered_map>
21
#include "ThreadLocal.h"
22

23
namespace script::utils {
24

25
// <queue, nested count>
26
using RunnineQueueMap = std::unordered_map<MessageQueue*, int>;
27
SCRIPTX_THREAD_LOCAL(RunnineQueueMap, runningQueue_);
28

29
static inline RunnineQueueMap& getRunningQueue() { return internal::getThreadLocal(runningQueue_); }
553,870✔
30

31
class LoopQueueGuard {
32
  MessageQueue* queue_;
33

34
 public:
35
  explicit LoopQueueGuard(MessageQueue* queue) : queue_(queue) {
45,187✔
36
    getRunningQueue()[queue]++;
45,187✔
37

38
    {
39
      std::unique_lock<std::mutex> lk(queue_->queueMutex_);
45,187✔
40
      ++queue_->workerCount_;
45,187✔
41
    }
42
  }
45,187✔
43

44
  SCRIPTX_DISALLOW_COPY_AND_MOVE(LoopQueueGuard);
45

46
  ~LoopQueueGuard() {
90,374✔
47
    auto q = getRunningQueue();
90,374✔
48
    if (--q[queue_] == 0) {
45,187✔
49
      q.erase(queue_);
98✔
50
    }
51

52
    // bugfix: awaitTermination won't return even if all worker quit.
53
    // https://en.cppreference.com/w/cpp/thread/condition_variable
54
    // follow what the STD told us to
55
    // "Even if the shared variable is atomic, it must be modified while owning the mutex to
56
    // correctly publish the modification to the waiting thread."
57
    // DEEP EXPLAINATION:
58
    // https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
59
    {
60
      std::unique_lock<std::mutex> lk(queue_->queueMutex_);
45,183✔
61
      --queue_->workerCount_;
45,187✔
62
    }
63
    queue_->workerQuitCondition_.notify_all();
45,187✔
64
  }
45,187✔
65

66
  /**
67
   * @return if current method call is already inside a loopQueue() stack hierarchy.
68
   */
69
  static bool isCallerNestedInsideLoop(MessageQueue* queue) {
463,497✔
70
    auto q = getRunningQueue();
463,497✔
71
    return q.find(queue) != q.end();
926,994✔
72
  }
73
};
74

75
Message::Message() : handlerProc(nullptr), cleanupProc(nullptr) {}
328,164✔
76

77
Message::Message(MessageProc* handlerProc, MessageProc* cleanupProc)
8,870,720✔
78
    : handlerProc(handlerProc), cleanupProc(cleanupProc) {}
8,870,720✔
79

80
/** @return whether this message is due at the current MessageQueue::timestamp(). */
bool Message::due() const {
  const auto now = MessageQueue::timestamp();
  return due(now);
}
9,495,060✔
81

82
/**
 * @param now the reference time point to compare against.
 * @return true when dueTime is not later than `now`.
 */
bool Message::due(std::chrono::nanoseconds now) const {
  return now >= dueTime;
}
9,528,010✔
83

84
void Message::performCleanup() {
8,848,110✔
85
  if (cleanupProc) {
8,848,110✔
86
    cleanupProc(*this);
31,993✔
87
  }
88
  data0 = data1 = data2 = data3 = 0;
8,848,110✔
89
  ptr0 = ptr1 = ptr2 = ptr3 = nullptr;
8,848,110✔
90
  name = nullptr;
8,848,110✔
91

92
  priority = what = 0;
8,848,110✔
93
  handlerProc = cleanupProc = nullptr;
8,848,110✔
94
  dueTime = std::chrono::nanoseconds(0);
8,848,110✔
95
  messageId = 0;
8,852,750✔
96
}
8,852,750✔
97

98
void Message::handle() {
8,835,190✔
99
  if (handlerProc) {
8,835,190✔
100
    handlerProc(*this);
8,839,370✔
101
  }
102
}
8,843,010✔
103

104
/** @return the handler procedure currently stored in this message (may be null). */
Message::MessageProc* Message::getHandlerProc() const {
  return handlerProc;
}
×
105

106
/** @return the cleanup procedure currently stored in this message (may be null). */
Message::MessageProc* Message::getCleanupProc() const {
  return cleanupProc;
}
×
107

108
/**
 * Creates an empty queue that is neither shut down nor interrupted.
 * @param maxMessageInQueue capacity threshold used by isQueueFull().
 */
MessageQueue::MessageQueue(std::size_t maxMessageInQueue)
    :  // configuration
      maxMessageInQueue_(maxMessageInQueue),
      messagePool_(kDefaultPoolSize),
      // lifecycle flags
      shutdown_(ShutdownType::kNone),
      interrupt_(false),
      // queue storage and its synchronization primitives
      queueMutex_(),
      queueNotEmptyCondition_(),
      queueNotFullCondition_(),
      queue_(),
      // message ids start at 1; postMessage() never hands out id 0
      messageIdCounter_(1),
      // worker tracking, see awaitTermination()
      workerCount_(0),
      workerQuitCondition_(),
      supervisor_() {}
476✔
121

122
/** Shuts the queue down immediately and waits until every worker has left loopQueue(). */
MessageQueue::~MessageQueue() {
  constexpr bool kAwaitTermination = true;
  shutdownNow(kAwaitTermination);
}
476✔
123

124
std::unique_ptr<InplaceMessage> MessageQueue::obtainInplaceMessage(
18✔
125
    InplaceMessage::HandlerPorc* handlerProc) {
126
  auto msg = messagePool_.obtain();
18✔
127
  // InplaceMessage is essentially a Message with some extra non-virtual method.
128
  // so it's safe to cast
129
  msg->handlerProc = reinterpret_cast<void (*)(Message&)>(reinterpret_cast<void*>(handlerProc));
18✔
130
  return std::unique_ptr<InplaceMessage>(reinterpret_cast<InplaceMessage*>(msg));
18✔
131
}
132

133
/**
 * Cleans a message up and gives it back to the message pool.
 * @param message the message to recycle; its cleanup procedure runs first.
 */
void MessageQueue::releaseMessage(Message* message) {
  Message& finished = *message;
  finished.performCleanup();
  messagePool_.release(&finished);
}
8,873,360✔
137

138
/** Forwards the "about to process" hook to the supervisor, if one is installed. */
void MessageQueue::beforeMessage(Message& message) {
  // Work on a local copy of the supervisor pointer for the duration of the call.
  auto observer = supervisor_;
  if (!observer) {
    return;
  }
  observer->beforeMessage(message);
}
8,838,920✔
143

144
/** Forwards the "finished processing" hook to the supervisor, if one is installed. */
void MessageQueue::afterMessage(Message& message) {
  // Work on a local copy of the supervisor pointer for the duration of the call.
  auto observer = supervisor_;
  if (!observer) {
    return;
  }
  observer->afterMessage(message);
}
8,838,870✔
149

150
/**
 * Installs (or clears, when given an empty pointer) the supervisor whose
 * beforeMessage()/afterMessage() hooks surround every processed message.
 * NOTE(review): the assignment is not guarded by queueMutex_ -- confirm that
 * callers do not race with running loops.
 */
void MessageQueue::setSupervisor(const std::shared_ptr<MessageQueue::Supervisor>& supervisor) {
  this->supervisor_ = supervisor;
}
×
153

154
void MessageQueue::shutdownNow(bool awaitTermination) {
544✔
155
  {
156
    std::lock_guard<std::mutex> lk(queueMutex_);
1,088✔
157
    shutdown_ = ShutdownType::kNow;
544✔
158
    for (auto r : queue_) {
15,467✔
159
      releaseMessage(r);
14,923✔
160
    }
161
    queue_.clear();
544✔
162
  }
163

164
  // wake up postMessage
165
  queueNotFullCondition_.notify_all();
544✔
166

167
  // wake up looper
168
  queueNotEmptyCondition_.notify_all();
544✔
169

170
  if (awaitTermination) {
544✔
171
    this->awaitTermination();
516✔
172
  }
173
}
544✔
174

175
void MessageQueue::shutdown(bool awaitTermination) {
23✔
176
  {
177
    std::unique_lock<std::mutex> lk(queueMutex_);
23✔
178
    shutdown_ = ShutdownType::kAwaitQueue;
23✔
179
  }
180

181
  // wake up postMessage
182
  queueNotFullCondition_.notify_all();
23✔
183

184
  // wake up looper
185
  queueNotEmptyCondition_.notify_all();
23✔
186

187
  if (awaitTermination) {
23✔
188
    this->awaitTermination();
19✔
189
  }
190
}
23✔
191

192
void MessageQueue::awaitTermination() {
567✔
193
  std::unique_lock<std::mutex> lk(queueMutex_);
1,134✔
194
  workerQuitCondition_.wait(lk, [this] { return workerCount_ == 0; });
1,185✔
195
}
567✔
196

NEW
197
bool MessageQueue::isShutdown() const {
×
198
  std::unique_lock<std::mutex> lk(queueMutex_);
×
199
  return shutdown_ != ShutdownType::kNone;
×
200
}
201

202
void MessageQueue::interrupt() {
4✔
203
  {
204
    std::lock_guard<std::mutex> lk(queueMutex_);
4✔
205
    interrupt_ = true;
4✔
206
  }
207

208
  // wake up looper to return immediately
209
  queueNotEmptyCondition_.notify_all();
4✔
210
}
4✔
211

212
bool MessageQueue::isQueueFull() const { return queue_.size() >= maxMessageInQueue_; }
18,489,340✔
213

214
void MessageQueue::awaitNotFullLocked(std::unique_lock<std::mutex>& lock) {
8,874,190✔
215
  if (isQueueFull() && LoopQueueGuard::isCallerNestedInsideLoop(this)) {
8,874,190✔
216
    // This method call is already inside loopQueue call,
217
    // can't wait again, which would cause a dead-lock...
218
    // Just return, and allow the queue to be over-full.
219
    return;
4✔
220
  }
221
  queueNotFullCondition_.wait(lock, [this] { return !isQueueFull(); });
18,489,340✔
222
}
223

224
/**
 * Stamps a message with an id and due time and inserts it into the queue.
 * May block while the queue is full (see awaitNotFullLocked()).
 * @param msg the message to enqueue.
 * @param delayNanos delay, in nanoseconds from now, before the message is due.
 * @return the non-zero message id, or 0 when the queue has been shut down
 *         with kNow (the message is released in that case).
 */
int32_t MessageQueue::postMessage(Message* msg, int64_t delayNanos) {
  // Id 0 is reserved as the "not posted" result, so skip it.
  auto id = messageIdCounter_++;
  for (; id == 0; id = messageIdCounter_++) {
  }

  const std::chrono::nanoseconds delay(delayNanos);
  msg->dueTime = timestamp() + delay;
  msg->messageId = id;

  std::unique_lock<std::mutex> lk(queueMutex_);
  awaitNotFullLocked(lk);
  if (shutdown_ == ShutdownType::kNow) {
    releaseMessage(msg);
    return 0;
  }
  queue_.insert(findInsertPositionLocked(msg->dueTime, msg->priority), msg);
  lk.unlock();

  // Notify loopers after the lock has been dropped.
  queueNotEmptyCondition_.notify_all();
  return id;
}
248

249
/**
 * Finds where a message with the given due time and priority belongs.
 * The queue is ordered by due time; among equal due times the new message
 * goes after every entry whose priority value is less than or equal to its own.
 * The "Locked" suffix suggests the caller holds queueMutex_ (postMessage() does).
 * @return iterator before which the new message should be inserted.
 */
std::deque<Message*>::const_iterator MessageQueue::findInsertPositionLocked(
    std::chrono::nanoseconds dueTime, int32_t priority) const {
  const auto first = queue_.begin();
  const auto last = queue_.end();
  if (first == last) {
    return last;
  }

  // Walk backwards from the tail, since appending is the most common case.
  auto pos = last - 1;
  for (; pos != first && (*pos)->dueTime >= dueTime; --pos) {
  }

  // Step forward over everything that is due strictly earlier.
  for (; pos != last && (*pos)->dueTime < dueTime; ++pos) {
  }

  // Among equal due times, step over entries with priority <= ours.
  for (; pos != last && (*pos)->dueTime == dueTime && (*pos)->priority <= priority; ++pos) {
  }

  return pos;
}
273

274
bool MessageQueue::removeMessageIf(
421✔
275
    const std::function<RemoveMessagePredReturnType(Message&)>& pred) {
276
  bool removed = false;
421✔
277
  {
278
    std::lock_guard<std::mutex> lk(queueMutex_);
842✔
279
    for (auto it = queue_.begin(); it != queue_.end();) {
549✔
280
      auto type = pred(**it);
128✔
281
      if (type == RemoveMessagePredReturnType::kRemoveAndContinue ||
128✔
282
          type == RemoveMessagePredReturnType::kRemove) {
283
        auto msg = *it;
120✔
284
        it = queue_.erase(it);
120✔
285
        releaseMessage(msg);
120✔
286
        removed = true;
120✔
287
        if (type == RemoveMessagePredReturnType::kRemove) break;
120✔
288
      } else {
289
        ++it;
8✔
290
      }
291
    }
292
  }
293
  if (removed) {
421✔
294
    queueNotFullCondition_.notify_all();
116✔
295
  }
296
  return removed;
421✔
297
}
298

299
/** @return true when the queue is non-empty and its front message is already due. */
bool MessageQueue::hasDueMessageLocked() const {
  if (queue_.empty()) {
    return false;
  }
  return queue_.front()->due();
}
9,496,340✔
300

301
size_t MessageQueue::dueMessageCount() const {
45,092✔
302
  std::lock_guard<std::mutex> lk(queueMutex_);
45,092✔
303
  auto now = timestamp();
45,092✔
304
  auto firstNotDue = std::find_if_not(queue_.begin(), queue_.end(),
45,092✔
305
                                      [now](const Message* msg) { return msg->due(now); });
123,125✔
306
  return firstNotDue - queue_.begin();
90,184✔
307
}
308

309
bool MessageQueue::checkQuitLoopNowLocked(MessageQueue::LoopType loopType, size_t onceMessageCount,
9,541,510✔
310
                                          MessageQueue::LoopReturnType& returnType) {
311
  if (shutdown_ == ShutdownType::kNow) {
9,541,510✔
312
    returnType = LoopReturnType::kShutDown;
75✔
313
    return true;
75✔
314
  }
315

316
  if (interrupt_) {
9,541,440✔
317
    interrupt_ = false;
1✔
318
    returnType = LoopReturnType::kInterrupt;
1✔
319
    return true;
1✔
320
  }
321

322
  if (loopType == LoopType::kLoopOnce && onceMessageCount == 0) {
9,541,440✔
323
    returnType = LoopReturnType::kRunOnce;
45,092✔
324
    return true;
45,092✔
325
  }
326
  return false;
9,496,340✔
327
}
328

329
bool MessageQueue::checkQuitLoopWhenNoDueMessageLocked(MessageQueue::LoopType loopType,
637,243✔
330
                                                       MessageQueue::LoopReturnType& returnType) {
331
  if (loopType == LoopType::kLoopOnce) {
637,243✔
UNCOV
332
    returnType = LoopReturnType::kRunOnce;
×
UNCOV
333
    return true;
×
334
  }
335

336
  if (shutdown_ == ShutdownType::kAwaitQueue && queue_.empty()) {
637,243✔
337
    // We have done await queue.
338
    // avoid user call loopQueue again.
339
    shutdown_ = ShutdownType::kNow;
19✔
340
    returnType = LoopReturnType::kShutDown;
19✔
341
    return true;
19✔
342
  }
343
  return false;
637,224✔
344
}
345

346
/**
 * Blocks until a message is due and removes it from the queue, or until the
 * loop has a reason to quit.
 * @param loopType the mode loopQueue() was called with.
 * @param onceMessageCount messages still allowed in kLoopOnce mode.
 * @param returnType receives the quit reason when nullptr is returned.
 * @return the due message, or nullptr when the loop should return.
 */
Message* MessageQueue::awaitDueMessage(MessageQueue::LoopType loopType, size_t onceMessageCount,
                                       MessageQueue::LoopReturnType& returnType) {
  Message* dueMessage = nullptr;
  for (;;) {
    // The lock is taken afresh on every round and dropped when the round ends.
    std::unique_lock<std::mutex> lk(queueMutex_);

    if (checkQuitLoopNowLocked(loopType, onceMessageCount, returnType)) {
      return nullptr;
    }

    if (hasDueMessageLocked()) {
      dueMessage = queue_.front();
      queue_.pop_front();
      break;
    }

    if (checkQuitLoopWhenNoDueMessageLocked(loopType, returnType)) {
      return nullptr;
    }

    if (queue_.empty()) {
      // Nothing queued: sleep until notified.
      queueNotEmptyCondition_.wait(lk);
    } else {
      // Something is queued but not yet due: sleep until its due time at most.
      const auto timeToWait = queue_.front()->dueTime - timestamp();
      if (timeToWait.count() > 0) {
        queueNotEmptyCondition_.wait_for(lk, timeToWait);
      }
    }
    // Woken up by a new message, an interrupt, a shutdown, a timeout or a
    // spurious wake-up; the next round re-evaluates all conditions.
  }

  // A slot was freed; let blocked posters continue.
  queueNotFullCondition_.notify_all();

  return dueMessage;
}
391

392
/**
 * Runs the message loop on the calling thread.
 * @param loopType kLoopOnce handles only the messages already due when the
 *        call starts; other modes keep looping until shutdown or interrupt.
 * @return the reason the loop ended.
 */
MessageQueue::LoopReturnType MessageQueue::loopQueue(MessageQueue::LoopType loopType) {
  LoopQueueGuard loopQueueGuard(this);

  // In kLoopOnce mode the number of handled messages is capped at the number
  // due right now. Otherwise a handler that keeps posting new messages could
  // keep the loop running forever. Other modes get an effectively unlimited budget.
  const bool runOnce = (loopType == LoopType::kLoopOnce);
  size_t onceMessageCount = runOnce ? dueMessageCount() : static_cast<size_t>(-1);

  LoopReturnType returnType = LoopReturnType::kRunOnce;
  while (Message* message = awaitDueMessage(loopType, onceMessageCount, returnType)) {
    processMessage(message);
    --onceMessageCount;
  }
  return returnType;
}
415

416
/**
 * Handles one dequeued message: supervisor pre-hook, the message handler,
 * supervisor post-hook, then cleanup and return to the pool.
 */
void MessageQueue::processMessage(Message* message) {
  Message& current = *message;

  beforeMessage(current);
  current.handle();
  afterMessage(current);

  releaseMessage(message);
}
8,858,050✔
426

427
/*static*/
428
std::chrono::nanoseconds MessageQueue::timestamp() {
19,006,970✔
429
#ifdef __ANDROID__
430
  // android NDK r16b is buggy with steady_clock, workaround it...
431
  using std::chrono::nanoseconds;
432
  using std::chrono::seconds;
433
  ::timespec tp{};
434
  clock_gettime(CLOCK_MONOTONIC, &tp);
435
  // in milli-seconds
436
  return seconds(tp.tv_sec) + nanoseconds(tp.tv_nsec);
437
#else
438
  return std::chrono::steady_clock::now().time_since_epoch();
19,006,970✔
439
#endif
440
}
441

442
}  // namespace script::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc