• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4834

31 Oct 2025 03:37AM UTC coverage: 58.764% (+0.2%) from 58.533%
#4834

push

travis-ci

SallyHuo-TAOS
Merge remote-tracking branch 'origin/cover/3.0' into cover/3.0

# Conflicts:
#	test/ci/run.sh

149869 of 324176 branches covered (46.23%)

Branch coverage included in aggregate %.

199000 of 269498 relevant lines covered (73.84%)

239175663.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.11
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
// clang-format off
24
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); }}
25
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); }}
26
#define tmrWarn(...)  { if (tmrDebugFlag & DEBUG_WARN)  { taosPrintLog("TMR WARN  ", DEBUG_WARN,  tmrDebugFlag, __VA_ARGS__); }}
27
#define tmrInfo(...)  { if (tmrDebugFlag & DEBUG_INFO)  { taosPrintLog("TMR INFO  ", DEBUG_INFO,  tmrDebugFlag, __VA_ARGS__); }}
28
#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR DEBUG ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); }}
29
#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR TRACE ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); }}
30
// clang-format on
31

32
#define TIMER_STATE_WAITING  0
33
#define TIMER_STATE_EXPIRED  1
34
#define TIMER_STATE_STOPPED  2
35
#define TIMER_STATE_CANCELED 3
36

37
typedef union _tmr_ctrl_t {
38
  char label[16];
39
  struct {
40
    // pad to ensure 'next' is the end of this union
41
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
42
    union _tmr_ctrl_t* next;
43
  };
44
} tmr_ctrl_t;
45

46
typedef struct tmr_obj_t {
47
  uintptr_t         id;
48
  tmr_ctrl_t*       ctrl;
49
  struct tmr_obj_t* mnext;
50
  struct tmr_obj_t* prev;
51
  struct tmr_obj_t* next;
52
  uint16_t          slot;
53
  uint8_t           wheel;
54
  uint8_t           state;
55
  uint8_t           refCount;
56
  uint8_t           reserved1;
57
  uint16_t          reserved2;
58
  union {
59
    int64_t expireAt;
60
    int64_t executedBy;
61
  };
62
  TAOS_TMR_CALLBACK fp;
63
  void*             param;
64
  uint8_t           priority;
65
} tmr_obj_t;
66

67
typedef struct timer_list_t {
68
  int64_t    lockedBy;
69
  tmr_obj_t* timers;
70
} timer_list_t;
71

72
typedef struct timer_map_t {
73
  uint32_t      size;
74
  uint32_t      count;
75
  timer_list_t* slots;
76
} timer_map_t;
77

78
typedef struct time_wheel_t {
79
  TdThreadMutex mutex;
80
  int64_t       nextScanAt;
81
  uint32_t      resolution;
82
  uint16_t      size;
83
  uint16_t      index;
84
  tmr_obj_t**   slots;
85
} time_wheel_t;
86

87
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
88

89
static int32_t       tmrModuleInit = 0;
90
static TdThreadMutex tmrCtrlMutex;
91
static tmr_ctrl_t*   tmrCtrls;
92
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
93
static void*         tmrQhandle;
94
static void*         tmrQhandleHigh;
95
static int32_t       numOfTmrCtrl = 0;
96

97
int32_t          taosTmrThreads = 1;
98
static uintptr_t nextTimerId = 0;
99

100
static time_wheel_t wheels[] = {
101
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
102
    {.resolution = 1000, .size = 1024},
103
    {.resolution = 60000, .size = 1024},
104
};
105
static timer_map_t timerMap;
106

107
static uintptr_t getNextTimerId() {
1,084,964,702✔
108
  uintptr_t id;
109
  do {
110
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
1,084,964,702✔
111
  } while (id == 0);
1,084,964,714!
112
  return id;
1,084,964,714✔
113
}
114

115
static void timerAddRef(tmr_obj_t* timer) {
2,147,483,647✔
116
  (void)atomic_add_fetch_8(&timer->refCount, 1);
2,147,483,647✔
117
}
2,147,483,647✔
118

119
static void timerDecRef(tmr_obj_t* timer) {
2,147,483,647✔
120
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
2,147,483,647✔
121
    taosMemoryFree(timer);
1,082,710,086!
122
  }
123
}
2,147,483,647✔
124

125
static void lockTimerList(timer_list_t* list) {
2,147,483,647✔
126
  int64_t tid = taosGetSelfPthreadId();
2,147,483,647✔
127
  int32_t i = 0;
2,147,483,647✔
128
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
2,147,483,647!
129
    if (++i % 1000 == 0) {
×
130
      (void)sched_yield();
×
131
    }
132
  }
133
}
2,147,483,647✔
134

135
static void unlockTimerList(timer_list_t* list) {
2,147,483,647✔
136
  int64_t tid = taosGetSelfPthreadId();
2,147,483,647✔
137
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
2,147,483,647!
138
    uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
×
139
  }
140
}
2,147,483,647✔
141

142
static void addTimer(tmr_obj_t* timer) {
1,084,963,588✔
143
  timerAddRef(timer);
1,084,963,588✔
144
  timer->wheel = tListLen(wheels);
1,084,967,931✔
145

146
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
1,084,967,931!
147
  timer_list_t* list = timerMap.slots + idx;
1,084,960,725✔
148

149
  lockTimerList(list);
1,084,961,869✔
150
  timer->mnext = list->timers;
1,084,967,312✔
151
  list->timers = timer;
1,084,965,890✔
152
  unlockTimerList(list);
1,084,965,900✔
153
}
1,084,963,233✔
154

155
static tmr_obj_t* findTimer(uintptr_t id) {
1,470,556,557✔
156
  tmr_obj_t* timer = NULL;
1,470,556,557✔
157
  if (id > 0) {
1,470,556,557✔
158
    uint32_t      idx = (uint32_t)(id % timerMap.size);
1,085,110,691!
159
    timer_list_t* list = timerMap.slots + idx;
1,085,110,691✔
160
    lockTimerList(list);
1,085,108,983✔
161
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
1,085,113,803✔
162
      if (timer->id == id) {
1,066,730,825✔
163
        timerAddRef(timer);
1,066,730,636✔
164
        break;
1,066,731,537✔
165
      }
166
    }
167
    unlockTimerList(list);
1,085,116,402✔
168
  }
169
  return timer;
1,470,562,049✔
170
}
171

172
static void removeTimer(uintptr_t id) {
1,082,712,063✔
173
  tmr_obj_t*    prev = NULL;
1,082,712,063✔
174
  uint32_t      idx = (uint32_t)(id % timerMap.size);
1,082,712,063!
175
  timer_list_t* list = timerMap.slots + idx;
1,082,712,063✔
176
  lockTimerList(list);
1,082,712,063✔
177
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
1,082,712,063!
178
    if (p->id == id) {
1,082,712,063!
179
      if (prev == NULL) {
1,082,712,063!
180
        list->timers = p->mnext;
1,082,712,063✔
181
      } else {
182
        prev->mnext = p->mnext;
×
183
      }
184
      timerDecRef(p);
1,082,710,579✔
185
      break;
1,082,711,176✔
186
    }
187
    prev = p;
×
188
  }
189
  unlockTimerList(list);
1,082,711,176✔
190
}
1,082,712,063✔
191

192
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
1,084,957,573✔
193
  timerAddRef(timer);
1,084,957,573✔
194
  // select a wheel for the timer, we are not an accurate timer,
195
  // but the inaccuracy should not be too large.
196
  timer->wheel = tListLen(wheels) - 1;
1,084,968,300✔
197
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
1,114,830,487✔
198
    time_wheel_t* wheel = wheels + i;
1,114,818,277✔
199
    if (delay < wheel->resolution * wheel->size) {
1,114,819,800✔
200
      timer->wheel = i;
1,084,958,935✔
201
      break;
1,084,954,830✔
202
    }
203
  }
204

205
  time_wheel_t* wheel = wheels + timer->wheel;
1,084,967,040✔
206
  timer->prev = NULL;
1,084,957,001✔
207
  timer->expireAt = taosGetMonotonicMs() + delay;
1,084,950,026✔
208

209
  (void)taosThreadMutexLock(&wheel->mutex);
1,084,968,825✔
210

211
  uint32_t idx = 0;
1,084,971,876✔
212
  if (timer->expireAt > wheel->nextScanAt) {
1,084,971,876!
213
    // adjust delay according to next scan time of this wheel
214
    // so that the timer is not fired earlier than desired.
215
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
1,084,971,876✔
216
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
1,084,971,876!
217
  }
218

219
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
1,084,971,876!
220
  tmr_obj_t* p = wheel->slots[timer->slot];
1,084,971,876✔
221
  wheel->slots[timer->slot] = timer;
1,084,971,876✔
222
  timer->next = p;
1,084,971,876✔
223
  if (p != NULL) {
1,084,971,876✔
224
    p->prev = timer;
103,556,493✔
225
  }
226

227
  (void)taosThreadMutexUnlock(&wheel->mutex);
1,084,971,876✔
228
}
1,084,971,876✔
229

230
static bool removeFromWheel(tmr_obj_t* timer) {
347,106,316✔
231
  uint8_t wheelIdx = timer->wheel;
347,106,316✔
232
  if (wheelIdx >= tListLen(wheels)) {
347,106,250✔
233
    return false;
3,488✔
234
  }
235
  time_wheel_t* wheel = wheels + wheelIdx;
347,102,762✔
236

237
  bool removed = false;
347,103,770✔
238
  (void)taosThreadMutexLock(&wheel->mutex);
347,103,770✔
239
  // other thread may modify timer->wheel, check again.
240
  if (timer->wheel < tListLen(wheels)) {
347,111,818!
241
    if (timer->prev != NULL) {
347,111,818✔
242
      timer->prev->next = timer->next;
4,506,128✔
243
    }
244
    if (timer->next != NULL) {
347,111,818✔
245
      timer->next->prev = timer->prev;
11,693,383✔
246
    }
247
    if (timer == wheel->slots[timer->slot]) {
347,111,818✔
248
      wheel->slots[timer->slot] = timer->next;
342,605,690✔
249
    }
250
    timer->wheel = tListLen(wheels);
347,111,818✔
251
    timer->next = NULL;
347,111,818✔
252
    timer->prev = NULL;
347,111,818✔
253
    timerDecRef(timer);
347,111,818✔
254
    removed = true;
347,111,818✔
255
  }
256
  (void)taosThreadMutexUnlock(&wheel->mutex);
347,111,818✔
257

258
  return removed;
347,111,818✔
259
}
260

261
static void processExpiredTimer(void* handle, void* arg) {
735,584,711✔
262
  tmr_obj_t* timer = (tmr_obj_t*)handle;
735,584,711✔
263
  timer->executedBy = taosGetSelfPthreadId();
735,584,711✔
264
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
735,590,794✔
265
  if (state == TIMER_STATE_WAITING) {
735,590,491✔
266
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
735,588,857✔
267
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
735,588,857✔
268

269
    (*timer->fp)(timer->param, (tmr_h)timer->id);
735,588,857✔
270
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
735,596,757✔
271

272
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
735,596,286✔
273
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
735,596,286✔
274
  }
275
  removeTimer(timer->id);
735,597,920✔
276
  timerDecRef(timer);
735,600,245✔
277
}
735,600,245✔
278

279
static void addToExpired(tmr_obj_t* head) {
2,147,483,647✔
280
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
2,147,483,647✔
281

282
  while (head != NULL) {
2,147,483,647✔
283
    uintptr_t  id = head->id;
735,600,245✔
284
    tmr_obj_t* next = head->next;
735,600,245✔
285
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
735,600,245✔
286

287
    SSchedMsg schedMsg;
734,988,013✔
288
    schedMsg.fp = NULL;
735,600,245✔
289
    schedMsg.tfp = processExpiredTimer;
735,600,245✔
290
    schedMsg.msg = NULL;
735,600,245✔
291
    schedMsg.ahandle = head;
735,600,245✔
292
    schedMsg.thandle = NULL;
735,600,245✔
293
    uint8_t priority = head->priority;
735,600,245✔
294

295
    if (priority == 1) {
735,600,245✔
296
      if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
640,850,666!
297
        tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
×
298
      }
299
    } else if (priority == 2) {
94,749,579!
300
      if (taosScheduleTask(tmrQhandleHigh, &schedMsg) != 0) {
94,749,579!
301
        tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to high level queue.", head->ctrl->label, id);
×
302
      }
303
    }
304
    else{
305
      tmrError("%s invalid priority level %d for timer[id=%" PRIuPTR "].", head->ctrl->label, priority, id);
×
306
    }
307

308
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue priority:%d.", id, priority);
735,600,245✔
309
    head = next;
735,600,245✔
310
  }
311
}
2,147,483,647✔
312

313
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl,
1,084,961,923✔
314
                              uint8_t priority) {
315
  uintptr_t id = getNextTimerId();
1,084,961,923✔
316
  timer->id = id;
1,084,964,714✔
317
  timer->state = TIMER_STATE_WAITING;
1,084,964,714✔
318
  timer->fp = fp;
1,084,960,874✔
319
  timer->param = param;
1,084,963,295✔
320
  timer->ctrl = ctrl;
1,084,964,184✔
321
  timer->priority = priority;
1,084,961,664✔
322
  addTimer(timer);
1,084,964,555✔
323

324
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
1,084,961,190✔
325
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
1,084,961,190✔
326

327
  if (mseconds == 0) {
1,084,954,640!
328
    timer->wheel = tListLen(wheels);
×
329
    timerAddRef(timer);
×
330
    addToExpired(timer);
×
331
  } else {
332
    addToWheel(timer, mseconds);
1,084,954,640✔
333
  }
334

335
  // note: use `timer->id` here is unsafe as `timer` may already be freed
336
  return id;
1,084,971,876✔
337
}
338

339
tmr_h taosTmrStartPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, uint8_t priority) {
1,084,959,939✔
340
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,084,959,939✔
341
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,084,959,939!
342
    return NULL;
×
343
  }
344

345
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
1,084,962,463!
346
  if (timer == NULL) {
1,084,958,765!
347
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
×
348
    return NULL;
×
349
  }
350

351
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority);
1,084,958,765✔
352
}
353

354
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
48,918,916✔
355
  return taosTmrStartPriority(fp, mseconds, param, handle, 1);
48,918,916✔
356
}
357

358
static void taosTimerLoopFunc(int32_t signo) {
2,147,483,647✔
359
  int64_t now = taosGetMonotonicMs();
2,147,483,647✔
360

361
  for (int32_t i = 0; i < tListLen(wheels); i++) {
2,147,483,647✔
362
    // `expried` is a temporary expire list.
363
    // expired timers are first add to this list, then move
364
    // to expired queue as a batch to improve performance.
365
    // note this list is used as a stack in this function.
366
    tmr_obj_t* expired = NULL;
2,147,483,647✔
367

368
    time_wheel_t* wheel = wheels + i;
2,147,483,647✔
369
    while (now >= wheel->nextScanAt) {
2,147,483,647✔
370
      (void)taosThreadMutexLock(&wheel->mutex);
2,147,483,647✔
371
      wheel->index = (wheel->index + 1) % wheel->size;
2,147,483,647!
372
      tmr_obj_t* timer = wheel->slots[wheel->index];
2,147,483,647✔
373
      while (timer != NULL) {
2,147,483,647✔
374
        tmr_obj_t* next = timer->next;
735,600,245✔
375
        if (now < timer->expireAt) {
735,600,245!
376
          timer = next;
×
377
          continue;
×
378
        }
379

380
        // remove from the wheel
381
        if (timer->prev == NULL) {
735,600,245!
382
          wheel->slots[wheel->index] = next;
735,600,245✔
383
          if (next != NULL) {
735,600,245✔
384
            next->prev = NULL;
87,282,738✔
385
          }
386
        } else {
387
          timer->prev->next = next;
×
388
          if (next != NULL) {
×
389
            next->prev = timer->prev;
×
390
          }
391
        }
392
        timer->wheel = tListLen(wheels);
735,600,245✔
393

394
        // add to temporary expire list
395
        timer->next = expired;
735,600,245✔
396
        timer->prev = NULL;
735,600,245✔
397
        if (expired != NULL) {
735,600,245✔
398
          expired->prev = timer;
87,814,961✔
399
        }
400
        expired = timer;
735,600,245✔
401

402
        timer = next;
735,600,245✔
403
      }
404
      wheel->nextScanAt += wheel->resolution;
2,147,483,647✔
405
      (void)taosThreadMutexUnlock(&wheel->mutex);
2,147,483,647✔
406
    }
407

408
    addToExpired(expired);
2,147,483,647✔
409
  }
410
}
2,147,483,647✔
411

412
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
1,066,719,605✔
413
  if (state == TIMER_STATE_WAITING) {
1,066,719,605✔
414
    bool reusable = false;
347,109,845✔
415
    if (removeFromWheel(timer)) {
347,109,845✔
416
      removeTimer(timer->id);
347,111,818✔
417
      // only safe to reuse the timer when timer is removed from the wheel.
418
      // we cannot guarantee the thread safety of the timr in all other cases.
419
      reusable = true;
347,111,818✔
420
    }
421
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
347,115,306✔
422
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
347,115,306✔
423
    return reusable;
347,115,306✔
424
  }
425

426
  if (state != TIMER_STATE_EXPIRED) {
719,609,760✔
427
    // timer already stopped or cancelled, has nothing to do in this case
428
    return false;
3,154✔
429
  }
430

431
  if (timer->executedBy == taosGetSelfPthreadId()) {
719,606,606✔
432
    // taosTmrReset is called in the timer callback, should do nothing in this
433
    // case to avoid dead lock. note taosTmrReset must be the last statement
434
    // of the callback funtion, will be a bug otherwise.
435
    return false;
719,611,582✔
436
  }
437

438
  // timer callback is executing in another thread, we SHOULD wait it to stop,
439
  // BUT this may result in dead lock if current thread are holding a lock which
440
  // the timer callback need to acquire. so, we HAVE TO return directly.
441
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
3,916✔
442
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
3,916!
443
  return false;
3,916✔
444
}
445

446
bool taosTmrStop(tmr_h timerId) {
434,437,570✔
447
  uintptr_t id = (uintptr_t)timerId;
434,437,570✔
448

449
  tmr_obj_t* timer = findTimer(id);
434,437,570✔
450
  if (timer == NULL) {
434,436,850✔
451
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
87,320,045✔
452
    return false;
87,319,646✔
453
  }
454

455
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
347,116,805✔
456
  (void)doStopTimer(timer, state);
347,119,489✔
457
  timerDecRef(timer);
347,122,263✔
458

459
  return state == TIMER_STATE_WAITING;
347,118,742✔
460
}
461

462
bool taosTmrStopA(tmr_h* timerId) {
21,083,053✔
463
  bool ret = taosTmrStop(*timerId);
21,083,053✔
464
  *timerId = NULL;
21,083,053✔
465
  return ret;
21,083,053✔
466
}
467

468
bool taosTmrIsStopped(tmr_h* timerId) {
81,622✔
469
  uintptr_t id = (uintptr_t)*timerId;
81,622✔
470

471
  tmr_obj_t* timer = findTimer(id);
81,622✔
472
  if (timer == NULL) {
81,622!
473
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
81,622!
474
    return true;
81,622✔
475
  }
476

477
  uint8_t state = atomic_load_8(&timer->state);
×
478

479
  return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
×
480
}
481

482
bool taosTmrResetPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId,
1,036,048,719✔
483
                          uint8_t priority) {
484
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,036,048,719✔
485
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,036,048,719✔
486
    return false;
1,189✔
487
  }
488

489
  uintptr_t  id = (uintptr_t)*pTmrId;
1,036,047,530✔
490
  bool       stopped = false;
1,036,038,577✔
491
  tmr_obj_t* timer = findTimer(id);
1,036,038,577✔
492
  if (timer == NULL) {
1,036,042,326✔
493
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
316,429,557✔
494
  } else {
495
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
719,612,769✔
496
    if (!doStopTimer(timer, state)) {
719,609,770✔
497
      timerDecRef(timer);
719,610,944✔
498
      timer = NULL;
719,611,084✔
499
    }
500
    stopped = state == TIMER_STATE_WAITING;
719,611,197✔
501
  }
502

503
  if (timer == NULL) {
1,036,043,237✔
504
    *pTmrId = taosTmrStartPriority(fp, mseconds, param, handle, priority);
1,036,043,124✔
505
    if (NULL == *pTmrId) {
1,036,051,049!
506
      stopped = true;
×
507
    }
508
    return stopped;
1,036,051,049✔
509
  }
510

511
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
113!
512

513
  // wait until there's no other reference to this timer,
514
  // so that we can reuse this timer safely.
515
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
113!
516
    if (i % 1000 == 0) {
×
517
      (void)sched_yield();
×
518
    }
519
  }
520

521
  if (timer->refCount != 1) {
113!
522
    uError("timer refCount=%d not expected 1", timer->refCount);
×
523
  }
524
  memset(timer, 0, sizeof(*timer));
113!
525
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority);
113✔
526

527
  return stopped;
113✔
528
}
529

530
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
933,558,684✔
531
  return taosTmrResetPriority(fp, mseconds, param, handle, pTmrId, 1);
933,558,684✔
532
}
533

534
static int32_t taosTmrModuleInit(void) {
11,285,980✔
535
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
11,285,980!
536
  if (tmrCtrls == NULL) {
11,285,980!
537
    tmrError("failed to allocate memory for timer controllers.");
×
538
    return terrno;
×
539
  }
540

541
  memset(&timerMap, 0, sizeof(timerMap));
11,285,980✔
542

543
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
2,147,483,647✔
544
    tmr_ctrl_t* ctrl = tmrCtrls + i;
2,147,483,647✔
545
    ctrl->next = ctrl + 1;
2,147,483,647✔
546
  }
547
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
11,285,980✔
548
  unusedTmrCtrl = tmrCtrls;
11,285,980✔
549

550
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
11,285,980✔
551

552
  int64_t now = taosGetMonotonicMs();
11,285,980✔
553
  for (int32_t i = 0; i < tListLen(wheels); i++) {
45,143,920✔
554
    time_wheel_t* wheel = wheels + i;
33,857,940✔
555
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
33,857,940!
556
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(ERRNO));
×
557
      return terrno;
×
558
    }
559
    wheel->nextScanAt = now + wheel->resolution;
33,857,940✔
560
    wheel->index = 0;
33,857,940✔
561
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
33,857,940!
562
    if (wheel->slots == NULL) {
33,857,940!
563
      tmrError("failed to allocate wheel slots");
×
564
      return terrno;
×
565
    }
566
    timerMap.size += wheel->size;
33,857,940✔
567
  }
568

569
  timerMap.count = 0;
11,285,980✔
570
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
11,285,980!
571
  if (timerMap.slots == NULL) {
11,285,980!
572
    tmrError("failed to allocate hash map");
×
573
    return terrno;
×
574
  }
575

576
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
11,285,980✔
577
  tmrQhandleHigh = taosInitScheduler(10000, taosTmrThreads, "high-tmr", NULL);
11,285,980✔
578
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
11,285,980!
579
    tmrError("failed to initialize timer");
×
580
  }
581

582
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
11,285,980✔
583

584
  return 2;
11,285,980✔
585
}
586

587
static int32_t taosTmrInitModule(void) {
57,313,576✔
588
  if (atomic_load_32(&tmrModuleInit) == 2) {
57,313,576✔
589
    return 0;
46,027,307✔
590
  }
591

592
  if (atomic_load_32(&tmrModuleInit) < 0) {
11,285,418!
593
    return -1;
×
594
  }
595

596
  while (true) {
597
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
22,571,960✔
598
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
11,285,980✔
599
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
11,285,980!
600
      return -1;
×
601
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
11,285,980!
602
      return 0;
11,285,980✔
603
    } else {
604
      taosMsleep(1);
×
605
    }
606
  }
607

608
  return -1;
609
}
610

611
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
57,313,576✔
612
  const char* ret = taosMonotonicInit();
57,313,576✔
613
  tmrDebug("ttimer monotonic clock source:%s", ret);
57,313,576✔
614

615
  if (taosTmrInitModule() < 0) {
57,313,576!
616
    return NULL;
×
617
  }
618

619
  (void)taosThreadMutexLock(&tmrCtrlMutex);
57,313,576✔
620
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
57,314,138✔
621
  if (ctrl != NULL) {
57,314,138!
622
    unusedTmrCtrl = ctrl->next;
57,314,138✔
623
    numOfTmrCtrl++;
57,314,138✔
624
  }
625
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
57,314,138✔
626

627
  if (ctrl == NULL) {
57,314,138!
628
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
629
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
630
    return NULL;
×
631
  }
632

633
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
57,314,138!
634

635
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
57,313,576✔
636
  return ctrl;
57,313,576✔
637
}
638

639
void taosTmrCleanUp(void* handle) {
48,324,032✔
640
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
48,324,032✔
641
  if (ctrl == NULL || ctrl->label[0] == 0) {
48,324,032!
642
    return;
4,284✔
643
  }
644

645
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
48,319,748✔
646
  ctrl->label[0] = 0;
48,319,748✔
647

648
  (void)taosThreadMutexLock(&tmrCtrlMutex);
48,319,748✔
649
  ctrl->next = unusedTmrCtrl;
48,319,748✔
650
  numOfTmrCtrl--;
48,319,748✔
651
  unusedTmrCtrl = ctrl;
48,319,748✔
652
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
48,319,748✔
653

654
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
48,319,748✔
655
  if (numOfTmrCtrl <= 0) {
48,319,748✔
656
    taosUninitTimer();
2,291,875✔
657

658
    taosCleanUpScheduler(tmrQhandle);
2,291,875✔
659
    taosMemoryFreeClear(tmrQhandle);
2,291,875!
660

661
    taosCleanUpScheduler(tmrQhandleHigh);
2,291,875✔
662
    taosMemoryFreeClear(tmrQhandleHigh);
2,291,875!
663

664
    for (int32_t i = 0; i < tListLen(wheels); i++) {
9,167,500✔
665
      time_wheel_t* wheel = wheels + i;
6,875,625✔
666
      (void)taosThreadMutexDestroy(&wheel->mutex);
6,875,625✔
667
      taosMemoryFree(wheel->slots);
6,875,625!
668
    }
669

670
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
2,291,875✔
671

672
    for (size_t i = 0; i < timerMap.size; i++) {
2,147,483,647✔
673
      timer_list_t* list = timerMap.slots + i;
2,147,483,647✔
674
      tmr_obj_t*    t = list->timers;
2,147,483,647✔
675
      while (t != NULL) {
2,147,483,647✔
676
        tmr_obj_t* next = t->mnext;
1,023,439✔
677
        taosMemoryFree(t);
1,023,439!
678
        t = next;
1,023,439✔
679
      }
680
    }
681
    taosMemoryFree(timerMap.slots);
2,291,875!
682
    taosMemoryFree(tmrCtrls);
2,291,875!
683

684
    tmrCtrls = NULL;
2,291,875✔
685
    unusedTmrCtrl = NULL;
2,291,875✔
686
    atomic_store_32(&tmrModuleInit, 0);
2,291,875✔
687
  }
688
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc