• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.93
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
#define tmrFatal(...)                                                     \
24
  {                                                                       \
25
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
26
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
27
    }                                                                     \
28
  }
29
#define tmrError(...)                                                     \
30
  {                                                                       \
31
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
32
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
33
    }                                                                     \
34
  }
35
#define tmrWarn(...)                                                    \
36
  {                                                                     \
37
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
38
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
39
    }                                                                   \
40
  }
41
#define tmrInfo(...)                                               \
42
  {                                                                \
43
    if (tmrDebugFlag & DEBUG_INFO) {                               \
44
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
45
    }                                                              \
46
  }
47
#define tmrDebug(...)                                               \
48
  {                                                                 \
49
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
50
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
51
    }                                                               \
52
  }
53
#define tmrTrace(...)                                               \
54
  {                                                                 \
55
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
56
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
57
    }                                                               \
58
  }
59

60
#define TIMER_STATE_WAITING  0
61
#define TIMER_STATE_EXPIRED  1
62
#define TIMER_STATE_STOPPED  2
63
#define TIMER_STATE_CANCELED 3
64

65
typedef union _tmr_ctrl_t {
66
  char label[16];
67
  struct {
68
    // pad to ensure 'next' is the end of this union
69
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
70
    union _tmr_ctrl_t* next;
71
  };
72
} tmr_ctrl_t;
73

74
typedef struct tmr_obj_t {
75
  uintptr_t         id;
76
  tmr_ctrl_t*       ctrl;
77
  struct tmr_obj_t* mnext;
78
  struct tmr_obj_t* prev;
79
  struct tmr_obj_t* next;
80
  uint16_t          slot;
81
  uint8_t           wheel;
82
  uint8_t           state;
83
  uint8_t           refCount;
84
  uint8_t           reserved1;
85
  uint16_t          reserved2;
86
  union {
87
    int64_t expireAt;
88
    int64_t executedBy;
89
  };
90
  TAOS_TMR_CALLBACK fp;
91
  void*             param;
92
} tmr_obj_t;
93

94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
96
  tmr_obj_t* timers;
97
} timer_list_t;
98

99
typedef struct timer_map_t {
100
  uint32_t      size;
101
  uint32_t      count;
102
  timer_list_t* slots;
103
} timer_map_t;
104

105
typedef struct time_wheel_t {
106
  TdThreadMutex mutex;
107
  int64_t       nextScanAt;
108
  uint32_t      resolution;
109
  uint16_t      size;
110
  uint16_t      index;
111
  tmr_obj_t**   slots;
112
} time_wheel_t;
113

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
115

116
static int32_t       tmrModuleInit = 0;
117
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*   tmrCtrls;
119
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
120
static void*         tmrQhandle;
121
static int32_t       numOfTmrCtrl = 0;
122

123
int32_t          taosTmrThreads = 1;
124
static uintptr_t nextTimerId = 0;
125

126
static time_wheel_t wheels[] = {
127
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
128
    {.resolution = 1000, .size = 1024},
129
    {.resolution = 60000, .size = 1024},
130
};
131
static timer_map_t timerMap;
132

133
static uintptr_t getNextTimerId() {
1,736,459✔
134
  uintptr_t id;
135
  do {
136
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
1,736,459✔
137
  } while (id == 0);
1,736,463✔
138
  return id;
1,736,462✔
139
}
140

141
static void timerAddRef(tmr_obj_t* timer) { (void)atomic_add_fetch_8(&timer->refCount, 1); }
5,132,674✔
142

143
static void timerDecRef(tmr_obj_t* timer) {
5,122,108✔
144
  if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {
5,122,108✔
145
    taosMemoryFree(timer);
1,727,371!
146
  }
147
}
5,122,128✔
148

149
static void lockTimerList(timer_list_t* list) {
5,205,640✔
150
  int64_t tid = taosGetSelfPthreadId();
5,205,640✔
151
  int32_t i = 0;
5,205,638✔
152
  while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
5,205,641✔
153
    if (++i % 1000 == 0) {
3!
154
      (void)sched_yield();
×
155
    }
156
  }
157
}
5,205,664✔
158

159
static void unlockTimerList(timer_list_t* list) {
5,205,653✔
160
  int64_t tid = taosGetSelfPthreadId();
5,205,653✔
161
  if (atomic_val_compare_exchange_64(&(list->lockedBy), tid, 0) != tid) {
5,205,649!
162
    uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", tid);
×
163
  }
164
}
5,205,657✔
165

166
static void addTimer(tmr_obj_t* timer) {
1,736,459✔
167
  timerAddRef(timer);
1,736,459✔
168
  timer->wheel = tListLen(wheels);
1,736,461✔
169

170
  uint32_t      idx = (uint32_t)(timer->id % timerMap.size);
1,736,461✔
171
  timer_list_t* list = timerMap.slots + idx;
1,736,461✔
172

173
  lockTimerList(list);
1,736,461✔
174
  timer->mnext = list->timers;
1,736,462✔
175
  list->timers = timer;
1,736,462✔
176
  unlockTimerList(list);
1,736,462✔
177
}
1,736,463✔
178

179
static tmr_obj_t* findTimer(uintptr_t id) {
2,288,650✔
180
  tmr_obj_t* timer = NULL;
2,288,650✔
181
  if (id > 0) {
2,288,650✔
182
    uint32_t      idx = (uint32_t)(id % timerMap.size);
1,734,179✔
183
    timer_list_t* list = timerMap.slots + idx;
1,734,179✔
184
    lockTimerList(list);
1,734,179✔
185
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
1,734,195✔
186
      if (timer->id == id) {
1,659,768!
187
        timerAddRef(timer);
1,659,768✔
188
        break;
1,659,769✔
189
      }
190
    }
191
    unlockTimerList(list);
1,734,196✔
192
  }
193
  return timer;
2,288,671✔
194
}
195

196
static void removeTimer(uintptr_t id) {
1,735,012✔
197
  tmr_obj_t*    prev = NULL;
1,735,012✔
198
  uint32_t      idx = (uint32_t)(id % timerMap.size);
1,735,012✔
199
  timer_list_t* list = timerMap.slots + idx;
1,735,012✔
200
  lockTimerList(list);
1,735,012✔
201
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
1,735,013!
202
    if (p->id == id) {
1,735,013!
203
      if (prev == NULL) {
1,735,013!
204
        list->timers = p->mnext;
1,735,013✔
205
      } else {
206
        prev->mnext = p->mnext;
×
207
      }
208
      timerDecRef(p);
1,735,013✔
209
      break;
1,735,013✔
210
    }
211
    prev = p;
×
212
  }
213
  unlockTimerList(list);
1,735,013✔
214
}
1,735,013✔
215

216
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
1,736,461✔
217
  timerAddRef(timer);
1,736,461✔
218
  // select a wheel for the timer, we are not an accurate timer,
219
  // but the inaccuracy should not be too large.
220
  timer->wheel = tListLen(wheels) - 1;
1,736,463✔
221
  for (uint8_t i = 0; i < tListLen(wheels); i++) {
1,746,770!
222
    time_wheel_t* wheel = wheels + i;
1,746,771✔
223
    if (delay < wheel->resolution * wheel->size) {
1,746,771✔
224
      timer->wheel = i;
1,736,464✔
225
      break;
1,736,464✔
226
    }
227
  }
228

229
  time_wheel_t* wheel = wheels + timer->wheel;
1,736,463✔
230
  timer->prev = NULL;
1,736,463✔
231
  timer->expireAt = taosGetMonotonicMs() + delay;
1,736,463✔
232

233
  (void)taosThreadMutexLock(&wheel->mutex);
1,736,460✔
234

235
  uint32_t idx = 0;
1,736,464✔
236
  if (timer->expireAt > wheel->nextScanAt) {
1,736,464✔
237
    // adjust delay according to next scan time of this wheel
238
    // so that the timer is not fired earlier than desired.
239
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
1,734,513✔
240
    idx = (delay + wheel->resolution - 1) / wheel->resolution;
1,734,513✔
241
  }
242

243
  timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
1,736,464✔
244
  tmr_obj_t* p = wheel->slots[timer->slot];
1,736,464✔
245
  wheel->slots[timer->slot] = timer;
1,736,464✔
246
  timer->next = p;
1,736,464✔
247
  if (p != NULL) {
1,736,464✔
248
    p->prev = timer;
329,620✔
249
  }
250

251
  (void)taosThreadMutexUnlock(&wheel->mutex);
1,736,464✔
252
}
1,736,463✔
253

254
static bool removeFromWheel(tmr_obj_t* timer) {
531,419✔
255
  uint8_t wheelIdx = timer->wheel;
531,419✔
256
  if (wheelIdx >= tListLen(wheels)) {
531,419✔
257
    return false;
4✔
258
  }
259
  time_wheel_t* wheel = wheels + wheelIdx;
531,415✔
260

261
  bool removed = false;
531,415✔
262
  (void)taosThreadMutexLock(&wheel->mutex);
531,415✔
263
  // other thread may modify timer->wheel, check again.
264
  if (timer->wheel < tListLen(wheels)) {
531,420!
265
    if (timer->prev != NULL) {
531,420✔
266
      timer->prev->next = timer->next;
9,015✔
267
    }
268
    if (timer->next != NULL) {
531,420✔
269
      timer->next->prev = timer->prev;
8,767✔
270
    }
271
    if (timer == wheel->slots[timer->slot]) {
531,420✔
272
      wheel->slots[timer->slot] = timer->next;
522,405✔
273
    }
274
    timer->wheel = tListLen(wheels);
531,420✔
275
    timer->next = NULL;
531,420✔
276
    timer->prev = NULL;
531,420✔
277
    timerDecRef(timer);
531,420✔
278
    removed = true;
531,420✔
279
  }
280
  (void)taosThreadMutexUnlock(&wheel->mutex);
531,420✔
281

282
  return removed;
531,419✔
283
}
284

285
static void processExpiredTimer(void* handle, void* arg) {
1,203,593✔
286
  tmr_obj_t* timer = (tmr_obj_t*)handle;
1,203,593✔
287
  timer->executedBy = taosGetSelfPthreadId();
1,203,593✔
288
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
1,203,593✔
289
  if (state == TIMER_STATE_WAITING) {
1,203,593✔
290
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
1,203,589✔
291
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,203,589✔
292

293
    (*timer->fp)(timer->param, (tmr_h)timer->id);
1,203,589✔
294
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
1,203,589✔
295

296
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
1,203,589✔
297
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,203,589✔
298
  }
299
  removeTimer(timer->id);
1,203,593✔
300
  timerDecRef(timer);
1,203,593✔
301
}
1,203,593✔
302

303
static void addToExpired(tmr_obj_t* head) {
85,770,441✔
304
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
85,770,441✔
305

306
  while (head != NULL) {
86,974,034✔
307
    uintptr_t  id = head->id;
1,203,593✔
308
    tmr_obj_t* next = head->next;
1,203,593✔
309
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
1,203,593✔
310

311
    SSchedMsg schedMsg;
312
    schedMsg.fp = NULL;
1,203,593✔
313
    schedMsg.tfp = processExpiredTimer;
1,203,593✔
314
    schedMsg.msg = NULL;
1,203,593✔
315
    schedMsg.ahandle = head;
1,203,593✔
316
    schedMsg.thandle = NULL;
1,203,593✔
317
    if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
1,203,593!
318
      tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
×
319
    }
320

321
    tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
1,203,593✔
322
    head = next;
1,203,593✔
323
  }
324
}
85,770,441✔
325

326
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
1,736,458✔
327
  uintptr_t id = getNextTimerId();
1,736,458✔
328
  timer->id = id;
1,736,460✔
329
  timer->state = TIMER_STATE_WAITING;
1,736,460✔
330
  timer->fp = fp;
1,736,460✔
331
  timer->param = param;
1,736,460✔
332
  timer->ctrl = ctrl;
1,736,460✔
333
  addTimer(timer);
1,736,460✔
334

335
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
1,736,463✔
336
  tmrDebug(fmt, ctrl->label, timer->id, timer->fp, timer->param);
1,736,463✔
337

338
  if (mseconds == 0) {
1,736,464!
339
    timer->wheel = tListLen(wheels);
×
340
    timerAddRef(timer);
×
341
    addToExpired(timer);
×
342
  } else {
343
    addToWheel(timer, mseconds);
1,736,464✔
344
  }
345

346
  // note: use `timer->id` here is unsafe as `timer` may already be freed
347
  return id;
1,736,464✔
348
}
349

350
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
1,728,816✔
351
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,728,816✔
352
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,728,816!
353
    return NULL;
1✔
354
  }
355

356
  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(tmr_obj_t));
1,728,815!
357
  if (timer == NULL) {
1,728,817!
358
    tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
×
359
    return NULL;
×
360
  }
361

362
  return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
1,728,817✔
363
}
364

365
static void taosTimerLoopFunc(int32_t signo) {
28,590,147✔
366
  int64_t now = taosGetMonotonicMs();
28,590,147✔
367

368
  for (int32_t i = 0; i < tListLen(wheels); i++) {
77,064,381✔
369
    // `expried` is a temporary expire list.
370
    // expired timers are first add to this list, then move
371
    // to expired queue as a batch to improve performance.
372
    // note this list is used as a stack in this function.
373
    tmr_obj_t* expired = NULL;
48,474,234✔
374

375
    time_wheel_t* wheel = wheels + i;
48,474,234✔
376
    while (now >= wheel->nextScanAt) {
114,504,087✔
377
      (void)taosThreadMutexLock(&wheel->mutex);
28,733,646✔
378
      wheel->index = (wheel->index + 1) % wheel->size;
28,733,646✔
379
      tmr_obj_t* timer = wheel->slots[wheel->index];
28,733,646✔
380
      while (timer != NULL) {
29,937,239✔
381
        tmr_obj_t* next = timer->next;
1,203,593✔
382
        if (now < timer->expireAt) {
1,203,593!
383
          timer = next;
×
384
          continue;
×
385
        }
386

387
        // remove from the wheel
388
        if (timer->prev == NULL) {
1,203,593!
389
          wheel->slots[wheel->index] = next;
1,203,593✔
390
          if (next != NULL) {
1,203,593✔
391
            next->prev = NULL;
312,967✔
392
          }
393
        } else {
394
          timer->prev->next = next;
×
395
          if (next != NULL) {
×
396
            next->prev = timer->prev;
×
397
          }
398
        }
399
        timer->wheel = tListLen(wheels);
1,203,593✔
400

401
        // add to temporary expire list
402
        timer->next = expired;
1,203,593✔
403
        timer->prev = NULL;
1,203,593✔
404
        if (expired != NULL) {
1,203,593✔
405
          expired->prev = timer;
312,971✔
406
        }
407
        expired = timer;
1,203,593✔
408

409
        timer = next;
1,203,593✔
410
      }
411
      wheel->nextScanAt += wheel->resolution;
28,733,646✔
412
      (void)taosThreadMutexUnlock(&wheel->mutex);
28,733,646✔
413
    }
414

415
    addToExpired(expired);
85,770,441✔
416
  }
417
}
28,590,147✔
418

419
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
1,659,767✔
420
  if (state == TIMER_STATE_WAITING) {
1,659,767✔
421
    bool reusable = false;
531,423✔
422
    if (removeFromWheel(timer)) {
531,423✔
423
      removeTimer(timer->id);
531,419✔
424
      // only safe to reuse the timer when timer is removed from the wheel.
425
      // we cannot guarantee the thread safety of the timr in all other cases.
426
      reusable = true;
531,420✔
427
    }
428
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.";
531,424✔
429
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
531,424✔
430
    return reusable;
531,424✔
431
  }
432

433
  if (state != TIMER_STATE_EXPIRED) {
1,128,344✔
434
    // timer already stopped or cancelled, has nothing to do in this case
435
    return false;
2✔
436
  }
437

438
  if (timer->executedBy == taosGetSelfPthreadId()) {
1,128,342✔
439
    // taosTmrReset is called in the timer callback, should do nothing in this
440
    // case to avoid dead lock. note taosTmrReset must be the last statement
441
    // of the callback funtion, will be a bug otherwise.
442
    return false;
1,128,334✔
443
  }
444

445
  // timer callback is executing in another thread, we SHOULD wait it to stop,
446
  // BUT this may result in dead lock if current thread are holding a lock which
447
  // the timer callback need to acquire. so, we HAVE TO return directly.
448
  const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.";
9✔
449
  tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
9!
450
  return false;
9✔
451
}
452

453
bool taosTmrStop(tmr_h timerId) {
609,597✔
454
  uintptr_t id = (uintptr_t)timerId;
609,597✔
455

456
  tmr_obj_t* timer = findTimer(id);
609,597✔
457
  if (timer == NULL) {
609,606✔
458
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
85,821✔
459
    return false;
85,821✔
460
  }
461

462
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
523,785✔
463
  (void)doStopTimer(timer, state);
523,785✔
464
  timerDecRef(timer);
523,786✔
465

466
  return state == TIMER_STATE_WAITING;
523,784✔
467
}
468

469
bool taosTmrStopA(tmr_h* timerId) {
7,042✔
470
  bool ret = taosTmrStop(*timerId);
7,042✔
471
  *timerId = NULL;
7,042✔
472
  return ret;
7,042✔
473
}
474

475
bool taosTmrIsStopped(tmr_h* timerId) {
128✔
476
  uintptr_t id = (uintptr_t)*timerId;
128✔
477

478
  tmr_obj_t* timer = findTimer(id);
128✔
479
  if (timer == NULL) {
128!
480
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
128!
481
    return true;
128✔
482
  }
483

484
  uint8_t state = atomic_load_8(&timer->state);
×
485

486
  return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
×
487
}
488

489
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
1,678,933✔
490
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
1,678,933✔
491
  if (ctrl == NULL || ctrl->label[0] == 0) {
1,678,933!
UNCOV
492
    return false;
×
493
  }
494

495
  uintptr_t  id = (uintptr_t)*pTmrId;
1,678,934✔
496
  bool       stopped = false;
1,678,934✔
497
  tmr_obj_t* timer = findTimer(id);
1,678,934✔
498
  if (timer == NULL) {
1,678,936✔
499
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
542,954✔
500
  } else {
501
    uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
1,135,982✔
502
    if (!doStopTimer(timer, state)) {
1,135,983✔
503
      timerDecRef(timer);
1,128,341✔
504
      timer = NULL;
1,128,341✔
505
    }
506
    stopped = state == TIMER_STATE_WAITING;
1,135,983✔
507
  }
508

509
  if (timer == NULL) {
1,678,934✔
510
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
1,671,292✔
511
    if (NULL == *pTmrId) {
1,671,295!
512
      stopped = true;
×
513
    }
514
    return stopped;
1,671,295✔
515
  }
516

517
  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, timer->id);
7,642✔
518

519
  // wait until there's no other reference to this timer,
520
  // so that we can reuse this timer safely.
521
  for (int32_t i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
7,642!
522
    if (i % 1000 == 0) {
×
523
      (void)sched_yield();
×
524
    }
525
  }
526

527
  if (timer->refCount != 1) {
7,642!
528
    uError("timer refCount=%d not expected 1", timer->refCount);
×
529
  }
530
  memset(timer, 0, sizeof(*timer));
7,642✔
531
  *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
7,642✔
532

533
  return stopped;
7,642✔
534
}
535

536
static int32_t taosTmrModuleInit(void) {
3,898✔
537
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
3,898!
538
  if (tmrCtrls == NULL) {
3,898!
539
    tmrError("failed to allocate memory for timer controllers.");
×
540
    return terrno;
×
541
  }
542

543
  memset(&timerMap, 0, sizeof(timerMap));
3,898✔
544

545
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
4,381,352✔
546
    tmr_ctrl_t* ctrl = tmrCtrls + i;
4,377,454✔
547
    ctrl->next = ctrl + 1;
4,377,454✔
548
  }
549
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
3,898✔
550
  unusedTmrCtrl = tmrCtrls;
3,898✔
551

552
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
3,898✔
553

554
  int64_t now = taosGetMonotonicMs();
3,898✔
555
  for (int32_t i = 0; i < tListLen(wheels); i++) {
15,592✔
556
    time_wheel_t* wheel = wheels + i;
11,694✔
557
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
11,694!
558
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
×
559
      return terrno;
×
560
    }
561
    wheel->nextScanAt = now + wheel->resolution;
11,694✔
562
    wheel->index = 0;
11,694✔
563
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
11,694!
564
    if (wheel->slots == NULL) {
11,694!
565
      tmrError("failed to allocate wheel slots");
×
566
      return terrno;
×
567
    }
568
    timerMap.size += wheel->size;
11,694✔
569
  }
570

571
  timerMap.count = 0;
3,898✔
572
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
3,898!
573
  if (timerMap.slots == NULL) {
3,898!
574
    tmrError("failed to allocate hash map");
×
575
    return terrno;
×
576
  }
577

578
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
3,898✔
579
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
3,898!
580
    tmrError("failed to initialize timer");
×
581
  }
582

583
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
3,898✔
584

585
  return 2;
3,898✔
586
}
587

588
static int32_t taosTmrInitModule(void) {
26,205✔
589
  if (atomic_load_32(&tmrModuleInit) == 2) {
26,205✔
590
    return 0;
22,307✔
591
  }
592

593
  if (atomic_load_32(&tmrModuleInit) < 0) {
3,898!
594
    return -1;
×
595
  }
596

597
  while (true) {
598
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
7,796✔
599
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
3,898✔
600
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
3,898!
601
      return -1;
×
602
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
3,898!
603
      return 0;
3,898✔
604
    } else {
605
      taosMsleep(1);
×
606
    }
607
  }
608

609
  return -1;
610
}
611

612
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
26,204✔
613
  const char* ret = taosMonotonicInit();
26,204✔
614
  tmrDebug("ttimer monotonic clock source:%s", ret);
26,205✔
615

616
  if (taosTmrInitModule() < 0) {
26,205!
617
    return NULL;
×
618
  }
619

620
  (void)taosThreadMutexLock(&tmrCtrlMutex);
26,205✔
621
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
26,205✔
622
  if (ctrl != NULL) {
26,205!
623
    unusedTmrCtrl = ctrl->next;
26,205✔
624
    numOfTmrCtrl++;
26,205✔
625
  }
626
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
26,205✔
627

628
  if (ctrl == NULL) {
26,205!
629
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
630
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
631
    return NULL;
×
632
  }
633

634
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
26,205✔
635

636
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
26,205✔
637
  return ctrl;
26,205✔
638
}
639

640
void taosTmrCleanUp(void* handle) {
24,183✔
641
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
24,183✔
642
  if (ctrl == NULL || ctrl->label[0] == 0) {
24,183!
643
    return;
×
644
  }
645

646
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
24,183✔
647
  ctrl->label[0] = 0;
24,183✔
648

649
  (void)taosThreadMutexLock(&tmrCtrlMutex);
24,183✔
650
  ctrl->next = unusedTmrCtrl;
24,183✔
651
  numOfTmrCtrl--;
24,183✔
652
  unusedTmrCtrl = ctrl;
24,183✔
653
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
24,183✔
654

655
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
24,183✔
656
  if (numOfTmrCtrl <= 0) {
24,183✔
657
    taosUninitTimer();
1,881✔
658

659
    taosCleanUpScheduler(tmrQhandle);
1,881✔
660
    taosMemoryFreeClear(tmrQhandle);
1,881!
661

662
    for (int32_t i = 0; i < tListLen(wheels); i++) {
7,524✔
663
      time_wheel_t* wheel = wheels + i;
5,643✔
664
      (void)taosThreadMutexDestroy(&wheel->mutex);
5,643✔
665
      taosMemoryFree(wheel->slots);
5,643!
666
    }
667

668
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
1,881✔
669

670
    for (size_t i = 0; i < timerMap.size; i++) {
11,558,745✔
671
      timer_list_t* list = timerMap.slots + i;
11,556,864✔
672
      tmr_obj_t*    t = list->timers;
11,556,864✔
673
      while (t != NULL) {
11,557,484✔
674
        tmr_obj_t* next = t->mnext;
620✔
675
        taosMemoryFree(t);
620!
676
        t = next;
620✔
677
      }
678
    }
679
    taosMemoryFree(timerMap.slots);
1,881!
680
    taosMemoryFree(tmrCtrls);
1,881!
681

682
    tmrCtrls = NULL;
1,881✔
683
    unusedTmrCtrl = NULL;
1,881✔
684
    atomic_store_32(&tmrModuleInit, 0);
1,881✔
685
  }
686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc