• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.92
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
#define tmrFatal(...)                                                     \
24
  {                                                                       \
25
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
26
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
27
    }                                                                     \
28
  }
29
#define tmrError(...)                                                     \
30
  {                                                                       \
31
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
32
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
33
    }                                                                     \
34
  }
35
#define tmrWarn(...)                                                    \
36
  {                                                                     \
37
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
38
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
39
    }                                                                   \
40
  }
41
#define tmrInfo(...)                                               \
42
  {                                                                \
43
    if (tmrDebugFlag & DEBUG_INFO) {                               \
44
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
45
    }                                                              \
46
  }
47
#define tmrDebug(...)                                               \
48
  {                                                                 \
49
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
50
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
51
    }                                                               \
52
  }
53
#define tmrTrace(...)                                               \
54
  {                                                                 \
55
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
56
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
57
    }                                                               \
58
  }
59

60
#define TIMER_STATE_WAITING  0
61
#define TIMER_STATE_EXPIRED  1
62
#define TIMER_STATE_STOPPED  2
63
#define TIMER_STATE_CANCELED 3
64

65
typedef union _tmr_ctrl_t {
66
  char label[16];
67
  struct {
68
    // pad to ensure 'next' is the end of this union
69
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
70
    union _tmr_ctrl_t* next;
71
  };
72
} tmr_ctrl_t;
73

74
typedef struct tmr_obj_t {
75
  uintptr_t         id;
76
  tmr_ctrl_t*       ctrl;
77
  struct tmr_obj_t* mnext;
78
  struct tmr_obj_t* prev;
79
  struct tmr_obj_t* next;
80
  uint16_t          slot;
81
  uint8_t           wheel;
82
  uint8_t           state;
83
  uint8_t           refCount;
84
  uint8_t           reserved1;
85
  uint16_t          reserved2;
86
  union {
87
    int64_t expireAt;
88
    int64_t executedBy;
89
  };
90
  TAOS_TMR_CALLBACK fp;
91
  void*             param;
92
} tmr_obj_t;
93

94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
96
  tmr_obj_t* timers;
97
} timer_list_t;
98

99
typedef struct timer_map_t {
100
  uint32_t      size;
101
  uint32_t      count;
102
  timer_list_t* slots;
103
} timer_map_t;
104

105
typedef struct time_wheel_t {
106
  TdThreadMutex mutex;
107
  int64_t       nextScanAt;
108
  uint32_t      resolution;
109
  uint16_t      size;
110
  uint16_t      index;
111
  tmr_obj_t**   slots;
112
} time_wheel_t;
113

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
115

116
static int32_t       tmrModuleInit = 0;
117
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*   tmrCtrls;
119
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
120
static void*         tmrQhandle;
121
static int32_t       numOfTmrCtrl = 0;
122

123
int32_t          taosTmrThreads = 1;
124
static uintptr_t nextTimerId = 0;
125

126
static time_wheel_t wheels[] = {
127
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
128
    {.resolution = 1000, .size = 1024},
129
    {.resolution = 60000, .size = 1024},
130
};
131
static timer_map_t timerMap;
132

133
static uintptr_t getNextTimerId() {
6,241,431✔
134
  uintptr_t id;
135
  do {
136
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
6,241,431✔
137
  } while (id == 0);
6,241,450!
138
  return id;
6,241,451✔
139
}
140

141
static void timerAddRef(tmr_obj_t* timer) { (void)atomic_add_fetch_8(&timer->refCount, 1); }
18,624,025✔
142

143
static void timerDecRef(tmr_obj_t* timer) {
18,609,930✔
144
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
18,609,930✔
145
    taosMemoryFree(timer);
6,229,087✔
146
  }
147
}
18,609,957✔
148

149
static void lockTimerList(timer_list_t* list) {
18,716,032✔
150
  int64_t tid = taosGetSelfPthreadId();
18,716,032✔
151
  int32_t i = 0;
18,716,041✔
152
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
18,716,046✔
153
    if (++i % 1000 == 0) {
5!
154
      (void)sched_yield();
×
155
    }
156
  }
157
}
18,716,086✔
158

159
static void unlockTimerList(timer_list_t* list) {
18,716,086✔
160
  int64_t tid = taosGetSelfPthreadId();
18,716,086✔
161
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
18,716,083!
162
    uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
×
163
  }
164
}
18,716,092✔
165

166
static void addTimer(tmr_obj_t* timer) {
6,241,448✔
167
  timerAddRef(timer);
6,241,448✔
168
  timer->wheel = tListLen(wheels);
6,241,450✔
169

170
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
6,241,450✔
171
  timer_list_t* list = timerMap.slots + idx;
6,241,450✔
172

173
  lockTimerList(list);
6,241,450✔
174
  timer->mnext = list->timers;
6,241,446✔
175
  list->timers = timer;
6,241,446✔
176
  unlockTimerList(list);
6,241,446✔
177
}
6,241,453✔
178

179
static tmr_obj_t* findTimer(uintptr_t id) {
6,909,557✔
180
  tmr_obj_t* timer = NULL;
6,909,557✔
181
  if (id > 0) {
6,909,557✔
182
    uint32_t      idx = (uint32_t)(id % timerMap.size);
6,234,886✔
183
    timer_list_t* list = timerMap.slots + idx;
6,234,886✔
184
    lockTimerList(list);
6,234,886✔
185
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
6,234,999✔
186
      if (timer->id == id) {
6,141,246✔
187
        timerAddRef(timer);
6,141,161✔
188
        break;
6,141,163✔
189
      }
190
    }
191
    unlockTimerList(list);
6,234,916✔
192
  }
193
  return timer;
6,909,593✔
194
}
195

196
static void removeTimer(uintptr_t id) {
6,239,736✔
197
  tmr_obj_t*    prev = NULL;
6,239,736✔
198
  uint32_t      idx = (uint32_t)(id % timerMap.size);
6,239,736✔
199
  timer_list_t* list = timerMap.slots + idx;
6,239,736✔
200
  lockTimerList(list);
6,239,736✔
201
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
6,239,740!
202
    if (p->id == id) {
6,239,740✔
203
      if (prev == NULL) {
6,239,736✔
204
        list->timers = p->mnext;
6,239,732✔
205
      } else {
206
        prev->mnext = p->mnext;
4✔
207
      }
208
      timerDecRef(p);
6,239,736✔
209
      break;
6,239,736✔
210
    }
211
    prev = p;
4✔
212
  }
213
  unlockTimerList(list);
6,239,736✔
214
}
6,239,736✔
215

216
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
6,241,441✔
217
  timerAddRef(timer);
6,241,441✔
218
  // select a wheel for the timer, we are not an accurate timer,
219
  // but the inaccuracy should not be too large.
220
  timer->wheel = tListLen(wheels) - 1;
6,241,451✔
221
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
6,257,505✔
222
    time_wheel_t* wheel = wheels + i;
6,257,504✔
223
    if (delay < wheel->resolution * wheel->size) {
6,257,504✔
224
      timer->wheel = i;
6,241,450✔
225
      break;
6,241,450✔
226
    }
227
  }
228

229
  time_wheel_t* wheel = wheels + timer->wheel;
6,241,451✔
230
  timer->prev = NULL;
6,241,451✔
231
  timer->expireAt = taosGetMonotonicMs() + delay;
6,241,451✔
232

233
  (void)taosThreadMutexLock(&wheel->mutex);
6,241,441✔
234

235
  uint32_t idx = 0;
6,241,453✔
236
  if (timer->expireAt > wheel->nextScanAt) {
6,241,453✔
237
    // adjust delay according to next scan time of this wheel
238
    // so that the timer is not fired earlier than desired.
239
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
6,238,935✔
240
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
6,238,935✔
241
  }
242

243
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
6,241,453✔
244
  tmr_obj_t* p = wheel->slots[timer->slot];
6,241,453✔
245
  wheel->slots[timer->slot] = timer;
6,241,453✔
246
  timer->next = p;
6,241,453✔
247
  if (p != NULL) {
6,241,453✔
248
    p->prev = timer;
4,130,604✔
249
  }
250

251
  (void)taosThreadMutexUnlock(&wheel->mutex);
6,241,453✔
252
}
6,241,453✔
253

254
static bool removeFromWheel(tmr_obj_t* timer) {
646,879✔
255
  uint8_t wheelIdx = timer->wheel;
646,879✔
256
  if (wheelIdx >= tListLen(wheels)) {
646,879✔
257
    return false;
7✔
258
  }
259
  time_wheel_t* wheel = wheels + wheelIdx;
646,872✔
260

261
  bool removed = false;
646,872✔
262
  (void)taosThreadMutexLock(&wheel->mutex);
646,872✔
263
  // other thread may modify timer->wheel, check again.
264
  if (timer->wheel < tListLen(wheels)) {
646,877!
265
    if (timer->prev != NULL) {
646,877✔
266
      timer->prev->next = timer->next;
12,674✔
267
    }
268
    if (timer->next != NULL) {
646,877✔
269
      timer->next->prev = timer->prev;
12,821✔
270
    }
271
    if (timer == wheel->slots[timer->slot]) {
646,877✔
272
      wheel->slots[timer->slot] = timer->next;
634,203✔
273
    }
274
    timer->wheel = tListLen(wheels);
646,877✔
275
    timer->next = NULL;
646,877✔
276
    timer->prev = NULL;
646,877✔
277
    timerDecRef(timer);
646,877✔
278
    removed = true;
646,877✔
279
  }
280
  (void)taosThreadMutexUnlock(&wheel->mutex);
646,877✔
281

282
  return removed;
646,877✔
283
}
284

285
static void processExpiredTimer(void* handle, void* arg) {
5,592,859✔
286
  tmr_obj_t* timer = (tmr_obj_t*)handle;
5,592,859✔
287
  timer->executedBy = taosGetSelfPthreadId();
5,592,859✔
288
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
5,592,859✔
289
  if (state == TIMER_STATE_WAITING) {
5,592,859✔
290
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
5,592,852✔
291
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
5,592,852✔
292

293
    (*timer->fp)(timer->param, (tmr_h)timer->id);
5,592,852✔
294
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
5,592,852✔
295

296
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
5,592,852✔
297
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
5,592,852✔
298
  }
299
  removeTimer(timer->id);
5,592,859✔
300
  timerDecRef(timer);
5,592,859✔
301
}
5,592,859✔
302

303
static void addToExpired(tmr_obj_t* head) {
106,408,204✔
304
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
106,408,204✔
305

306
  while (head != NULL) {
112,001,063✔
307
    uintptr_t  id = head->id;
5,592,859✔
308
    tmr_obj_t* next = head->next;
5,592,859✔
309
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
5,592,859✔
310

311
    SSchedMsg schedMsg;
312
    schedMsg.fp = NULL;
5,592,859✔
313
    schedMsg.tfp = processExpiredTimer;
5,592,859✔
314
    schedMsg.msg = NULL;
5,592,859✔
315
    schedMsg.ahandle = head;
5,592,859✔
316
    schedMsg.thandle = NULL;
5,592,859✔
317
    if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
5,592,859!
318
      tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
×
319
    }
320

321
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
5,592,859✔
322
    head = next;
5,592,859✔
323
  }
324
}
106,408,204✔
325

326
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
6,241,432✔
327
  uintptr_t id = getNextTimerId();
6,241,432✔
328
  timer->id = id;
6,241,451✔
329
  timer->state = TIMER_STATE_WAITING;
6,241,451✔
330
  timer->fp = fp;
6,241,451✔
331
  timer->param = param;
6,241,451✔
332
  timer->ctrl = ctrl;
6,241,451✔
333
  addTimer(timer);
6,241,451✔
334

335
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
6,241,452✔
336
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
6,241,452✔
337

338
  if (mseconds == 0) {
6,241,447!
339
    timer->wheel = tListLen(wheels);
×
340
    timerAddRef(timer);
×
341
    addToExpired(timer);
×
342
  } else {
343
    addToWheel(timer, mseconds);
6,241,447✔
344
  }
345

346
  // note: use `timer->id` here is unsafe as `timer` may already be freed
347
  return id;
6,241,453✔
348
}
349

350
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
6,230,781✔
351
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
6,230,781✔
352
  if (ctrl == NULL || ctrl->label[0] == 0) {
6,230,781!
353
    return NULL;
×
354
  }
355

356
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
6,230,794✔
357
  if (timer == NULL) {
6,230,796!
358
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
×
359
    return NULL;
×
360
  }
361

362
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
6,230,796✔
363
}
364

365
static void taosTimerLoopFunc(int32_t signo) {
35,469,403✔
366
  int64_t now = taosGetMonotonicMs();
35,469,403✔
367

368
  for (int32_t i = 0; i < tListLen(wheels); i++) {
141,877,606✔
369
    // `expried` is a temporary expire list.
370
    // expired timers are first add to this list, then move
371
    // to expired queue as a batch to improve performance.
372
    // note this list is used as a stack in this function.
373
    tmr_obj_t* expired = NULL;
40,422,478✔
374

375
    time_wheel_t* wheel = wheels + i;
40,422,478✔
376
    while (now >= wheel->nextScanAt) {
142,059,605✔
377
      (void)taosThreadMutexLock(&wheel->mutex);
35,651,402✔
378
      wheel->index = (wheel->index + 1) % wheel->size;
35,651,401✔
379
      tmr_obj_t* timer = wheel->slots[wheel->index];
35,651,401✔
380
      while (timer != NULL) {
41,244,260✔
381
        tmr_obj_t* next = timer->next;
5,592,859✔
382
        if (now < timer->expireAt) {
5,592,859!
383
          timer = next;
×
384
          continue;
×
385
        }
386

387
        // remove from the wheel
388
        if (timer->prev == NULL) {
5,592,859!
389
          wheel->slots[wheel->index] = next;
5,592,859✔
390
          if (next != NULL) {
5,592,859✔
391
            next->prev = NULL;
4,107,344✔
392
          }
393
        } else {
394
          timer->prev->next = next;
×
395
          if (next != NULL) {
×
396
            next->prev = timer->prev;
×
397
          }
398
        }
399
        timer->wheel = tListLen(wheels);
5,592,859✔
400

401
        // add to temporary expire list
402
        timer->next = expired;
5,592,859✔
403
        timer->prev = NULL;
5,592,859✔
404
        if (expired != NULL) {
5,592,859✔
405
          expired->prev = timer;
4,107,514✔
406
        }
407
        expired = timer;
5,592,859✔
408

409
        timer = next;
5,592,859✔
410
      }
411
      wheel->nextScanAt += wheel->resolution;
35,651,401✔
412
      (void)taosThreadMutexUnlock(&wheel->mutex);
35,651,401✔
413
    }
414

415
    addToExpired(expired);
106,408,203✔
416
  }
417
}
101,455,128✔
418

419
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
6,141,159✔
420
  if (state == TIMER_STATE_WAITING) {
6,141,159✔
421
    bool reusable = false;
646,881✔
422
    if (removeFromWheel(timer)) {
646,881✔
423
      removeTimer(timer->id);
646,877✔
424
      // only safe to reuse the timer when timer is removed from the wheel.
425
      // we cannot guarantee the thread safety of the timr in all other cases.
426
      reusable = true;
646,877✔
427
    }
428
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
646,884✔
429
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
646,884✔
430
    return reusable;
646,884✔
431
  }
432

433
  if (state != TIMER_STATE_EXPIRED) {
5,494,278!
434
    // timer already stopped or cancelled, has nothing to do in this case
UNCOV
435
    return false;
×
436
  }
437

438
  if (timer->executedBy == taosGetSelfPthreadId()) {
5,494,278✔
439
    // taosTmrReset is called in the timer callback, should do nothing in this
440
    // case to avoid dead lock. note taosTmrReset must be the last statement
441
    // of the callback funtion, will be a bug otherwise.
442
    return false;
5,494,271✔
443
  }
444

445
  // timer callback is executing in another thread, we SHOULD wait it to stop,
446
  // BUT this may result in dead lock if current thread are holding a lock which
447
  // the timer callback need to acquire. so, we HAVE TO return directly.
448
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
11✔
449
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
11!
450
  return false;
11✔
451
}
452

453
bool taosTmrStop(tmr_h timerId) {
742,971✔
454
  uintptr_t id = (uintptr_t)timerId;
742,971✔
455

456
  tmr_obj_t* timer = findTimer(id);
742,971✔
457
  if (timer == NULL) {
742,984✔
458
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
106,748✔
459
    return false;
106,748✔
460
  }
461

462
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
636,236✔
463
  (void)doStopTimer(timer, state);
636,239✔
464
  timerDecRef(timer);
636,239✔
465

466
  return state == TIMER_STATE_WAITING;
636,238✔
467
}
468

469
bool taosTmrStopA(tmr_h* timerId) {
12,788✔
470
  bool ret = taosTmrStop(*timerId);
12,788✔
471
  *timerId = NULL;
12,788✔
472
  return ret;
12,788✔
473
}
474

475
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
6,166,597✔
476
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
6,166,597✔
477
  if (ctrl == NULL || ctrl->label[0] == 0) {
6,166,597!
UNCOV
478
    return false;
×
479
  }
480

481
  uintptr_t  id = (uintptr_t)*pTmrId;
6,166,605✔
482
  bool       stopped = false;
6,166,605✔
483
  tmr_obj_t* timer = findTimer(id);
6,166,605✔
484
  if (timer == NULL) {
6,166,612✔
485
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
661,685✔
486
  } else {
487
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
5,504,927✔
488
    if (!doStopTimer(timer, state)) {
5,504,927✔
489
      timerDecRef(timer);
5,494,278✔
490
      timer = NULL;
5,494,278✔
491
    }
492
    stopped = state == TIMER_STATE_WAITING;
5,504,927✔
493
  }
494

495
  if (timer == NULL) {
6,166,609✔
496
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
6,155,960✔
497
    if (NULL == *pTmrId) {
6,155,967!
498
      stopped = true;
×
499
    }
500
    return stopped;
6,155,967✔
501
  }
502

503
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
10,649✔
504

505
  // wait until there's no other reference to this timer,
506
  // so that we can reuse this timer safely.
507
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
10,649!
508
    if (i % 1000 == 0) {
×
509
      (void)sched_yield();
×
510
    }
511
  }
512

513
  if (timer->refCount != 1) {
10,649!
514
    uError("timer refCount=%d not expected 1", timer->refCount);
×
515
  }
516
  memset(timer, 0, sizeof(*timer));
10,649✔
517
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
10,649✔
518

519
  return stopped;
10,649✔
520
}
521

522
static int32_t taosTmrModuleInit(void) {
6,550✔
523
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
6,550✔
524
  if (tmrCtrls == NULL) {
6,550!
525
    tmrError("failed to allocate memory for timer controllers.");
×
526
    return terrno;
×
527
  }
528

529
  memset(&timerMap, 0, sizeof(timerMap));
6,550✔
530

531
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
7,362,200✔
532
    tmr_ctrl_t* ctrl = tmrCtrls + i;
7,355,650✔
533
    ctrl->next = ctrl + 1;
7,355,650✔
534
  }
535
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
6,550✔
536
  unusedTmrCtrl = tmrCtrls;
6,550✔
537

538
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
6,550✔
539

540
  int64_t now = taosGetMonotonicMs();
6,550✔
541
  for (int32_t i = 0; i < tListLen(wheels); i++) {
26,200✔
542
    time_wheel_t* wheel = wheels + i;
19,650✔
543
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
19,650!
544
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
×
545
      return terrno;
×
546
    }
547
    wheel->nextScanAt = now + wheel->resolution;
19,650✔
548
    wheel->index = 0;
19,650✔
549
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
19,650✔
550
    if (wheel->slots == NULL) {
19,650!
551
      tmrError("failed to allocate wheel slots");
×
552
      return terrno;
×
553
    }
554
    timerMap.size += wheel->size;
19,650✔
555
  }
556

557
  timerMap.count = 0;
6,550✔
558
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
6,550✔
559
  if (timerMap.slots == NULL) {
6,550!
560
    tmrError("failed to allocate hash map");
×
561
    return terrno;
×
562
  }
563

564
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
6,550✔
565
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
6,550!
566
    tmrError("failed to initialize timer");
×
567
  }
568

569
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
6,550✔
570

571
  return 2;
6,550✔
572
}
573

574
static int32_t taosTmrInitModule(void) {
40,652✔
575
  if (atomic_load_32(&tmrModuleInit) == 2) {
40,652✔
576
    return 0;
34,100✔
577
  }
578

579
  if (atomic_load_32(&tmrModuleInit) < 0) {
6,549!
580
    return -1;
×
581
  }
582

583
  while (true) {
584
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
13,100✔
585
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
6,550✔
586
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
6,550!
587
      return -1;
×
588
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
6,550!
589
      return 0;
6,550✔
590
    } else {
591
      taosMsleep(1);
×
592
    }
593
  }
594

595
  return -1;
596
}
597

598
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
40,652✔
599
  const char* ret = taosMonotonicInit();
40,652✔
600
  tmrDebug("ttimer monotonic clock source:%s", ret);
40,651✔
601

602
  if (taosTmrInitModule() < 0) {
40,651!
603
    return NULL;
×
604
  }
605

606
  (void)taosThreadMutexLock(&tmrCtrlMutex);
40,651✔
607
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
40,653✔
608
  if (ctrl != NULL) {
40,653!
609
    unusedTmrCtrl = ctrl->next;
40,653✔
610
    numOfTmrCtrl++;
40,653✔
611
  }
612
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
40,653✔
613

614
  if (ctrl == NULL) {
40,653!
615
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
616
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
617
    return NULL;
×
618
  }
619

620
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
40,653✔
621

622
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
40,653✔
623
  return ctrl;
40,652✔
624
}
625

626
void taosTmrCleanUp(void* handle) {
36,537✔
627
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
36,537✔
628
  if (ctrl == NULL || ctrl->label[0] == 0) {
36,537!
629
    return;
×
630
  }
631

632
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
36,537✔
633
  ctrl->label[0] = 0;
36,537✔
634

635
  (void)taosThreadMutexLock(&tmrCtrlMutex);
36,537✔
636
  ctrl->next = unusedTmrCtrl;
36,537✔
637
  numOfTmrCtrl--;
36,537✔
638
  unusedTmrCtrl = ctrl;
36,537✔
639
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
36,537✔
640

641
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
36,537✔
642
  if (numOfTmrCtrl <= 0) {
36,537✔
643
    taosUninitTimer();
2,440✔
644

645
    taosCleanUpScheduler(tmrQhandle);
2,440✔
646
    taosMemoryFreeClear(tmrQhandle);
2,440!
647

648
    for (int32_t i = 0; i < tListLen(wheels); i++) {
9,760✔
649
      time_wheel_t* wheel = wheels + i;
7,320✔
650
      (void)taosThreadMutexDestroy(&wheel->mutex);
7,320✔
651
      taosMemoryFree(wheel->slots);
7,320✔
652
    }
653

654
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
2,440✔
655

656
    for (size_t i = 0; i < timerMap.size; i++) {
14,993,800✔
657
      timer_list_t* list = timerMap.slots + i;
14,991,360✔
658
      tmr_obj_t*    t = list->timers;
14,991,360✔
659
      while (t != NULL) {
14,992,092✔
660
        tmr_obj_t* next = t->mnext;
732✔
661
        taosMemoryFree(t);
732✔
662
        t = next;
732✔
663
      }
664
    }
665
    taosMemoryFree(timerMap.slots);
2,440✔
666
    taosMemoryFree(tmrCtrls);
2,440✔
667

668
    tmrCtrls = NULL;
2,440✔
669
    unusedTmrCtrl = NULL;
2,440✔
670
    atomic_store_32(&tmrModuleInit, 0);
2,440✔
671
  }
672
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc