• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Tencent / ScriptX / 10957478192

20 Sep 2024 10:18AM UTC coverage: 94.224% (-0.02%) from 94.242%
10957478192

push

github

web-flow
remove unnecessary std::void_t usage (#109)

5 of 5 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

5367 of 5696 relevant lines covered (94.22%)

18930.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.78
/src/utils/MessageQueue.cc
1
/*
2
 * Tencent is pleased to support the open source community by making ScriptX available.
3
 * Copyright (C) 2021 THL A29 Limited, a Tencent company.  All rights reserved.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
#include "MessageQueue.h"
19
#include <algorithm>
20
#include <unordered_map>
21
#include "ThreadLocal.h"
22

23
namespace script::utils {
24

25
// <queue, nested count>
26
using RunnineQueueMap = std::unordered_map<MessageQueue*, int>;
27
SCRIPTX_THREAD_LOCAL(RunnineQueueMap, runningQueue_);
28

29
static inline RunnineQueueMap& getRunningQueue() { return internal::getThreadLocal(runningQueue_); }
90,258✔
30

31
class LoopQueueGuard {
32
  MessageQueue* queue_;
33

34
 public:
35
  explicit LoopQueueGuard(MessageQueue* queue) : queue_(queue) {
45,127✔
36
    getRunningQueue()[queue]++;
45,127✔
37

38
    {
39
      std::unique_lock<std::mutex> lk(queue_->queueMutex_);
45,127✔
40
      ++queue_->workerCount_;
45,127✔
41
    }
42
  }
45,127✔
43

44
  SCRIPTX_DISALLOW_COPY_AND_MOVE(LoopQueueGuard);
45

46
  ~LoopQueueGuard() {
90,254✔
47
    auto q = getRunningQueue();
90,254✔
48
    if (--q[queue_] == 0) {
45,125✔
49
      q.erase(queue_);
43✔
50
    }
51

52
    // bugfix: awaitTermination won't return even if all worker quit.
53
    // https://en.cppreference.com/w/cpp/thread/condition_variable
54
    // follow what the STD told us to
55
    // "Even if the shared variable is atomic, it must be modified while owning the mutex to
56
    // correctly publish the modification to the waiting thread."
57
    // DEEP EXPLAINATION:
58
    // https://stackoverflow.com/questions/38147825/shared-atomic-variable-is-not-properly-published-if-it-is-not-modified-under-mut
59
    {
60
      std::unique_lock<std::mutex> lk(queue_->queueMutex_);
45,125✔
61
      --queue_->workerCount_;
45,127✔
62
    }
63
    queue_->workerQuitCondition_.notify_all();
45,127✔
64
  }
45,127✔
65

66
  /**
67
   * @return if current method call is already inside a loopQueue() stack hierarchy.
68
   */
69
  static bool isCallerNestedInsideLoop(MessageQueue* queue) {
4✔
70
    auto q = getRunningQueue();
4✔
71
    return q.find(queue) != q.end();
8✔
72
  }
73
};
74

75
Message::Message() : handlerProc(nullptr), cleanupProc(nullptr) {}
24,102✔
76

77
Message::Message(MessageProc* handlerProc, MessageProc* cleanupProc)
53,055✔
78
    : handlerProc(handlerProc), cleanupProc(cleanupProc) {}
53,055✔
79

80
bool Message::due() const { return due(MessageQueue::timestamp()); }
53,018✔
81

82
bool Message::due(std::chrono::nanoseconds now) const { return dueTime <= now; }
85,959✔
83

84
void Message::performCleanup() {
53,079✔
85
  if (cleanupProc) {
53,079✔
86
    cleanupProc(*this);
31,993✔
87
  }
88
  data0 = data1 = data2 = data3 = 0;
53,079✔
89
  ptr0 = ptr1 = ptr2 = ptr3 = nullptr;
53,079✔
90
  name = nullptr;
53,079✔
91

92
  priority = what = 0;
53,079✔
93
  handlerProc = cleanupProc = nullptr;
53,079✔
94
  dueTime = std::chrono::nanoseconds(0);
53,079✔
95
  messageId = 0;
53,093✔
96
}
53,093✔
97

98
void Message::handle() {
52,956✔
99
  if (handlerProc) {
52,956✔
100
    handlerProc(*this);
52,920✔
101
  }
102
}
52,978✔
103

104
Message::MessageProc* Message::getHandlerProc() const { return handlerProc; }
×
105

106
Message::MessageProc* Message::getCleanupProc() const { return cleanupProc; }
×
107

108
MessageQueue::MessageQueue(std::size_t maxMessageInQueue)
448✔
109
    : maxMessageInQueue_(maxMessageInQueue),
110
      messagePool_(kDefaultPoolSize),
111
      shutdown_(ShutdownType::kNone),
112
      interrupt_(false),
113
      queueMutex_(),
114
      queueNotEmptyCondition_(),
115
      queueNotFullCondition_(),
116
      queue_(),
117
      messageIdCounter_(1),
118
      workerCount_(0),
119
      workerQuitCondition_(),
120
      supervisor_() {}
448✔
121

122
MessageQueue::~MessageQueue() { shutdownNow(true); }
448✔
123

124
std::unique_ptr<InplaceMessage> MessageQueue::obtainInplaceMessage(
18✔
125
    InplaceMessage::HandlerPorc* handlerProc) {
126
  auto msg = messagePool_.obtain();
18✔
127
  // InplaceMessage is essentially a Message with some extra non-virtual method.
128
  // so it's safe to cast
129
  msg->handlerProc = reinterpret_cast<void (*)(Message&)>(reinterpret_cast<void*>(handlerProc));
18✔
130
  return std::unique_ptr<InplaceMessage>(reinterpret_cast<InplaceMessage*>(msg));
18✔
131
}
132

133
void MessageQueue::releaseMessage(Message* message) {
53,081✔
134
  message->performCleanup();
53,081✔
135
  messagePool_.release(message);
53,095✔
136
}
53,135✔
137

138
void MessageQueue::beforeMessage(Message& message) {
52,982✔
139
  if (auto supervisor = supervisor_) {
52,982✔
140
    supervisor->beforeMessage(message);
×
141
  }
142
}
52,963✔
143

144
void MessageQueue::afterMessage(Message& message) {
52,972✔
145
  if (auto supervisor = supervisor_) {
52,972✔
146
    supervisor->afterMessage(message);
×
147
  }
148
}
52,961✔
149

150
void MessageQueue::setSupervisor(const std::shared_ptr<MessageQueue::Supervisor>& supervisor) {
×
151
  supervisor_ = supervisor;
×
152
}
×
153

154
void MessageQueue::shutdownNow(bool awaitTermination) {
460✔
155
  {
156
    std::lock_guard<std::mutex> lk(queueMutex_);
920✔
157
    shutdown_ = ShutdownType::kNow;
460✔
158
    for (auto r : queue_) {
468✔
159
      releaseMessage(r);
8✔
160
    }
161
    queue_.clear();
460✔
162
  }
163

164
  // wake up postMessage
165
  queueNotFullCondition_.notify_all();
460✔
166

167
  // wake up looper
168
  queueNotEmptyCondition_.notify_all();
460✔
169

170
  if (awaitTermination) {
460✔
171
    this->awaitTermination();
460✔
172
  }
173
}
460✔
174

175
void MessageQueue::shutdown(bool awaitTermination) {
23✔
176
  {
177
    std::unique_lock<std::mutex> lk(queueMutex_);
23✔
178
    shutdown_ = ShutdownType::kAwaitQueue;
23✔
179
  }
180

181
  // wake up postMessage
182
  queueNotFullCondition_.notify_all();
23✔
183

184
  // wake up looper
185
  queueNotEmptyCondition_.notify_all();
23✔
186

187
  if (awaitTermination) {
23✔
188
    this->awaitTermination();
19✔
189
  }
190
}
23✔
191

192
void MessageQueue::awaitTermination() {
483✔
193
  std::unique_lock<std::mutex> lk(queueMutex_);
966✔
194
  workerQuitCondition_.wait(lk, [this] { return workerCount_ == 0; });
986✔
195
}
483✔
196

197
bool MessageQueue::isShutdown() const {
×
198
  std::unique_lock<std::mutex> lk(queueMutex_);
×
199
  return shutdown_ != ShutdownType::kNone;
×
200
}
201

202
void MessageQueue::interrupt() {
4✔
203
  {
204
    std::lock_guard<std::mutex> lk(queueMutex_);
4✔
205
    interrupt_ = true;
4✔
206
  }
207

208
  // wake up looper to return immediately
209
  queueNotEmptyCondition_.notify_all();
4✔
210
}
4✔
211

212
bool MessageQueue::isQueueFull() const { return queue_.size() >= maxMessageInQueue_; }
106,280✔
213

214
void MessageQueue::awaitNotFullLocked(std::unique_lock<std::mutex>& lock) {
53,142✔
215
  if (isQueueFull() && LoopQueueGuard::isCallerNestedInsideLoop(this)) {
53,142✔
216
    // This method call is already inside loopQueue call,
217
    // can't wait again, which would cause a dead-lock...
218
    // Just return, and allow the queue to be over-full.
219
    return;
4✔
220
  }
221
  queueNotFullCondition_.wait(lock, [this] { return !isQueueFull(); });
106,276✔
222
}
223

224
int32_t MessageQueue::postMessage(Message* msg, int64_t delayNanos) {
53,116✔
225
  auto id = messageIdCounter_++;
53,116✔
226
  // avoid a "0 id"
227
  while (id == 0) {
53,138✔
228
    id = messageIdCounter_++;
×
229
  }
230

231
  msg->dueTime = timestamp() + std::chrono::nanoseconds(delayNanos);
53,138✔
232
  msg->messageId = id;
53,128✔
233

234
  {
235
    std::unique_lock<std::mutex> lk(queueMutex_);
53,128✔
236
    awaitNotFullLocked(lk);
53,142✔
237
    if (shutdown_ == ShutdownType::kNow) {
53,142✔
238
      releaseMessage(msg);
×
239
      return 0;
×
240
    }
241
    auto pos = findInsertPositionLocked(msg->dueTime, msg->priority);
53,142✔
242
    queue_.insert(pos, msg);
53,142✔
243
  }
244
  queueNotEmptyCondition_.notify_all();
53,123✔
245

246
  return id;
53,120✔
247
}
248

249
std::deque<Message*>::const_iterator MessageQueue::findInsertPositionLocked(
53,142✔
250
    std::chrono::nanoseconds dueTime, int32_t priority) const {
251
  if (queue_.empty()) {
53,142✔
252
    return queue_.end();
14,124✔
253
  }
254

255
  // search backwords, since add to queue-end is the most common case
256
  auto it = queue_.end() - 1;
39,018✔
257
  while (it != queue_.begin() && (*it)->dueTime >= dueTime) {
56,798✔
258
    --it;
17,780✔
259
  }
260

261
  // search by due-time
262
  while (it != queue_.end() && (*it)->dueTime < dueTime) {
78,012✔
263
    ++it;
38,994✔
264
  }
265

266
  // search by priority
267
  while (it != queue_.end() && (*it)->dueTime == dueTime && (*it)->priority <= priority) {
39,018✔
UNCOV
268
    ++it;
×
269
  }
270

271
  return it;
39,018✔
272
}
273

274
bool MessageQueue::removeMessageIf(
421✔
275
    const std::function<RemoveMessagePredReturnType(Message&)>& pred) {
276
  bool removed = false;
421✔
277
  {
278
    std::lock_guard<std::mutex> lk(queueMutex_);
842✔
279
    for (auto it = queue_.begin(); it != queue_.end();) {
549✔
280
      auto type = pred(**it);
128✔
281
      if (type == RemoveMessagePredReturnType::kRemoveAndContinue ||
128✔
282
          type == RemoveMessagePredReturnType::kRemove) {
283
        auto msg = *it;
120✔
284
        it = queue_.erase(it);
120✔
285
        releaseMessage(msg);
120✔
286
        removed = true;
120✔
287
        if (type == RemoveMessagePredReturnType::kRemove) break;
120✔
288
      } else {
289
        ++it;
8✔
290
      }
291
    }
292
  }
293
  if (removed) {
421✔
294
    queueNotFullCondition_.notify_all();
116✔
295
  }
296
  return removed;
421✔
297
}
298

299
bool MessageQueue::hasDueMessageLocked() const { return !queue_.empty() && queue_.front()->due(); }
55,419✔
300

301
size_t MessageQueue::dueMessageCount() const {
45,092✔
302
  std::lock_guard<std::mutex> lk(queueMutex_);
45,092✔
303
  auto now = timestamp();
45,092✔
304
  auto firstNotDue = std::find_if_not(queue_.begin(), queue_.end(),
45,092✔
305
                                      [now](const Message* msg) { return msg->due(now); });
123,125✔
306
  return firstNotDue - queue_.begin();
90,184✔
307
}
308

309
bool MessageQueue::checkQuitLoopNowLocked(MessageQueue::LoopType loopType, size_t onceMessageCount,
100,527✔
310
                                          MessageQueue::LoopReturnType& returnType) {
311
  if (shutdown_ == ShutdownType::kNow) {
100,527✔
312
    returnType = LoopReturnType::kShutDown;
16✔
313
    return true;
16✔
314
  }
315

316
  if (interrupt_) {
100,511✔
317
    interrupt_ = false;
×
318
    returnType = LoopReturnType::kInterrupt;
×
319
    return true;
×
320
  }
321

322
  if (loopType == LoopType::kLoopOnce && onceMessageCount == 0) {
100,511✔
323
    returnType = LoopReturnType::kRunOnce;
45,092✔
324
    return true;
45,092✔
325
  }
326
  return false;
55,419✔
327
}
328

329
bool MessageQueue::checkQuitLoopWhenNoDueMessageLocked(MessageQueue::LoopType loopType,
2,405✔
330
                                                       MessageQueue::LoopReturnType& returnType) {
331
  if (loopType == LoopType::kLoopOnce) {
2,405✔
332
    returnType = LoopReturnType::kRunOnce;
×
333
    return true;
×
334
  }
335

336
  if (shutdown_ == ShutdownType::kAwaitQueue && queue_.empty()) {
2,405✔
337
    // We have done await queue.
338
    // avoid user call loopQueue again.
339
    shutdown_ = ShutdownType::kNow;
19✔
340
    returnType = LoopReturnType::kShutDown;
19✔
341
    return true;
19✔
342
  }
343
  return false;
2,386✔
344
}
345

346
Message* MessageQueue::awaitDueMessage(MessageQueue::LoopType loopType, size_t onceMessageCount,
98,132✔
347
                                       MessageQueue::LoopReturnType& returnType) {
348
  Message* dueMessage = nullptr;
98,132✔
349
  while (true) {
350
    std::unique_lock<std::mutex> lk(queueMutex_);
100,517✔
351

352
    if (checkQuitLoopNowLocked(loopType, onceMessageCount, returnType)) {
100,527✔
353
      return nullptr;
45,108✔
354
    }
355

356
    if (!hasDueMessageLocked()) {
55,419✔
357
      if (checkQuitLoopWhenNoDueMessageLocked(loopType, returnType)) {
2,405✔
358
        return nullptr;
19✔
359
      }
360

361
      if (queue_.empty()) {
2,386✔
362
        // await for new message
363
        queueNotEmptyCondition_.wait(lk);
2,382✔
364
      } else {
365
        // await for next message due
366
        auto timeToWait = queue_.front()->dueTime - timestamp();
4✔
367
        if (timeToWait.count() > 0) {
4✔
368
          queueNotEmptyCondition_.wait_for(lk, timeToWait);
4✔
369
        }
370
      }
371

372
      // await complete, maybe for reasons
373
      // 1. have new message arrived
374
      // 2. interrupted
375
      // 3. shutdown
376
      // 4. ...
377
      // so we need to check again
378

379
      continue;
2,386✔
380
    }
381

382
    dueMessage = queue_.front();
53,014✔
383
    queue_.pop_front();
53,014✔
384
    break;
53,014✔
385
  }
2,385✔
386

387
  queueNotFullCondition_.notify_all();
52,999✔
388

389
  return dueMessage;
52,996✔
390
}
391

392
MessageQueue::LoopReturnType MessageQueue::loopQueue(MessageQueue::LoopType loopType) {
45,127✔
393
  LoopQueueGuard loopQueueGuard(this);
45,127✔
394

395
  // Find out how many due message we have on loopOnce call.
396
  // We can execute at most so many messages on LoopType::kLoopOnce
397
  // to prevent from corner case where processed message post another message(s)
398
  // making the loop infinite.
399
  auto onceMessageCount = static_cast<size_t>(-1);
45,127✔
400
  if (loopType == LoopType::kLoopOnce) {
45,127✔
401
    onceMessageCount = dueMessageCount();
45,092✔
402
  }
403
  LoopReturnType returnType = LoopReturnType::kRunOnce;
45,127✔
404

405
  while (true) {
406
    Message* message = awaitDueMessage(loopType, onceMessageCount, returnType);
98,133✔
407
    if (message == nullptr) {
98,117✔
408
      return returnType;
90,254✔
409
    }
410

411
    processMessage(message);
52,990✔
412
    onceMessageCount--;
53,006✔
413
  }
53,006✔
414
}
415

416
void MessageQueue::processMessage(Message* message) {
52,988✔
417
  // process message
418
  beforeMessage(*message);
52,988✔
419

420
  message->handle();
52,957✔
421

422
  afterMessage(*message);
52,994✔
423

424
  releaseMessage(message);
52,954✔
425
}
53,006✔
426

427
/*static*/
428
std::chrono::nanoseconds MessageQueue::timestamp() {
151,175✔
429
#ifdef __ANDROID__
430
  // android NDK r16b is buggy with steady_clock, workaround it...
431
  using std::chrono::nanoseconds;
432
  using std::chrono::seconds;
433
  ::timespec tp{};
434
  clock_gettime(CLOCK_MONOTONIC, &tp);
435
  // in milli-seconds
436
  return seconds(tp.tv_sec) + nanoseconds(tp.tv_nsec);
437
#else
438
  return std::chrono::steady_clock::now().time_since_epoch();
151,175✔
439
#endif
440
}
441

442
}  // namespace script::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc