• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Tencent / ScriptX / 4461662906

pending completion
4461662906

push

github

LanderlYoung
[V8] add support for V8 version 10.8

6 of 6 new or added lines in 1 file covered. (100.0%)

4 existing lines in 1 file now uncovered.

5350 of 5671 relevant lines covered (94.34%)

25659.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.5
/src/utils/MessageQueue.cc
1
/*
2
 * Tencent is pleased to support the open source community by making ScriptX available.
3
 * Copyright (C) 2021 THL A29 Limited, a Tencent company.  All rights reserved.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *      http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
#include "MessageQueue.h"
19
#include <algorithm>
20
#include <unordered_map>
21
#include "ThreadLocal.h"
22

23
namespace script::utils {
24

25
// <queue, nested count>
26
using RunnineQueueMap = std::unordered_map<MessageQueue*, int>;
27
SCRIPTX_THREAD_LOCAL(RunnineQueueMap, runningQueue_);
28

29
static inline RunnineQueueMap& getRunningQueue() { return internal::getThreadLocal(runningQueue_); }
90,258✔
30

31
class LoopQueueGuard {
32
  MessageQueue* queue_;
33

34
 public:
35
  explicit LoopQueueGuard(MessageQueue* queue) : queue_(queue) {
45,127✔
36
    queue_->workerCount_++;
45,127✔
37
    getRunningQueue()[queue]++;
45,127✔
38
  }
45,127✔
39

40
  SCRIPTX_DISALLOW_COPY_AND_MOVE(LoopQueueGuard);
41

42
  ~LoopQueueGuard() {
90,254✔
43
    auto q = getRunningQueue();
90,254✔
44
    if (--q[queue_] == 0) {
45,127✔
45
      q.erase(queue_);
43✔
46
    }
47

48
    queue_->workerCount_--;
45,127✔
49
    queue_->workerQuitCondition_.notify_all();
45,127✔
50
  }
45,127✔
51

52
  /**
53
   * @return if current method call is already inside a loopQueue() stack hierarchy.
54
   */
55
  static bool isCallerNestedInsideLoop(MessageQueue* queue) {
4✔
56
    auto q = getRunningQueue();
4✔
57
    return q.find(queue) != q.end();
8✔
58
  }
59
};
60

61
Message::Message() : handlerProc(nullptr), cleanupProc(nullptr) {}
33,134✔
62

63
Message::Message(MessageProc* handlerProc, MessageProc* cleanupProc)
53,090✔
64
    : handlerProc(handlerProc), cleanupProc(cleanupProc) {}
53,090✔
65

66
bool Message::due() const { return due(MessageQueue::timestamp()); }
53,028✔
67

68
bool Message::due(std::chrono::nanoseconds now) const { return dueTime <= now; }
85,979✔
69

70
void Message::performCleanup() {
53,145✔
71
  if (cleanupProc) {
53,145✔
72
    cleanupProc(*this);
32,003✔
73
  }
74
  data0 = data1 = data2 = data3 = 0;
53,145✔
75
  ptr0 = ptr1 = ptr2 = ptr3 = nullptr;
53,145✔
76
  name = nullptr;
53,145✔
77

78
  priority = what = 0;
53,145✔
79
  handlerProc = cleanupProc = nullptr;
53,145✔
80
  dueTime = std::chrono::nanoseconds(0);
53,145✔
81
  messageId = 0;
53,140✔
82
}
53,140✔
83

84
void Message::handle() {
53,017✔
85
  if (handlerProc) {
53,017✔
86
    handlerProc(*this);
52,973✔
87
  }
88
}
53,021✔
89

90
Message::MessageProc* Message::getHandlerProc() const { return handlerProc; }
×
91

92
Message::MessageProc* Message::getCleanupProc() const { return cleanupProc; }
×
93

94
MessageQueue::MessageQueue(std::size_t maxMessageInQueue)
448✔
95
    : maxMessageInQueue_(maxMessageInQueue),
96
      messagePool_(kDefaultPoolSize),
97
      shutdown_(ShutdownType::kNone),
98
      interrupt_(false),
99
      queueMutex_(),
100
      queueNotEmptyCondition_(),
101
      queueNotFullCondition_(),
102
      queue_(),
103
      messageIdCounter_(1),
104
      workerCount_(0),
105
      workerQuitCondition_(),
106
      supervisor_() {}
448✔
107

108
MessageQueue::~MessageQueue() { shutdownNow(true); }
448✔
109

110
std::unique_ptr<InplaceMessage> MessageQueue::obtainInplaceMessage(
18✔
111
    InplaceMessage::HandlerPorc* handlerProc) {
112
  auto msg = messagePool_.obtain();
18✔
113
  // InplaceMessage is essentially a Message with some extra non-virtual method.
114
  // so it's safe to cast
115
  msg->handlerProc = reinterpret_cast<void (*)(Message&)>(reinterpret_cast<void*>(handlerProc));
18✔
116
  return std::unique_ptr<InplaceMessage>(reinterpret_cast<InplaceMessage*>(msg));
18✔
117
}
118

119
void MessageQueue::releaseMessage(Message* message) {
53,145✔
120
  message->performCleanup();
53,145✔
121
  messagePool_.release(message);
53,140✔
122
}
53,152✔
123

124
void MessageQueue::beforeMessage(Message& message) {
53,021✔
125
  if (auto supervisor = supervisor_) {
53,021✔
126
    supervisor->beforeMessage(message);
×
127
  }
128
}
53,019✔
129

130
void MessageQueue::afterMessage(Message& message) {
53,017✔
131
  if (auto supervisor = supervisor_) {
53,017✔
132
    supervisor->afterMessage(message);
×
133
  }
134
}
53,011✔
135

136
void MessageQueue::setSupervisor(const std::shared_ptr<MessageQueue::Supervisor>& supervisor) {
×
137
  supervisor_ = supervisor;
×
138
}
×
139

140
void MessageQueue::shutdownNow(bool awaitTermination) {
460✔
141
  {
142
    std::lock_guard<std::mutex> lk(queueMutex_);
920✔
143
    shutdown_ = ShutdownType::kNow;
460✔
144
    for (auto r : queue_) {
468✔
145
      releaseMessage(r);
8✔
146
    }
147
    queue_.clear();
460✔
148
  }
149

150
  // wake up postMessage
151
  queueNotFullCondition_.notify_all();
460✔
152

153
  // wake up looper
154
  queueNotEmptyCondition_.notify_all();
460✔
155

156
  if (awaitTermination) {
460✔
157
    this->awaitTermination();
460✔
158
  }
159
}
460✔
160

161
void MessageQueue::shutdown(bool awaitTermination) {
23✔
162
  {
163
    std::unique_lock<std::mutex> lk(queueMutex_);
23✔
164
    shutdown_ = ShutdownType::kAwaitQueue;
23✔
165
  }
166

167
  // wake up postMessage
168
  queueNotFullCondition_.notify_all();
23✔
169

170
  // wake up looper
171
  queueNotEmptyCondition_.notify_all();
23✔
172

173
  if (awaitTermination) {
23✔
174
    this->awaitTermination();
19✔
175
  }
176
}
23✔
177

178
void MessageQueue::awaitTermination() {
483✔
179
  std::unique_lock<std::mutex> lk(queueMutex_);
966✔
180
  workerQuitCondition_.wait(lk, [this] { return workerCount_ == 0; });
992✔
181
}
483✔
182

183
bool MessageQueue::isShutdown() {
×
184
  std::unique_lock<std::mutex> lk(queueMutex_);
×
185
  return shutdown_ != ShutdownType::kNone;
×
186
}
187

188
void MessageQueue::interrupt() {
4✔
189
  {
190
    std::lock_guard<std::mutex> lk(queueMutex_);
4✔
191
    interrupt_ = true;
4✔
192
  }
193

194
  // wake up looper to return immediately
195
  queueNotEmptyCondition_.notify_all();
4✔
196
}
4✔
197

198
bool MessageQueue::isQueueFull() const { return queue_.size() >= maxMessageInQueue_; }
106,300✔
199

200
void MessageQueue::awaitNotFullLocked(std::unique_lock<std::mutex>& lock) {
53,152✔
201
  if (isQueueFull() && LoopQueueGuard::isCallerNestedInsideLoop(this)) {
53,152✔
202
    // This method call is already inside loopQueue call,
203
    // can't wait again, which would cause a dead-lock...
204
    // Just return, and allow the queue to be over-full.
205
    return;
4✔
206
  }
207
  queueNotFullCondition_.wait(lock, [this] { return !isQueueFull(); });
106,296✔
208
}
209

210
int32_t MessageQueue::postMessage(Message* msg, int64_t delayNanos) {
53,151✔
211
  auto id = messageIdCounter_++;
53,151✔
212
  // avoid a "0 id"
213
  while (id == 0) {
53,152✔
214
    id = messageIdCounter_++;
×
215
  }
216

217
  msg->dueTime = timestamp() + std::chrono::nanoseconds(delayNanos);
53,152✔
218
  msg->messageId = id;
53,152✔
219

220
  {
221
    std::unique_lock<std::mutex> lk(queueMutex_);
53,152✔
222
    awaitNotFullLocked(lk);
53,152✔
223
    if (shutdown_ == ShutdownType::kNow) {
53,152✔
224
      releaseMessage(msg);
×
225
      return 0;
×
226
    }
227
    auto pos = findInsertPositionLocked(msg->dueTime, msg->priority);
53,152✔
228
    queue_.insert(pos, msg);
53,152✔
229
  }
230
  queueNotEmptyCondition_.notify_all();
53,152✔
231

232
  return id;
53,152✔
233
}
234

235
std::deque<Message*>::const_iterator MessageQueue::findInsertPositionLocked(
53,152✔
236
    std::chrono::nanoseconds dueTime, int32_t priority) const {
237
  auto it = queue_.begin();
53,152✔
238
  // search by due-time
239
  while (it != queue_.end() && (*it)->dueTime < dueTime) {
19,071,870✔
240
    ++it;
19,018,750✔
241
  }
242

243
  // search by priority
244
  while (it != queue_.end() && (*it)->dueTime == dueTime && (*it)->priority <= priority) {
53,152✔
UNCOV
245
    ++it;
×
246
  }
247

248
  return it;
53,152✔
249
}
250

251
bool MessageQueue::removeMessageIf(
421✔
252
    const std::function<RemoveMessagePredReturnType(Message&)>& pred) {
253
  bool removed = false;
421✔
254
  {
255
    std::lock_guard<std::mutex> lk(queueMutex_);
842✔
256
    for (auto it = queue_.begin(); it != queue_.end();) {
549✔
257
      auto type = pred(**it);
128✔
258
      if (type == RemoveMessagePredReturnType::kRemoveAndContinue ||
128✔
259
          type == RemoveMessagePredReturnType::kRemove) {
260
        auto msg = *it;
120✔
261
        it = queue_.erase(it);
120✔
262
        releaseMessage(msg);
120✔
263
        removed = true;
120✔
264
        if (type == RemoveMessagePredReturnType::kRemove) break;
120✔
265
      } else {
266
        ++it;
8✔
267
      }
268
    }
269
  }
270
  if (removed) {
421✔
271
    queueNotFullCondition_.notify_all();
116✔
272
  }
273
  return removed;
421✔
274
}
275

276
bool MessageQueue::hasDueMessageLocked() const { return !queue_.empty() && queue_.front()->due(); }
53,885✔
277

278
size_t MessageQueue::dueMessageCount() const {
45,092✔
279
  std::lock_guard<std::mutex> lk(queueMutex_);
45,092✔
280
  auto now = timestamp();
45,092✔
281
  auto firstNotDue = std::find_if_not(queue_.begin(), queue_.end(),
45,092✔
282
                                      [now](const Message* msg) { return msg->due(now); });
123,135✔
283
  return firstNotDue - queue_.begin();
90,184✔
284
}
285

286
bool MessageQueue::checkQuitLoopNowLocked(MessageQueue::LoopType loopType, size_t onceMessageCount,
98,993✔
287
                                          MessageQueue::LoopReturnType& returnType) {
288
  if (shutdown_ == ShutdownType::kNow) {
98,993✔
289
    returnType = LoopReturnType::kShutDown;
16✔
290
    return true;
16✔
291
  }
292

293
  if (interrupt_) {
98,977✔
UNCOV
294
    interrupt_ = false;
×
UNCOV
295
    returnType = LoopReturnType::kInterrupt;
×
UNCOV
296
    return true;
×
297
  }
298

299
  if (loopType == LoopType::kLoopOnce && onceMessageCount == 0) {
98,977✔
300
    returnType = LoopReturnType::kRunOnce;
45,092✔
301
    return true;
45,092✔
302
  }
303
  return false;
53,885✔
304
}
305

306
bool MessageQueue::checkQuitLoopWhenNoDueMessageLocked(MessageQueue::LoopType loopType,
861✔
307
                                                       MessageQueue::LoopReturnType& returnType) {
308
  if (loopType == LoopType::kLoopOnce) {
861✔
309
    returnType = LoopReturnType::kRunOnce;
×
310
    return true;
×
311
  }
312

313
  if (shutdown_ == ShutdownType::kAwaitQueue && queue_.empty()) {
861✔
314
    // We have done await queue.
315
    // avoid user call loopQueue again.
316
    shutdown_ = ShutdownType::kNow;
19✔
317
    returnType = LoopReturnType::kShutDown;
19✔
318
    return true;
19✔
319
  }
320
  return false;
842✔
321
}
322

323
Message* MessageQueue::awaitDueMessage(MessageQueue::LoopType loopType, size_t onceMessageCount,
98,151✔
324
                                       MessageQueue::LoopReturnType& returnType) {
325
  Message* dueMessage = nullptr;
98,151✔
326
  while (true) {
327
    std::unique_lock<std::mutex> lk(queueMutex_);
98,993✔
328

329
    if (checkQuitLoopNowLocked(loopType, onceMessageCount, returnType)) {
98,993✔
330
      return nullptr;
45,108✔
331
    }
332

333
    if (!hasDueMessageLocked()) {
53,885✔
334
      if (checkQuitLoopWhenNoDueMessageLocked(loopType, returnType)) {
861✔
335
        return nullptr;
19✔
336
      }
337

338
      if (queue_.empty()) {
842✔
339
        // await for new message
340
        queueNotEmptyCondition_.wait(lk);
838✔
341
      } else {
342
        // await for next message due
343
        auto timeToWait = queue_.front()->dueTime - timestamp();
4✔
344
        if (timeToWait.count() > 0) {
4✔
345
          queueNotEmptyCondition_.wait_for(lk, timeToWait);
4✔
346
        }
347
      }
348

349
      // await complete, maybe for reasons
350
      // 1. have new message arrived
351
      // 2. interrupted
352
      // 3. shutdown
353
      // 4. ...
354
      // so we need to check again
355

356
      continue;
842✔
357
    }
358

359
    dueMessage = queue_.front();
53,024✔
360
    queue_.pop_front();
53,024✔
361
    break;
53,024✔
362
  }
842✔
363

364
  queueNotFullCondition_.notify_all();
53,021✔
365

366
  return dueMessage;
53,022✔
367
}
368

369
MessageQueue::LoopReturnType MessageQueue::loopQueue(MessageQueue::LoopType loopType) {
45,127✔
370
  LoopQueueGuard loopQueueGuard(this);
45,127✔
371

372
  // Find out how many due message we have on loopOnce call.
373
  // We can execute at most so many messages on LoopType::kLoopOnce
374
  // to prevent from corner case where processed message post another message(s)
375
  // making the loop infinite.
376
  auto onceMessageCount = static_cast<size_t>(-1);
45,127✔
377
  if (loopType == LoopType::kLoopOnce) {
45,127✔
378
    onceMessageCount = dueMessageCount();
45,092✔
379
  }
380
  MessageQueue::LoopReturnType returnType = LoopReturnType::kRunOnce;
45,127✔
381

382
  while (true) {
383
    Message* message = awaitDueMessage(loopType, onceMessageCount, returnType);
98,151✔
384
    if (message == nullptr) {
98,149✔
385
      return returnType;
90,254✔
386
    }
387

388
    processMessage(message);
53,022✔
389
    onceMessageCount--;
53,024✔
390
  }
53,024✔
391
}
392

393
void MessageQueue::processMessage(Message* message) {
53,021✔
394
  // process message
395
  beforeMessage(*message);
53,021✔
396

397
  message->handle();
53,018✔
398

399
  afterMessage(*message);
53,021✔
400

401
  releaseMessage(message);
53,019✔
402
}
53,024✔
403

404
/*static*/
405
std::chrono::nanoseconds MessageQueue::timestamp() {
151,223✔
406
#ifdef __ANDROID__
407
  // android NDK r16b is buggy with steady_clock, workaround it...
408
  using std::chrono::nanoseconds;
409
  using std::chrono::seconds;
410
  ::timespec tp{};
411
  clock_gettime(CLOCK_MONOTONIC, &tp);
412
  // in milli-seconds
413
  return seconds(tp.tv_sec) + nanoseconds(tp.tv_nsec);
414
#else
415
  return std::chrono::steady_clock::now().time_since_epoch();
151,223✔
416
#endif
417
}
418

419
}  // namespace script::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc