• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.5
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
// Module-local logging helpers. Each macro forwards its printf-style
// arguments to taosPrintLog() when the matching DEBUG_* bit is set in
// tmrDebugFlag. The bodies are wrapped in do { } while (0) so that an
// invocation followed by ';' expands to exactly one statement and can be
// used as the unbraced body of an if/else without breaking the else.
#define tmrFatal(...)                                                     \
  do {                                                                    \
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
  } while (0)
#define tmrError(...)                                                     \
  do {                                                                    \
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
  } while (0)
#define tmrWarn(...)                                                    \
  do {                                                                  \
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
    }                                                                   \
  } while (0)
#define tmrInfo(...)                                               \
  do {                                                             \
    if (tmrDebugFlag & DEBUG_INFO) {                               \
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
    }                                                              \
  } while (0)
#define tmrDebug(...)                                               \
  do {                                                              \
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
  } while (0)
#define tmrTrace(...)                                               \
  do {                                                              \
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
  } while (0)
59

60
// Life-cycle states kept in tmr_obj_t::state.
enum {
  TIMER_STATE_WAITING = 0,   // armed; the callback has not been started
  TIMER_STATE_EXPIRED = 1,   // the callback is currently being executed
  TIMER_STATE_STOPPED = 2,   // the callback has returned
  TIMER_STATE_CANCELED = 3,  // stopped/reset while still waiting
};
64

65
// A timer controller, always 16 bytes wide.
// While handed out it stores a NUL-terminated label (the public entry points
// treat label[0] == 0 as "not a usable controller"). While it sits on the
// free list, the trailing pointer-sized bytes link to the next free entry.
typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // sized so that 'next' occupies the final bytes of the union and
    // therefore never overlaps label[0]
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
73

74
typedef struct tmr_obj_t {
75
  uintptr_t         id;
76
  tmr_ctrl_t*       ctrl;
77
  struct tmr_obj_t* mnext;
78
  struct tmr_obj_t* prev;
79
  struct tmr_obj_t* next;
80
  uint16_t          slot;
81
  uint8_t           wheel;
82
  uint8_t           state;
83
  uint8_t           refCount;
84
  uint8_t           reserved1;
85
  uint16_t          reserved2;
86
  union {
87
    int64_t expireAt;
88
    int64_t executedBy;
89
  };
90
  TAOS_TMR_CALLBACK fp;
91
  void*             param;
92
} tmr_obj_t;
93

94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
96
  tmr_obj_t* timers;
97
} timer_list_t;
98

99
typedef struct timer_map_t {
100
  uint32_t      size;
101
  uint32_t      count;
102
  timer_list_t* slots;
103
} timer_map_t;
104

105
typedef struct time_wheel_t {
106
  TdThreadMutex mutex;
107
  int64_t       nextScanAt;
108
  uint32_t      resolution;
109
  uint16_t      size;
110
  uint16_t      index;
111
  tmr_obj_t**   slots;
112
} time_wheel_t;
113

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
115

116
static int32_t       tmrModuleInit = 0;
117
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*   tmrCtrls;
119
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
120
static void*         tmrQhandle;
121
static int32_t       numOfTmrCtrl = 0;
122

123
int32_t          taosTmrThreads = 1;
124
static uintptr_t nextTimerId = 0;
125

126
static time_wheel_t wheels[] = {
127
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
128
    {.resolution = 1000, .size = 1024},
129
    {.resolution = 60000, .size = 1024},
130
};
131
static timer_map_t timerMap;
132

133
static uintptr_t getNextTimerId() {
1,498,298✔
134
  uintptr_t id;
135
  do {
136
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
1,498,298✔
137
  } while (id == 0);
1,498,299!
138
  return id;
1,498,300✔
139
}
140

141
// Atomically takes one more reference on `timer`.
static void timerAddRef(tmr_obj_t* timer) { (void)atomic_add_fetch_8(&timer->refCount, 1); }
4,432,158✔
144

145
// Atomically drops one reference on `timer`; the object is freed when the
// count reaches zero, so the caller must not touch it afterwards.
static void timerDecRef(tmr_obj_t* timer) {
  if (atomic_sub_fetch_8(&timer->refCount, 1) != 0) {
    return;
  }
  taosMemoryFree(timer);
}
4,420,967✔
150

151
// Spins until the calling thread owns `list`: lockedBy is switched from 0 to
// the caller's thread id with a compare-and-swap. The CPU is yielded on every
// 1000th failed attempt.
static void lockTimerList(timer_list_t* list) {
  const int64_t self = taosGetSelfPthreadId();
  for (int32_t attempt = 1; atomic_val_compare_exchange_64(&(list->lockedBy), 0, self) != 0; ++attempt) {
    if (attempt % 1000 == 0) {
      (void)sched_yield();
    }
  }
}
4,497,319✔
160

161
// Releases `list` by switching lockedBy from the caller's thread id back to 0.
// If another thread (or nobody) held the lock, nothing is changed and an
// error is logged.
static void unlockTimerList(timer_list_t* list) {
  const int64_t self = taosGetSelfPthreadId();
  const int64_t holder = atomic_val_compare_exchange_64(&(list->lockedBy), self, 0);
  if (holder == self) {
    return;
  }
  uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", self);
}
4,497,317✔
167

168
// Registers `timer` in the id hash map. The map holds its own reference.
// The timer is marked as "on no wheel" (wheel == tListLen(wheels)) until
// addToWheel() places it.
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);

  timer_list_t* bucket = &timerMap.slots[(uint32_t)(timer->id % timerMap.size)];

  lockTimerList(bucket);
  // push at the head of the chain
  timer->mnext = bucket->timers;
  bucket->timers = timer;
  unlockTimerList(bucket);
}
1,498,301✔
180

181
static tmr_obj_t* findTimer(uintptr_t id) {
1,934,067✔
182
  tmr_obj_t* timer = NULL;
1,934,067✔
183
  if (id > 0) {
1,934,067✔
184
    uint32_t      idx = (uint32_t)(id % timerMap.size);
1,502,377✔
185
    timer_list_t* list = timerMap.slots + idx;
1,502,377✔
186
    lockTimerList(list);
1,502,377✔
187
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
1,502,388✔
188
      if (timer->id == id) {
1,435,571✔
189
        timerAddRef(timer);
1,435,563✔
190
        break;
1,435,562✔
191
      }
192
    }
193
    unlockTimerList(list);
1,502,379✔
194
  }
195
  return timer;
1,934,066✔
196
}
197

198
static void removeTimer(uintptr_t id) {
1,496,648✔
199
  tmr_obj_t*    prev = NULL;
1,496,648✔
200
  uint32_t      idx = (uint32_t)(id % timerMap.size);
1,496,648✔
201
  timer_list_t* list = timerMap.slots + idx;
1,496,648✔
202
  lockTimerList(list);
1,496,648✔
203
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
1,496,647!
204
    if (p->id == id) {
1,496,647!
205
      if (prev == NULL) {
1,496,647!
206
        list->timers = p->mnext;
1,496,647✔
207
      } else {
UNCOV
208
        prev->mnext = p->mnext;
×
209
      }
210
      timerDecRef(p);
1,496,647✔
211
      break;
1,496,648✔
212
    }
UNCOV
213
    prev = p;
×
214
  }
215
  unlockTimerList(list);
1,496,648✔
216
}
1,496,648✔
217

218
// Places `timer` on a wheel so that it fires roughly `delay` milliseconds
// from now. The wheel holds its own reference on the timer.
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);

  // Pick the finest wheel whose full span (resolution * size) exceeds the
  // delay; fall back to the coarsest wheel. Precision is traded for range.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t w = 0; w < tListLen(wheels); w++) {
    time_wheel_t* candidate = &wheels[w];
    if (delay < candidate->resolution * candidate->size) {
      timer->wheel = w;
      break;
    }
  }

  time_wheel_t* target = &wheels[timer->wheel];
  timer->prev = NULL;
  timer->expireAt = taosGetMonotonicMs() + delay;

  (void)taosThreadMutexLock(&target->mutex);

  // Number of slots ahead of the next scan, rounded up so the timer is
  // never fired earlier than requested.
  uint32_t slotsAhead = 0;
  if (timer->expireAt > target->nextScanAt) {
    delay = (uint32_t)(timer->expireAt - target->nextScanAt);
    slotsAhead = (delay + target->resolution - 1) / target->resolution;
  }

  timer->slot = (uint16_t)((target->index + slotsAhead + 1) % target->size);

  // push at the head of the slot's doubly linked list
  tmr_obj_t* oldHead = target->slots[timer->slot];
  target->slots[timer->slot] = timer;
  timer->next = oldHead;
  if (oldHead != NULL) {
    oldHead->prev = timer;
  }

  (void)taosThreadMutexUnlock(&target->mutex);
}
1,498,302✔
255

256
// Unlinks `timer` from the wheel it is on and drops the wheel's reference.
// Returns true if the timer was removed by this call, false if it was not
// (or was no longer) on a wheel.
static bool removeFromWheel(tmr_obj_t* timer) {
  const uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
    return false;
  }
  time_wheel_t* wheel = &wheels[wheelIdx];

  (void)taosThreadMutexLock(&wheel->mutex);

  // timer->wheel may have been changed by another thread before the lock
  // was taken, so it is checked a second time here.
  const bool onWheel = timer->wheel < tListLen(wheels);
  if (onWheel) {
    if (timer->prev != NULL) {
      timer->prev->next = timer->next;
    }
    if (timer->next != NULL) {
      timer->next->prev = timer->prev;
    }
    if (wheel->slots[timer->slot] == timer) {
      wheel->slots[timer->slot] = timer->next;
    }
    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
  }

  (void)taosThreadMutexUnlock(&wheel->mutex);

  return onWheel;
}
286

287
static void processExpiredTimer(void* handle, void* arg) {
1,099,291✔
288
  tmr_obj_t* timer = (tmr_obj_t*)handle;
1,099,291✔
289
  timer->executedBy = taosGetSelfPthreadId();
1,099,291✔
290
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
1,099,291✔
291
  if (state == TIMER_STATE_WAITING) {
1,099,291✔
292
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
1,099,276✔
293
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,099,276✔
294

295
    (*timer->fp)(timer->param, (tmr_h)timer->id);
1,099,276✔
296
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
1,099,276✔
297

298
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
1,099,276✔
299
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,099,276✔
300
  }
301
  removeTimer(timer->id);
1,099,291✔
302
  timerDecRef(timer);
1,099,291✔
303
}
1,099,291✔
304

305
// Hands every timer of the list starting at `head` (linked through `next`)
// to the scheduler, which will run processExpiredTimer() on it.
//
// Each timer must arrive with one reference owned by the caller on behalf of
// the queue; processExpiredTimer() releases it. If a timer cannot be queued,
// its callback will never run, so it is removed from the id map and that
// reference is released here instead of being leaked.
static void addToExpired(tmr_obj_t* head) {
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";

  while (head != NULL) {
    // Saved up front: once the task is queued, `head` may be processed and
    // freed by a worker thread at any moment.
    uintptr_t  id = head->id;
    tmr_obj_t* next = head->next;
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);

    SSchedMsg schedMsg = {0};
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
    schedMsg.msg = NULL;
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;

    if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
      // Nothing was queued, so `head` is still exclusively ours.
      tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
      removeTimer(id);
      timerDecRef(head);
    } else {
      tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
    }

    head = next;
  }
}
86,023,809✔
327

328
// Arms `timer` (a zeroed tmr_obj_t) with a fresh id and the given callback,
// registers it in the id map and either queues it for immediate execution
// (mseconds == 0) or places it on a wheel.
// Returns the new timer id.
// NOTE(review): a negative `mseconds` is converted to a very large unsigned
// delay by addToWheel(); confirm callers never pass one.
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
  const uintptr_t id = getNextTimerId();

  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

  tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started", ctrl->label, timer->id, timer->fp, timer->param);

  if (mseconds != 0) {
    addToWheel(timer, mseconds);
  } else {
    // fires right away: bypass the wheels and take the reference that the
    // expired queue will release
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
  }

  // Return the saved copy: `timer` may already have run and been freed, so
  // reading timer->id here would be unsafe.
  return id;
}
351

352
// Starts a one-shot timer that calls fp(param, timerId) after `mseconds`
// milliseconds. `handle` is the controller returned by taosTmrInit().
// Returns the timer id as a tmr_h, or NULL if the controller is NULL/unused
// or the timer object cannot be allocated.
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  const bool  usable = (ctrl != NULL) && (ctrl->label[0] != 0);
  if (!usable) {
    return NULL;
  }

  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
  if (timer != NULL) {
    return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
  }

  tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
  return NULL;
}
366

367
static void taosTimerLoopFunc(int32_t signo) {
28,674,604✔
368
  int64_t now = taosGetMonotonicMs();
28,674,604✔
369

370
  for (int32_t i = 0; i < tListLen(wheels); i++) {
114,698,413✔
371
    // `expried` is a temporary expire list.
372
    // expired timers are first add to this list, then move
373
    // to expired queue as a batch to improve performance.
374
    // note this list is used as a stack in this function.
375
    tmr_obj_t* expired = NULL;
86,023,810✔
376

377
    time_wheel_t* wheel = wheels + i;
86,023,810✔
378
    while (now >= wheel->nextScanAt) {
114,967,371✔
379
      (void)taosThreadMutexLock(&wheel->mutex);
28,943,562✔
380
      wheel->index = (wheel->index + 1) % wheel->size;
28,943,562✔
381
      tmr_obj_t* timer = wheel->slots[wheel->index];
28,943,562✔
382
      while (timer != NULL) {
30,042,853✔
383
        tmr_obj_t* next = timer->next;
1,099,291✔
384
        if (now < timer->expireAt) {
1,099,291!
385
          timer = next;
×
386
          continue;
×
387
        }
388

389
        // remove from the wheel
390
        if (timer->prev == NULL) {
1,099,291!
391
          wheel->slots[wheel->index] = next;
1,099,291✔
392
          if (next != NULL) {
1,099,291✔
393
            next->prev = NULL;
272,542✔
394
          }
395
        } else {
396
          timer->prev->next = next;
×
397
          if (next != NULL) {
×
398
            next->prev = timer->prev;
×
399
          }
400
        }
401
        timer->wheel = tListLen(wheels);
1,099,291✔
402

403
        // add to temporary expire list
404
        timer->next = expired;
1,099,291✔
405
        timer->prev = NULL;
1,099,291✔
406
        if (expired != NULL) {
1,099,291✔
407
          expired->prev = timer;
272,747✔
408
        }
409
        expired = timer;
1,099,291✔
410

411
        timer = next;
1,099,291✔
412
      }
413
      wheel->nextScanAt += wheel->resolution;
28,943,562✔
414
      (void)taosThreadMutexUnlock(&wheel->mutex);
28,943,562✔
415
    }
416

417
    addToExpired(expired);
86,023,809✔
418
  }
419
}
28,674,603✔
420

421
// Completes a stop request for `timer`. `state` is the state the timer had
// just before the caller tried to switch it from WAITING to CANCELED.
// Returns true only when the timer was taken off its wheel by this call,
// which is the only situation in which the object may be reused safely.
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  switch (state) {
    case TIMER_STATE_WAITING: {
      // Thread safety of the object cannot be guaranteed unless we are the
      // one that removed it from the wheel.
      const bool reusable = removeFromWheel(timer);
      if (reusable) {
        removeTimer(timer->id);
      }
      tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.", timer->ctrl->label, timer->id, timer->fp,
               timer->param);
      return reusable;
    }

    case TIMER_STATE_EXPIRED:
      // If the caller is the thread running the callback (taosTmrReset called
      // from inside the callback), nothing is done to avoid a dead lock; such
      // a call must be the last statement of the callback.
      if (timer->executedBy != taosGetSelfPthreadId()) {
        // The callback runs in another thread. Waiting for it could dead
        // lock if the caller holds a lock the callback needs, so we return
        // without waiting.
        tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.", timer->ctrl->label,
                 timer->id, timer->fp, timer->param);
      }
      return false;

    default:
      // already stopped or cancelled: nothing to do
      return false;
  }
}
454

455
// Cancels the timer identified by `timerId`.
// Returns true if the timer was still waiting (its callback will not run),
// false if it does not exist or has already expired/stopped/been cancelled.
bool taosTmrStop(tmr_h timerId) {
  const uintptr_t id = (uintptr_t)timerId;

  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
    return false;
  }

  const uint8_t prior = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
  (void)doStopTimer(timer, prior);
  timerDecRef(timer);  // balances the reference taken by findTimer()

  return prior == TIMER_STATE_WAITING;
}
470

471
// Same as taosTmrStop(), but also clears the caller's handle afterwards.
// `timerId` must not be NULL.
bool taosTmrStopA(tmr_h* timerId) {
  const bool wasWaiting = taosTmrStop(*timerId);
  *timerId = NULL;
  return wasWaiting;
}
476

477
// Reports whether the timer referenced by `*timerId` is no longer pending.
// Returns true if the timer does not exist, was cancelled, or its callback
// has finished; false while it is waiting or its callback is executing.
// `timerId` must not be NULL.
bool taosTmrIsStopped(tmr_h* timerId) {
  uintptr_t id = (uintptr_t)*timerId;

  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
    return true;
  }

  uint8_t state = atomic_load_8(&timer->state);

  // findTimer() took a reference for us; release it, otherwise the timer is
  // never freed and taosTmrReset() could wait forever for refCount to drop.
  timerDecRef(timer);

  return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
}
490

491
// Stops the timer referenced by `*pTmrId` (if any) and starts a new one with
// the given callback and delay, storing the new id in `*pTmrId`.
// When the old timer could be taken off its wheel, its object is reused for
// the new timer; otherwise a new object is allocated via taosTmrStart().
// Returns true if the old timer was still waiting when it was stopped, or
// if starting the new timer failed; false otherwise (including an invalid
// controller).
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return false;
  }

  const uintptr_t oldId = (uintptr_t)*pTmrId;
  bool            wasWaiting = false;
  tmr_obj_t*      reusable = findTimer(oldId);

  if (reusable != NULL) {
    const uint8_t prior = atomic_val_compare_exchange_8(&reusable->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    wasWaiting = (prior == TIMER_STATE_WAITING);
    if (!doStopTimer(reusable, prior)) {
      // not safe to reuse: give back the reference from findTimer()
      timerDecRef(reusable);
      reusable = NULL;
    }
  } else {
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, oldId);
  }

  if (reusable == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    return (*pTmrId == NULL) ? true : wasWaiting;
  }

  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, reusable->id);

  // Spin until ours is the only remaining reference, so the object can be
  // wiped and reused; the CPU is yielded on every 1000th iteration.
  int32_t spins = 0;
  while (atomic_load_8(&reusable->refCount) > 1) {
    if (++spins % 1000 == 0) {
      (void)sched_yield();
    }
  }

  if (reusable->refCount != 1) {
    uError("timer refCount=%d not expected 1", reusable->refCount);
  }

  // The wipe also resets refCount to 0; doStartTimer() takes fresh
  // references for the id map and the wheel/queue.
  memset(reusable, 0, sizeof(*reusable));
  *pTmrId = (tmr_h)doStartTimer(reusable, fp, mseconds, param, ctrl);

  return wasWaiting;
}
537

538
static int32_t taosTmrModuleInit(void) {
4,548✔
539
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
4,548!
540
  if (tmrCtrls == NULL) {
4,548!
541
    tmrError("failed to allocate memory for timer controllers.");
×
542
    return terrno;
×
543
  }
544

545
  memset(&timerMap, 0, sizeof(timerMap));
4,548✔
546

547
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
5,111,952✔
548
    tmr_ctrl_t* ctrl = tmrCtrls + i;
5,107,404✔
549
    ctrl->next = ctrl + 1;
5,107,404✔
550
  }
551
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
4,548✔
552
  unusedTmrCtrl = tmrCtrls;
4,548✔
553

554
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
4,548✔
555

556
  int64_t now = taosGetMonotonicMs();
4,548✔
557
  for (int32_t i = 0; i < tListLen(wheels); i++) {
18,192✔
558
    time_wheel_t* wheel = wheels + i;
13,644✔
559
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
13,644!
560
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
×
561
      return terrno;
×
562
    }
563
    wheel->nextScanAt = now + wheel->resolution;
13,644✔
564
    wheel->index = 0;
13,644✔
565
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
13,644!
566
    if (wheel->slots == NULL) {
13,644!
567
      tmrError("failed to allocate wheel slots");
×
568
      return terrno;
×
569
    }
570
    timerMap.size += wheel->size;
13,644✔
571
  }
572

573
  timerMap.count = 0;
4,548✔
574
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
4,548!
575
  if (timerMap.slots == NULL) {
4,548!
576
    tmrError("failed to allocate hash map");
×
577
    return terrno;
×
578
  }
579

580
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
4,548✔
581
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
4,548!
582
    tmrError("failed to initialize timer");
×
583
  }
584

585
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
4,548✔
586

587
  return 2;
4,548✔
588
}
589

590
static int32_t taosTmrInitModule(void) {
29,433✔
591
  if (atomic_load_32(&tmrModuleInit) == 2) {
29,433✔
592
    return 0;
24,885✔
593
  }
594

595
  if (atomic_load_32(&tmrModuleInit) < 0) {
4,547!
596
    return -1;
×
597
  }
598

599
  while (true) {
600
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
9,096✔
601
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
4,548✔
602
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
4,548!
603
      return -1;
×
604
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
4,548!
605
      return 0;
4,548✔
606
    } else {
607
      taosMsleep(1);
×
608
    }
609
  }
610

611
  return -1;
612
}
613

614
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
29,433✔
615
  const char* ret = taosMonotonicInit();
29,433✔
616
  tmrDebug("ttimer monotonic clock source:%s", ret);
29,433✔
617

618
  if (taosTmrInitModule() < 0) {
29,433!
619
    return NULL;
×
620
  }
621

622
  (void)taosThreadMutexLock(&tmrCtrlMutex);
29,433✔
623
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
29,434✔
624
  if (ctrl != NULL) {
29,434!
625
    unusedTmrCtrl = ctrl->next;
29,434✔
626
    numOfTmrCtrl++;
29,434✔
627
  }
628
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
29,434✔
629

630
  if (ctrl == NULL) {
29,434!
631
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
632
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
633
    return NULL;
×
634
  }
635

636
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
29,434✔
637

638
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
29,434✔
639
  return ctrl;
29,434✔
640
}
641

642
void taosTmrCleanUp(void* handle) {
27,044✔
643
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
27,044✔
644
  if (ctrl == NULL || ctrl->label[0] == 0) {
27,044!
645
    return;
×
646
  }
647

648
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
27,044✔
649
  ctrl->label[0] = 0;
27,044✔
650

651
  (void)taosThreadMutexLock(&tmrCtrlMutex);
27,044✔
652
  ctrl->next = unusedTmrCtrl;
27,044✔
653
  numOfTmrCtrl--;
27,044✔
654
  unusedTmrCtrl = ctrl;
27,044✔
655
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
27,044✔
656

657
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
27,044✔
658
  if (numOfTmrCtrl <= 0) {
27,044✔
659
    taosUninitTimer();
2,218✔
660

661
    taosCleanUpScheduler(tmrQhandle);
2,218✔
662
    taosMemoryFreeClear(tmrQhandle);
2,218!
663

664
    for (int32_t i = 0; i < tListLen(wheels); i++) {
8,872✔
665
      time_wheel_t* wheel = wheels + i;
6,654✔
666
      (void)taosThreadMutexDestroy(&wheel->mutex);
6,654✔
667
      taosMemoryFree(wheel->slots);
6,654!
668
    }
669

670
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
2,218✔
671

672
    for (size_t i = 0; i < timerMap.size; i++) {
13,629,610✔
673
      timer_list_t* list = timerMap.slots + i;
13,627,392✔
674
      tmr_obj_t*    t = list->timers;
13,627,392✔
675
      while (t != NULL) {
13,628,085✔
676
        tmr_obj_t* next = t->mnext;
693✔
677
        taosMemoryFree(t);
693!
678
        t = next;
693✔
679
      }
680
    }
681
    taosMemoryFree(timerMap.slots);
2,218!
682
    taosMemoryFree(tmrCtrls);
2,218!
683

684
    tmrCtrls = NULL;
2,218✔
685
    unusedTmrCtrl = NULL;
2,218✔
686
    atomic_store_32(&tmrModuleInit, 0);
2,218✔
687
  }
688
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc