• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.1
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
#define tmrFatal(...)                                                     \
24
  {                                                                       \
25
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
26
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
27
    }                                                                     \
28
  }
29
#define tmrError(...)                                                     \
30
  {                                                                       \
31
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
32
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
33
    }                                                                     \
34
  }
35
#define tmrWarn(...)                                                    \
36
  {                                                                     \
37
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
38
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
39
    }                                                                   \
40
  }
41
#define tmrInfo(...)                                               \
42
  {                                                                \
43
    if (tmrDebugFlag & DEBUG_INFO) {                               \
44
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
45
    }                                                              \
46
  }
47
#define tmrDebug(...)                                               \
48
  {                                                                 \
49
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
50
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
51
    }                                                               \
52
  }
53
#define tmrTrace(...)                                               \
54
  {                                                                 \
55
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
56
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
57
    }                                                               \
58
  }
59

60
#define TIMER_STATE_WAITING  0
61
#define TIMER_STATE_EXPIRED  1
62
#define TIMER_STATE_STOPPED  2
63
#define TIMER_STATE_CANCELED 3
64

65
typedef union _tmr_ctrl_t {
66
  char label[16];
67
  struct {
68
    // pad to ensure 'next' is the end of this union
69
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
70
    union _tmr_ctrl_t* next;
71
  };
72
} tmr_ctrl_t;
73

74
typedef struct tmr_obj_t {
75
  uintptr_t         id;
76
  tmr_ctrl_t*       ctrl;
77
  struct tmr_obj_t* mnext;
78
  struct tmr_obj_t* prev;
79
  struct tmr_obj_t* next;
80
  uint16_t          slot;
81
  uint8_t           wheel;
82
  uint8_t           state;
83
  uint8_t           refCount;
84
  uint8_t           reserved1;
85
  uint16_t          reserved2;
86
  union {
87
    int64_t expireAt;
88
    int64_t executedBy;
89
  };
90
  TAOS_TMR_CALLBACK fp;
91
  void*             param;
92
} tmr_obj_t;
93

94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
96
  tmr_obj_t* timers;
97
} timer_list_t;
98

99
typedef struct timer_map_t {
100
  uint32_t      size;
101
  uint32_t      count;
102
  timer_list_t* slots;
103
} timer_map_t;
104

105
typedef struct time_wheel_t {
106
  TdThreadMutex mutex;
107
  int64_t       nextScanAt;
108
  uint32_t      resolution;
109
  uint16_t      size;
110
  uint16_t      index;
111
  tmr_obj_t**   slots;
112
} time_wheel_t;
113

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
115

116
static int32_t       tmrModuleInit = 0;
117
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*   tmrCtrls;
119
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
120
static void*         tmrQhandle;
121
static int32_t       numOfTmrCtrl = 0;
122

123
int32_t          taosTmrThreads = 1;
124
static uintptr_t nextTimerId = 0;
125

126
static time_wheel_t wheels[] = {
127
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
128
    {.resolution = 1000, .size = 1024},
129
    {.resolution = 60000, .size = 1024},
130
};
131
static timer_map_t timerMap;
132

133
static uintptr_t getNextTimerId() {
1,922,796✔
134
  uintptr_t id;
135
  do {
136
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
1,922,796✔
137
  } while (id == 0);
1,922,797✔
138
  return id;
1,922,795✔
139
}
140

141
static void timerAddRef(tmr_obj_t* timer) { (void)atomic_add_fetch_8(&timer->refCount, 1); }
5,695,844✔
142

143
static void timerDecRef(tmr_obj_t* timer) {
5,685,269✔
144
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
5,685,269✔
145
    taosMemoryFree(timer);
1,913,680!
146
  }
147
}
5,685,287✔
148

149
static void lockTimerList(timer_list_t* list) {
5,765,313✔
150
  int64_t tid = taosGetSelfPthreadId();
5,765,313✔
151
  int32_t i = 0;
5,765,314✔
152
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
5,765,335✔
153
    if (++i % 1000 == 0) {
21!
154
      (void)sched_yield();
×
155
    }
156
  }
157
}
5,765,338✔
158

159
static void unlockTimerList(timer_list_t* list) {
5,765,329✔
160
  int64_t tid = taosGetSelfPthreadId();
5,765,329✔
161
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
5,765,327!
162
    uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
×
163
  }
164
}
5,765,323✔
165

166
static void addTimer(tmr_obj_t* timer) {
1,922,796✔
167
  timerAddRef(timer);
1,922,796✔
168
  timer->wheel = tListLen(wheels);
1,922,797✔
169

170
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
1,922,797✔
171
  timer_list_t* list = timerMap.slots + idx;
1,922,797✔
172

173
  lockTimerList(list);
1,922,797✔
174
  timer->mnext = list->timers;
1,922,797✔
175
  list->timers = timer;
1,922,797✔
176
  unlockTimerList(list);
1,922,797✔
177
}
1,922,796✔
178

179
static tmr_obj_t* findTimer(uintptr_t id) {
2,520,127✔
180
  tmr_obj_t* timer = NULL;
2,520,127✔
181
  if (id > 0) {
2,520,127✔
182
    uint32_t      idx = (uint32_t)(id % timerMap.size);
1,921,170✔
183
    timer_list_t* list = timerMap.slots + idx;
1,921,170✔
184
    lockTimerList(list);
1,921,170✔
185
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
1,921,183✔
186
      if (timer->id == id) {
1,850,263✔
187
        timerAddRef(timer);
1,850,262✔
188
        break;
1,850,261✔
189
      }
190
    }
191
    unlockTimerList(list);
1,921,181✔
192
  }
193
  return timer;
2,520,141✔
194
}
195

196
static void removeTimer(uintptr_t id) {
1,921,359✔
197
  tmr_obj_t*    prev = NULL;
1,921,359✔
198
  uint32_t      idx = (uint32_t)(id % timerMap.size);
1,921,359✔
199
  timer_list_t* list = timerMap.slots + idx;
1,921,359✔
200
  lockTimerList(list);
1,921,359✔
201
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
1,921,359!
202
    if (p->id == id) {
1,921,359!
203
      if (prev == NULL) {
1,921,359!
204
        list->timers = p->mnext;
1,921,359✔
205
      } else {
UNCOV
206
        prev->mnext = p->mnext;
×
207
      }
208
      timerDecRef(p);
1,921,359✔
209
      break;
1,921,359✔
210
    }
UNCOV
211
    prev = p;
×
212
  }
213
  unlockTimerList(list);
1,921,359✔
214
}
1,921,359✔
215

216
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
1,922,792✔
217
  timerAddRef(timer);
1,922,792✔
218
  // select a wheel for the timer, we are not an accurate timer,
219
  // but the inaccuracy should not be too large.
220
  timer->wheel = tListLen(wheels) - 1;
1,922,798✔
221
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
1,933,084!
222
    time_wheel_t* wheel = wheels + i;
1,933,084✔
223
    if (delay < wheel->resolution * wheel->size) {
1,933,084✔
224
      timer->wheel = i;
1,922,798✔
225
      break;
1,922,798✔
226
    }
227
  }
228

229
  time_wheel_t* wheel = wheels + timer->wheel;
1,922,798✔
230
  timer->prev = NULL;
1,922,798✔
231
  timer->expireAt = taosGetMonotonicMs() + delay;
1,922,798✔
232

233
  (void)taosThreadMutexLock(&wheel->mutex);
1,922,798✔
234

235
  uint32_t idx = 0;
1,922,800✔
236
  if (timer->expireAt > wheel->nextScanAt) {
1,922,800✔
237
    // adjust delay according to next scan time of this wheel
238
    // so that the timer is not fired earlier than desired.
239
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
1,920,731✔
240
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
1,920,731✔
241
  }
242

243
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
1,922,800✔
244
  tmr_obj_t* p = wheel->slots[timer->slot];
1,922,800✔
245
  wheel->slots[timer->slot] = timer;
1,922,800✔
246
  timer->next = p;
1,922,800✔
247
  if (p != NULL) {
1,922,800✔
248
    p->prev = timer;
389,684✔
249
  }
250

251
  (void)taosThreadMutexUnlock(&wheel->mutex);
1,922,800✔
252
}
1,922,800✔
253

254
static bool removeFromWheel(tmr_obj_t* timer) {
575,081✔
255
  uint8_t wheelIdx = timer->wheel;
575,081✔
256
  if (wheelIdx >= tListLen(wheels)) {
575,081✔
257
    return false;
7✔
258
  }
259
  time_wheel_t* wheel = wheels + wheelIdx;
575,074✔
260

261
  bool removed = false;
575,074✔
262
  (void)taosThreadMutexLock(&wheel->mutex);
575,074✔
263
  // other thread may modify timer->wheel, check again.
264
  if (timer->wheel < tListLen(wheels)) {
575,077!
265
    if (timer->prev != NULL) {
575,077✔
266
      timer->prev->next = timer->next;
8,973✔
267
    }
268
    if (timer->next != NULL) {
575,077✔
269
      timer->next->prev = timer->prev;
8,677✔
270
    }
271
    if (timer == wheel->slots[timer->slot]) {
575,077✔
272
      wheel->slots[timer->slot] = timer->next;
566,104✔
273
    }
274
    timer->wheel = tListLen(wheels);
575,077✔
275
    timer->next = NULL;
575,077✔
276
    timer->prev = NULL;
575,077✔
277
    timerDecRef(timer);
575,077✔
278
    removed = true;
575,077✔
279
  }
280
  (void)taosThreadMutexUnlock(&wheel->mutex);
575,077✔
281

282
  return removed;
575,076✔
283
}
284

285
static void processExpiredTimer(void* handle, void* arg) {
1,346,282✔
286
  tmr_obj_t* timer = (tmr_obj_t*)handle;
1,346,282✔
287
  timer->executedBy = taosGetSelfPthreadId();
1,346,282✔
288
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
1,346,282✔
289
  if (state == TIMER_STATE_WAITING) {
1,346,282✔
290
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
1,346,275✔
291
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,346,275✔
292

293
    (*timer->fp)(timer->param, (tmr_h)timer->id);
1,346,275✔
294
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
1,346,275✔
295

296
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
1,346,275✔
297
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,346,275✔
298
  }
299
  removeTimer(timer->id);
1,346,282✔
300
  timerDecRef(timer);
1,346,282✔
301
}
1,346,282✔
302

303
static void addToExpired(tmr_obj_t* head) {
86,812,455✔
304
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
86,812,455✔
305

306
  while (head != NULL) {
88,158,737✔
307
    uintptr_t  id = head->id;
1,346,282✔
308
    tmr_obj_t* next = head->next;
1,346,282✔
309
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
1,346,282✔
310

311
    SSchedMsg schedMsg;
312
    schedMsg.fp = NULL;
1,346,282✔
313
    schedMsg.tfp = processExpiredTimer;
1,346,282✔
314
    schedMsg.msg = NULL;
1,346,282✔
315
    schedMsg.ahandle = head;
1,346,282✔
316
    schedMsg.thandle = NULL;
1,346,282✔
317
    if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
1,346,282!
318
      tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
×
319
    }
320

321
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
1,346,282✔
322
    head = next;
1,346,282✔
323
  }
324
}
86,812,455✔
325

326
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
1,922,792✔
327
  uintptr_t id = getNextTimerId();
1,922,792✔
328
  timer->id = id;
1,922,796✔
329
  timer->state = TIMER_STATE_WAITING;
1,922,796✔
330
  timer->fp = fp;
1,922,796✔
331
  timer->param = param;
1,922,796✔
332
  timer->ctrl = ctrl;
1,922,796✔
333
  addTimer(timer);
1,922,796✔
334

335
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
1,922,795✔
336
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
1,922,795✔
337

338
  if (mseconds == 0) {
1,922,792!
339
    timer->wheel = tListLen(wheels);
×
340
    timerAddRef(timer);
×
341
    addToExpired(timer);
×
342
  } else {
343
    addToWheel(timer, mseconds);
1,922,792✔
344
  }
345

346
  // note: use `timer->id` here is unsafe as `timer` may already be freed
347
  return id;
1,922,800✔
348
}
349

350
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
1,915,113✔
351
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,915,113✔
352
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,915,113!
353
    return NULL;
×
354
  }
355

356
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
1,915,114!
357
  if (timer == NULL) {
1,915,115!
358
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
×
359
    return NULL;
×
360
  }
361

362
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
1,915,115✔
363
}
364

365
static void taosTimerLoopFunc(int32_t signo) {
28,937,485✔
366
  int64_t now = taosGetMonotonicMs();
28,937,485✔
367

368
  for (int32_t i = 0; i < tListLen(wheels); i++) {
107,200,603✔
369
    // `expried` is a temporary expire list.
370
    // expired timers are first add to this list, then move
371
    // to expired queue as a batch to improve performance.
372
    // note this list is used as a stack in this function.
373
    tmr_obj_t* expired = NULL;
86,812,455✔
374

375
    time_wheel_t* wheel = wheels + i;
86,812,455✔
376
    while (now >= wheel->nextScanAt) {
107,347,202✔
377
      (void)taosThreadMutexLock(&wheel->mutex);
29,084,084✔
378
      wheel->index = (wheel->index + 1) % wheel->size;
29,084,084✔
379
      tmr_obj_t* timer = wheel->slots[wheel->index];
29,084,084✔
380
      while (timer != NULL) {
30,430,366✔
381
        tmr_obj_t* next = timer->next;
1,346,282✔
382
        if (now < timer->expireAt) {
1,346,282!
383
          timer = next;
×
384
          continue;
×
385
        }
386

387
        // remove from the wheel
388
        if (timer->prev == NULL) {
1,346,282!
389
          wheel->slots[wheel->index] = next;
1,346,282✔
390
          if (next != NULL) {
1,346,282✔
391
            next->prev = NULL;
373,292✔
392
          }
393
        } else {
394
          timer->prev->next = next;
×
395
          if (next != NULL) {
×
396
            next->prev = timer->prev;
×
397
          }
398
        }
399
        timer->wheel = tListLen(wheels);
1,346,282✔
400

401
        // add to temporary expire list
402
        timer->next = expired;
1,346,282✔
403
        timer->prev = NULL;
1,346,282✔
404
        if (expired != NULL) {
1,346,282✔
405
          expired->prev = timer;
373,315✔
406
        }
407
        expired = timer;
1,346,282✔
408

409
        timer = next;
1,346,282✔
410
      }
411
      wheel->nextScanAt += wheel->resolution;
29,084,084✔
412
      (void)taosThreadMutexUnlock(&wheel->mutex);
29,084,084✔
413
    }
414

415
    addToExpired(expired);
78,263,118✔
416
  }
417
}
20,388,148✔
418

419
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
1,850,259✔
420
  if (state == TIMER_STATE_WAITING) {
1,850,259✔
421
    bool reusable = false;
575,082✔
422
    if (removeFromWheel(timer)) {
575,082✔
423
      removeTimer(timer->id);
575,077✔
424
      // only safe to reuse the timer when timer is removed from the wheel.
425
      // we cannot guarantee the thread safety of the timr in all other cases.
426
      reusable = true;
575,077✔
427
    }
428
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
575,084✔
429
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
575,084✔
430
    return reusable;
575,084✔
431
  }
432

433
  if (state != TIMER_STATE_EXPIRED) {
1,275,177✔
434
    // timer already stopped or cancelled, has nothing to do in this case
435
    return false;
1✔
436
  }
437

438
  if (timer->executedBy == taosGetSelfPthreadId()) {
1,275,176✔
439
    // taosTmrReset is called in the timer callback, should do nothing in this
440
    // case to avoid dead lock. note taosTmrReset must be the last statement
441
    // of the callback funtion, will be a bug otherwise.
442
    return false;
1,275,171✔
443
  }
444

445
  // timer callback is executing in another thread, we SHOULD wait it to stop,
446
  // BUT this may result in dead lock if current thread are holding a lock which
447
  // the timer callback need to acquire. so, we HAVE TO return directly.
448
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
6✔
449
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
6!
450
  return false;
6✔
451
}
452

453
bool taosTmrStop(tmr_h timerId) {
654,023✔
454
  uintptr_t id = (uintptr_t)timerId;
654,023✔
455

456
  tmr_obj_t* timer = findTimer(id);
654,023✔
457
  if (timer == NULL) {
654,031✔
458
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
86,625✔
459
    return false;
86,620✔
460
  }
461

462
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
567,406✔
463
  (void)doStopTimer(timer, state);
567,410✔
464
  timerDecRef(timer);
567,412✔
465

466
  return state == TIMER_STATE_WAITING;
567,411✔
467
}
468

469
bool taosTmrStopA(tmr_h* timerId) {
6,969✔
470
  bool ret = taosTmrStop(*timerId);
6,969✔
471
  *timerId = NULL;
6,969✔
472
  return ret;
6,969✔
473
}
474

475
bool taosTmrIsStopped(tmr_h* timerId) {
126✔
476
  uintptr_t id = (uintptr_t)*timerId;
126✔
477

478
  tmr_obj_t* timer = findTimer(id);
126✔
479
  if (timer == NULL) {
126!
480
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
126!
481
    return true;
126✔
482
  }
483

484
  uint8_t state = atomic_load_8(&timer->state);
×
485

486
  return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
×
487
}
488

489
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
1,865,982✔
490
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,865,982✔
491
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,865,982!
492
    return false;
1✔
493
  }
494

495
  uintptr_t  id = (uintptr_t)*pTmrId;
1,865,981✔
496
  bool       stopped = false;
1,865,981✔
497
  tmr_obj_t* timer = findTimer(id);
1,865,981✔
498
  if (timer == NULL) {
1,865,980✔
499
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
583,130✔
500
  } else {
501
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
1,282,850✔
502
    if (!doStopTimer(timer, state)) {
1,282,850✔
503
      timerDecRef(timer);
1,275,171✔
504
      timer = NULL;
1,275,171✔
505
    }
506
    stopped = state == TIMER_STATE_WAITING;
1,282,850✔
507
  }
508

509
  if (timer == NULL) {
1,865,978✔
510
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
1,858,299✔
511
    if (NULL == *pTmrId) {
1,858,307!
512
      stopped = true;
×
513
    }
514
    return stopped;
1,858,307✔
515
  }
516

517
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
7,679✔
518

519
  // wait until there's no other reference to this timer,
520
  // so that we can reuse this timer safely.
521
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
7,679!
522
    if (i % 1000 == 0) {
×
523
      (void)sched_yield();
×
524
    }
525
  }
526

527
  if (timer->refCount != 1) {
7,679!
528
    uError("timer refCount=%d not expected 1", timer->refCount);
×
529
  }
530
  memset(timer, 0, sizeof(*timer));
7,679✔
531
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
7,679✔
532

533
  return stopped;
7,679✔
534
}
535

536
static int32_t taosTmrModuleInit(void) {
3,859✔
537
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
3,859!
538
  if (tmrCtrls == NULL) {
3,859!
539
    tmrError("failed to allocate memory for timer controllers.");
×
540
    return terrno;
×
541
  }
542

543
  memset(&timerMap, 0, sizeof(timerMap));
3,859✔
544

545
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
4,337,516✔
546
    tmr_ctrl_t* ctrl = tmrCtrls + i;
4,333,657✔
547
    ctrl->next = ctrl + 1;
4,333,657✔
548
  }
549
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
3,859✔
550
  unusedTmrCtrl = tmrCtrls;
3,859✔
551

552
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
3,859✔
553

554
  int64_t now = taosGetMonotonicMs();
3,859✔
555
  for (int32_t i = 0; i < tListLen(wheels); i++) {
15,436✔
556
    time_wheel_t* wheel = wheels + i;
11,577✔
557
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
11,577!
558
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
×
559
      return terrno;
×
560
    }
561
    wheel->nextScanAt = now + wheel->resolution;
11,577✔
562
    wheel->index = 0;
11,577✔
563
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
11,577!
564
    if (wheel->slots == NULL) {
11,577!
565
      tmrError("failed to allocate wheel slots");
×
566
      return terrno;
×
567
    }
568
    timerMap.size += wheel->size;
11,577✔
569
  }
570

571
  timerMap.count = 0;
3,859✔
572
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
3,859!
573
  if (timerMap.slots == NULL) {
3,859!
574
    tmrError("failed to allocate hash map");
×
575
    return terrno;
×
576
  }
577

578
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
3,859✔
579
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
3,859!
580
    tmrError("failed to initialize timer");
×
581
  }
582

583
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
3,859✔
584

585
  return 2;
3,859✔
586
}
587

588
static int32_t taosTmrInitModule(void) {
26,140✔
589
  if (atomic_load_32(&tmrModuleInit) == 2) {
26,140✔
590
    return 0;
22,282✔
591
  }
592

593
  if (atomic_load_32(&tmrModuleInit) < 0) {
3,859!
594
    return -1;
×
595
  }
596

597
  while (true) {
598
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
7,718✔
599
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
3,859✔
600
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
3,859!
601
      return -1;
×
602
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
3,859!
603
      return 0;
3,859✔
604
    } else {
605
      taosMsleep(1);
×
606
    }
607
  }
608

609
  return -1;
610
}
611

612
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
26,142✔
613
  const char* ret = taosMonotonicInit();
26,142✔
614
  tmrDebug("ttimer monotonic clock source:%s", ret);
26,142✔
615

616
  if (taosTmrInitModule() < 0) {
26,142!
617
    return NULL;
×
618
  }
619

620
  (void)taosThreadMutexLock(&tmrCtrlMutex);
26,141✔
621
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
26,142✔
622
  if (ctrl != NULL) {
26,142!
623
    unusedTmrCtrl = ctrl->next;
26,142✔
624
    numOfTmrCtrl++;
26,142✔
625
  }
626
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
26,142✔
627

628
  if (ctrl == NULL) {
26,142!
629
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
630
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
631
    return NULL;
×
632
  }
633

634
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
26,142✔
635

636
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
26,142✔
637
  return ctrl;
26,142✔
638
}
639

640
void taosTmrCleanUp(void* handle) {
24,159✔
641
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
24,159✔
642
  if (ctrl == NULL || ctrl->label[0] == 0) {
24,159!
643
    return;
×
644
  }
645

646
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
24,159✔
647
  ctrl->label[0] = 0;
24,159✔
648

649
  (void)taosThreadMutexLock(&tmrCtrlMutex);
24,159✔
650
  ctrl->next = unusedTmrCtrl;
24,159✔
651
  numOfTmrCtrl--;
24,159✔
652
  unusedTmrCtrl = ctrl;
24,159✔
653
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
24,159✔
654

655
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
24,159✔
656
  if (numOfTmrCtrl <= 0) {
24,159✔
657
    taosUninitTimer();
1,881✔
658

659
    taosCleanUpScheduler(tmrQhandle);
1,881✔
660
    taosMemoryFreeClear(tmrQhandle);
1,881!
661

662
    for (int32_t i = 0; i < tListLen(wheels); i++) {
7,524✔
663
      time_wheel_t* wheel = wheels + i;
5,643✔
664
      (void)taosThreadMutexDestroy(&wheel->mutex);
5,643✔
665
      taosMemoryFree(wheel->slots);
5,643!
666
    }
667

668
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
1,881✔
669

670
    for (size_t i = 0; i < timerMap.size; i++) {
11,558,745✔
671
      timer_list_t* list = timerMap.slots + i;
11,556,864✔
672
      tmr_obj_t*    t = list->timers;
11,556,864✔
673
      while (t != NULL) {
11,557,480✔
674
        tmr_obj_t* next = t->mnext;
616✔
675
        taosMemoryFree(t);
616!
676
        t = next;
616✔
677
      }
678
    }
679
    taosMemoryFree(timerMap.slots);
1,881!
680
    taosMemoryFree(tmrCtrls);
1,881!
681

682
    tmrCtrls = NULL;
1,881✔
683
    unusedTmrCtrl = NULL;
1,881✔
684
    atomic_store_32(&tmrModuleInit, 0);
1,881✔
685
  }
686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc