• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.42
/source/util/src/ttimer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "ttimer.h"
18
#include "taoserror.h"
19
#include "tdef.h"
20
#include "tlog.h"
21
#include "tsched.h"
22

23
// Module-local logging helpers. Each macro forwards its arguments to
// taosPrintLog only when the matching level bit is set in tmrDebugFlag.
// The bodies are wrapped in do { ... } while (0) so that an invocation
// followed by ';' expands to exactly one statement and therefore stays
// correct inside an unbraced if/else.
#define tmrFatal(...)                                                     \
  do {                                                                    \
    if (tmrDebugFlag & DEBUG_FATAL) {                                     \
      taosPrintLog("TMR FATAL ", DEBUG_FATAL, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
  } while (0)
#define tmrError(...)                                                     \
  do {                                                                    \
    if (tmrDebugFlag & DEBUG_ERROR) {                                     \
      taosPrintLog("TMR ERROR ", DEBUG_ERROR, tmrDebugFlag, __VA_ARGS__); \
    }                                                                     \
  } while (0)
#define tmrWarn(...)                                                    \
  do {                                                                  \
    if (tmrDebugFlag & DEBUG_WARN) {                                    \
      taosPrintLog("TMR WARN ", DEBUG_WARN, tmrDebugFlag, __VA_ARGS__); \
    }                                                                   \
  } while (0)
#define tmrInfo(...)                                               \
  do {                                                             \
    if (tmrDebugFlag & DEBUG_INFO) {                               \
      taosPrintLog("TMR ", DEBUG_INFO, tmrDebugFlag, __VA_ARGS__); \
    }                                                              \
  } while (0)
#define tmrDebug(...)                                               \
  do {                                                              \
    if (tmrDebugFlag & DEBUG_DEBUG) {                               \
      taosPrintLog("TMR ", DEBUG_DEBUG, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
  } while (0)
#define tmrTrace(...)                                               \
  do {                                                              \
    if (tmrDebugFlag & DEBUG_TRACE) {                               \
      taosPrintLog("TMR ", DEBUG_TRACE, tmrDebugFlag, __VA_ARGS__); \
    }                                                               \
  } while (0)
59

60
// Values stored in tmr_obj_t.state.
// WAITING : set when the timer is started; the callback has not run yet.
// EXPIRED : set by the worker right before it invokes the callback.
// STOPPED : set by the worker after the callback has returned.
// CANCELED: set by taosTmrStop/taosTmrReset when they win the race against
//           the worker while the timer is still WAITING.
#define TIMER_STATE_WAITING  0
#define TIMER_STATE_EXPIRED  1
#define TIMER_STATE_STOPPED  2
#define TIMER_STATE_CANCELED 3
64

65
// A timer controller: a 16-byte slot taken from the pool allocated in
// taosTmrModuleInit.
// - While in use, the slot holds the controller's label; an empty label
//   (label[0] == 0) marks the slot as not in use.
// - While on the free list, the trailing pointer-sized bytes hold the link
//   to the next unused slot.
typedef union _tmr_ctrl_t {
  char label[16];
  struct {
    // padding places 'next' at the very end of the 16-byte union
    char               padding[16 - sizeof(union _tmr_ctrl_t*)];
    union _tmr_ctrl_t* next;
  };
} tmr_ctrl_t;
73

74
typedef struct tmr_obj_t {
75
  uintptr_t         id;
76
  tmr_ctrl_t*       ctrl;
77
  struct tmr_obj_t* mnext;
78
  struct tmr_obj_t* prev;
79
  struct tmr_obj_t* next;
80
  uint16_t          slot;
81
  uint8_t           wheel;
82
  uint8_t           state;
83
  uint8_t           refCount;
84
  uint8_t           reserved1;
85
  uint16_t          reserved2;
86
  union {
87
    int64_t expireAt;
88
    int64_t executedBy;
89
  };
90
  TAOS_TMR_CALLBACK fp;
91
  void*             param;
92
} tmr_obj_t;
93

94
typedef struct timer_list_t {
95
  int64_t    lockedBy;
96
  tmr_obj_t* timers;
97
} timer_list_t;
98

99
typedef struct timer_map_t {
100
  uint32_t      size;
101
  uint32_t      count;
102
  timer_list_t* slots;
103
} timer_map_t;
104

105
typedef struct time_wheel_t {
106
  TdThreadMutex mutex;
107
  int64_t       nextScanAt;
108
  uint32_t      resolution;
109
  uint16_t      size;
110
  uint16_t      index;
111
  tmr_obj_t**   slots;
112
} time_wheel_t;
113

114
static int32_t tsMaxTmrCtrl = TSDB_MAX_VNODES_PER_DB + 100;
115

116
static int32_t       tmrModuleInit = 0;
117
static TdThreadMutex tmrCtrlMutex;
118
static tmr_ctrl_t*   tmrCtrls;
119
static tmr_ctrl_t*   unusedTmrCtrl = NULL;
120
static void*         tmrQhandle;
121
static int32_t       numOfTmrCtrl = 0;
122

123
int32_t          taosTmrThreads = 1;
124
static uintptr_t nextTimerId = 0;
125

126
static time_wheel_t wheels[] = {
127
    {.resolution = MSECONDS_PER_TICK, .size = 4096},
128
    {.resolution = 1000, .size = 1024},
129
    {.resolution = 60000, .size = 1024},
130
};
131
static timer_map_t timerMap;
132

133
static uintptr_t getNextTimerId() {
1,690,173✔
134
  uintptr_t id;
135
  do {
136
    id = (uintptr_t)atomic_add_fetch_ptr((void**)&nextTimerId, 1);
1,690,173✔
137
  } while (id == 0);
1,690,180✔
138
  return id;
1,690,177✔
139
}
140

141
// Atomically takes one additional reference on `timer`.
static void timerAddRef(tmr_obj_t* timer) {
  (void)atomic_add_fetch_8(&timer->refCount, 1);
}
4,993,065✔
142

143
// Atomically drops one reference on `timer`; whoever drops the last
// reference frees the object.
static void timerDecRef(tmr_obj_t* timer) {
  if (atomic_sub_fetch_8(&timer->refCount, 1) != 0) {
    return;
  }
  taosMemoryFree(timer);
}
4,982,949✔
148

149
// Acquires the spin lock of a hash bucket by swinging `lockedBy` from 0 to
// the calling thread's id. Yields the CPU once every 1000 failed attempts.
static void lockTimerList(timer_list_t* list) {
  const int64_t self = taosGetSelfPthreadId();
  int32_t       attempts = 0;

  for (;;) {
    if (atomic_val_compare_exchange_64(&(list->lockedBy), 0, self) == 0) {
      break;
    }
    ++attempts;
    if (attempts % 1000 == 0) {
      (void)sched_yield();
    }
  }
}
5,066,583✔
158

159
// Releases the bucket spin lock. The lock is only cleared when it is owned
// by the calling thread; otherwise an error is logged and nothing changes.
static void unlockTimerList(timer_list_t* list) {
  const int64_t self = taosGetSelfPthreadId();
  const int64_t owner = atomic_val_compare_exchange_64(&(list->lockedBy), self, 0);
  if (owner == self) {
    return;
  }
  uError("%" PRId64 " trying to unlock a timer list not locked by current thread.", self);
}
5,066,574✔
165

166
// Registers `timer` in the id hash map. The map holds its own reference.
// The timer is marked as "not on any wheel" at this point.
static void addTimer(tmr_obj_t* timer) {
  timerAddRef(timer);
  timer->wheel = tListLen(wheels);

  uint32_t      bucketIdx = (uint32_t)(timer->id % timerMap.size);
  timer_list_t* bucket = &timerMap.slots[bucketIdx];

  // push to the front of the bucket chain
  lockTimerList(bucket);
  timer->mnext = bucket->timers;
  bucket->timers = timer;
  unlockTimerList(bucket);
}
1,690,183✔
178

179
static tmr_obj_t* findTimer(uintptr_t id) {
2,240,691✔
180
  tmr_obj_t* timer = NULL;
2,240,691✔
181
  if (id > 0) {
2,240,691✔
182
    uint32_t      idx = (uint32_t)(id % timerMap.size);
1,687,528✔
183
    timer_list_t* list = timerMap.slots + idx;
1,687,528✔
184
    lockTimerList(list);
1,687,528✔
185
    for (timer = list->timers; timer != NULL; timer = timer->mnext) {
1,687,545✔
186
      if (timer->id == id) {
1,612,724!
187
        timerAddRef(timer);
1,612,724✔
188
        break;
1,612,723✔
189
      }
190
    }
191
    unlockTimerList(list);
1,687,544✔
192
  }
193
  return timer;
2,240,712✔
194
}
195

196
static void removeTimer(uintptr_t id) {
1,688,858✔
197
  tmr_obj_t*    prev = NULL;
1,688,858✔
198
  uint32_t      idx = (uint32_t)(id % timerMap.size);
1,688,858✔
199
  timer_list_t* list = timerMap.slots + idx;
1,688,858✔
200
  lockTimerList(list);
1,688,858✔
201
  for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
1,688,859!
202
    if (p->id == id) {
1,688,859✔
203
      if (prev == NULL) {
1,688,858✔
204
        list->timers = p->mnext;
1,688,857✔
205
      } else {
206
        prev->mnext = p->mnext;
1✔
207
      }
208
      timerDecRef(p);
1,688,858✔
209
      break;
1,688,858✔
210
    }
211
    prev = p;
1✔
212
  }
213
  unlockTimerList(list);
1,688,858✔
214
}
1,688,857✔
215

216
// Places `timer` on a time wheel so that it fires about `delay` milliseconds
// from now. The wheel holds its own reference on the timer.
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
  timerAddRef(timer);

  // Pick the finest wheel whose full span covers the delay and fall back to
  // the coarsest one. This is not a precise timer, but the error stays small.
  timer->wheel = tListLen(wheels) - 1;
  for (uint8_t w = 0; w < tListLen(wheels); w++) {
    time_wheel_t* candidate = &wheels[w];
    if (delay < candidate->resolution * candidate->size) {
      timer->wheel = w;
      break;
    }
  }

  time_wheel_t* wheel = &wheels[timer->wheel];
  timer->prev = NULL;
  timer->expireAt = taosGetMonotonicMs() + delay;

  (void)taosThreadMutexLock(&wheel->mutex);

  // Number of slots to skip, measured from the wheel's next scan time and
  // rounded up so that the timer never fires earlier than requested.
  uint32_t ticks = 0;
  if (timer->expireAt > wheel->nextScanAt) {
    delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
    ticks = (delay + wheel->resolution - 1) / wheel->resolution;
  }

  timer->slot = (uint16_t)((wheel->index + ticks + 1) % wheel->size);

  // insert at the head of the slot's doubly linked list
  tmr_obj_t* oldHead = wheel->slots[timer->slot];
  timer->next = oldHead;
  if (oldHead != NULL) {
    oldHead->prev = timer;
  }
  wheel->slots[timer->slot] = timer;

  (void)taosThreadMutexUnlock(&wheel->mutex);
}
1,690,183✔
253

254
// Unlinks `timer` from the wheel it is waiting on and drops the wheel's
// reference. Returns true when this call performed the removal, false when
// the timer was not (or was no longer) on a wheel.
static bool removeFromWheel(tmr_obj_t* timer) {
  uint8_t wheelIdx = timer->wheel;
  if (wheelIdx >= tListLen(wheels)) {
    return false;
  }
  time_wheel_t* wheel = &wheels[wheelIdx];

  (void)taosThreadMutexLock(&wheel->mutex);

  // Another thread may have taken the timer off the wheel between the
  // unlocked check above and acquiring the mutex, so look again.
  bool stillOnWheel = timer->wheel < tListLen(wheels);
  if (stillOnWheel) {
    tmr_obj_t* before = timer->prev;
    tmr_obj_t* after = timer->next;

    if (before != NULL) {
      before->next = after;
    }
    if (after != NULL) {
      after->prev = before;
    }
    if (wheel->slots[timer->slot] == timer) {
      wheel->slots[timer->slot] = after;
    }

    timer->wheel = tListLen(wheels);
    timer->next = NULL;
    timer->prev = NULL;
    timerDecRef(timer);
  }

  (void)taosThreadMutexUnlock(&wheel->mutex);

  return stillOnWheel;
}
284

285
static void processExpiredTimer(void* handle, void* arg) {
1,155,043✔
286
  tmr_obj_t* timer = (tmr_obj_t*)handle;
1,155,043✔
287
  timer->executedBy = taosGetSelfPthreadId();
1,155,043✔
288
  uint8_t state = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
1,155,043✔
289
  if (state == TIMER_STATE_WAITING) {
1,155,043✔
290
    const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution start.";
1,155,040✔
291
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,155,040!
292

293
    (*timer->fp)(timer->param, (tmr_h)timer->id);
1,155,040✔
294
    atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
1,155,040✔
295

296
    fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] execution end.";
1,155,040✔
297
    tmrDebug(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
1,155,040!
298
  }
299
  removeTimer(timer->id);
1,155,043✔
300
  timerDecRef(timer);
1,155,043✔
301
}
1,155,043✔
302

303
// Hands every timer of the list starting at `head` (linked through `next`)
// to the scheduler, which will run processExpiredTimer for each of them.
// An empty list (head == NULL) is a no-op.
static void addToExpired(tmr_obj_t* head) {
  const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";

  while (head != NULL) {
    // Read everything needed from `head` before scheduling: once the task is
    // queued, a worker thread may process and release the timer at any time.
    uintptr_t  id = head->id;
    tmr_obj_t* next = head->next;
    tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);

    SSchedMsg schedMsg;
    schedMsg.fp = NULL;
    schedMsg.tfp = processExpiredTimer;
    schedMsg.msg = NULL;
    schedMsg.ahandle = head;
    schedMsg.thandle = NULL;

    if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
      // The task was not queued, so `head` is still owned here and safe to read.
      // NOTE(review): in this case the callback never runs and the timer stays
      // registered in the hash map; confirm whether it should be released here.
      tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
    } else {
      // only report success when scheduling actually succeeded
      tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
    }

    head = next;
  }
}
81,604,254✔
325

326
// Initializes `timer` (which must be zero-filled) with a fresh id and the
// given callback, registers it in the hash map and arms it.
// A delay of 0 skips the wheels and queues the timer for execution at once.
// Returns the id assigned to the timer.
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
  const uintptr_t id = getNextTimerId();

  timer->id = id;
  timer->state = TIMER_STATE_WAITING;
  timer->fp = fp;
  timer->param = param;
  timer->ctrl = ctrl;
  addTimer(timer);

  tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started", ctrl->label, timer->id, timer->fp, timer->param);

  if (mseconds != 0) {
    addToWheel(timer, mseconds);
  } else {
    // this reference stands in for the one a wheel would otherwise hold;
    // processExpiredTimer releases it
    timer->wheel = tListLen(wheels);
    timerAddRef(timer);
    addToExpired(timer);
  }

  // return the local copy: reading `timer->id` here is unsafe because the
  // timer may already have fired and been freed
  return id;
}
349

350
// Starts a new one-shot timer that calls fp(param, id) after `mseconds` ms.
// `handle` is a controller obtained from taosTmrInit.
// Returns the timer handle, or NULL when the controller is NULL / already
// cleaned up or when the timer object cannot be allocated.
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return NULL;
  }

  tmr_obj_t* timer = (tmr_obj_t*)taosMemoryCalloc(1, sizeof(*timer));
  if (timer != NULL) {
    return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
  }

  tmrError("%s failed to allocated memory for new timer object.", ctrl->label);
  return NULL;
}
364

365
static void taosTimerLoopFunc(int32_t signo) {
27,201,420✔
366
  int64_t now = taosGetMonotonicMs();
27,201,420✔
367

368
  for (int32_t i = 0; i < tListLen(wheels); i++) {
108,805,674✔
369
    // `expried` is a temporary expire list.
370
    // expired timers are first add to this list, then move
371
    // to expired queue as a batch to improve performance.
372
    // note this list is used as a stack in this function.
373
    tmr_obj_t* expired = NULL;
81,604,256✔
374

375
    time_wheel_t* wheel = wheels + i;
81,604,256✔
376
    while (now >= wheel->nextScanAt) {
108,942,154✔
377
      (void)taosThreadMutexLock(&wheel->mutex);
27,337,900✔
378
      wheel->index = (wheel->index + 1) % wheel->size;
27,337,900✔
379
      tmr_obj_t* timer = wheel->slots[wheel->index];
27,337,900✔
380
      while (timer != NULL) {
28,492,943✔
381
        tmr_obj_t* next = timer->next;
1,155,043✔
382
        if (now < timer->expireAt) {
1,155,043!
383
          timer = next;
×
384
          continue;
×
385
        }
386

387
        // remove from the wheel
388
        if (timer->prev == NULL) {
1,155,043!
389
          wheel->slots[wheel->index] = next;
1,155,043✔
390
          if (next != NULL) {
1,155,043✔
391
            next->prev = NULL;
287,979✔
392
          }
393
        } else {
394
          timer->prev->next = next;
×
395
          if (next != NULL) {
×
396
            next->prev = timer->prev;
×
397
          }
398
        }
399
        timer->wheel = tListLen(wheels);
1,155,043✔
400

401
        // add to temporary expire list
402
        timer->next = expired;
1,155,043✔
403
        timer->prev = NULL;
1,155,043✔
404
        if (expired != NULL) {
1,155,043✔
405
          expired->prev = timer;
287,987✔
406
        }
407
        expired = timer;
1,155,043✔
408

409
        timer = next;
1,155,043✔
410
      }
411
      wheel->nextScanAt += wheel->resolution;
27,337,900✔
412
      (void)taosThreadMutexUnlock(&wheel->mutex);
27,337,900✔
413
    }
414

415
    addToExpired(expired);
81,604,254✔
416
  }
417
}
27,201,418✔
418

419
// Finishes stopping `timer` after the caller tried to move it from WAITING
// to CANCELED; `state` is the state the timer had before that attempt.
// Returns true only when the timer was taken off its wheel by this call,
// which is the only case in which the object may be safely reused.
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
  if (state == TIMER_STATE_WAITING) {
    // Reuse is only safe when this thread removed the timer from the wheel;
    // thread safety of the timer cannot be guaranteed in any other case.
    bool reusable = removeFromWheel(timer);
    if (reusable) {
      removeTimer(timer->id);
    }
    tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is cancelled.", timer->ctrl->label, timer->id, timer->fp,
             timer->param);
    return reusable;
  }

  // Not EXPIRED: the timer is already stopped or cancelled, nothing to do.
  //
  // EXPIRED on the calling thread: taosTmrReset is being called from inside
  // the timer callback. Nothing is done to avoid a deadlock; taosTmrReset
  // must be the last statement of the callback, anything else is a bug.
  //
  // EXPIRED on another thread: the callback is running elsewhere. Waiting
  // for it would be the right thing to do, but that could deadlock if the
  // current thread holds a lock the callback needs, so just return.
  if (state == TIMER_STATE_EXPIRED && timer->executedBy != taosGetSelfPthreadId()) {
    tmrDebug("%s timer[id=%" PRIuPTR ", fp=%p, param=%p] is executing and cannot be stopped.", timer->ctrl->label,
             timer->id, timer->fp, timer->param);
  }

  return false;
}
452

453
// Cancels the timer identified by `timerId`.
// Returns true when the timer was still waiting and has been cancelled;
// false when it does not exist, is executing, or has already finished.
bool taosTmrStop(tmr_h timerId) {
  const uintptr_t id = (uintptr_t)timerId;

  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
    return false;
  }

  uint8_t prior = atomic_val_compare_exchange_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
  (void)doStopTimer(timer, prior);

  // release the reference taken by findTimer
  timerDecRef(timer);

  return prior == TIMER_STATE_WAITING;
}
468

469
// Same as taosTmrStop, but takes the handle by address and resets it to
// NULL afterwards. Returns the result of taosTmrStop.
bool taosTmrStopA(tmr_h* timerId) {
  const bool cancelled = taosTmrStop(*timerId);
  *timerId = NULL;
  return cancelled;
}
474

475
// Reports whether the timer referenced by `*timerId` is no longer pending.
// Returns true when the timer does not exist (any more) or when its state
// is CANCELED or STOPPED; false while it is waiting or executing.
bool taosTmrIsStopped(tmr_h* timerId) {
  uintptr_t id = (uintptr_t)*timerId;

  tmr_obj_t* timer = findTimer(id);
  if (timer == NULL) {
    tmrDebug("timer[id=%" PRIuPTR "] does not exist", id);
    return true;
  }

  uint8_t state = atomic_load_8(&timer->state);

  // findTimer took a reference on our behalf; release it, otherwise the
  // timer object is never freed and its 8-bit refCount may eventually wrap.
  timerDecRef(timer);

  return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
}
488

489
// Stops the timer referenced by `*pTmrId` (if any) and starts a new one with
// the given callback, delay and parameter; `*pTmrId` receives the new handle.
// The old timer object is reused when it could be removed from its wheel,
// otherwise a new object is allocated.
// Returns true when the old timer was cancelled before it fired, or when no
// new timer could be started; false otherwise, including when the
// controller is NULL or already cleaned up.
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
  if (ctrl == NULL || ctrl->label[0] == 0) {
    return false;
  }

  const uintptr_t id = (uintptr_t)*pTmrId;
  bool            stopped = false;
  tmr_obj_t*      reusable = NULL;

  tmr_obj_t* found = findTimer(id);
  if (found == NULL) {
    tmrDebug("%s timer[id=%" PRIuPTR "] does not exist", ctrl->label, id);
  } else {
    uint8_t prior = atomic_val_compare_exchange_8(&found->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
    if (doStopTimer(found, prior)) {
      // keep the reference taken by findTimer: it becomes the owner's reference
      reusable = found;
    } else {
      timerDecRef(found);
    }
    stopped = (prior == TIMER_STATE_WAITING);
  }

  if (reusable == NULL) {
    *pTmrId = taosTmrStart(fp, mseconds, param, handle);
    if (*pTmrId == NULL) {
      stopped = true;
    }
    return stopped;
  }

  tmrDebug("%s timer[id=%" PRIuPTR "] is reused", ctrl->label, reusable->id);

  // Spin until ours is the only remaining reference, so that the object can
  // be wiped and reused safely. Yields once every 1000 iterations.
  int32_t spins = 1;
  while (atomic_load_8(&reusable->refCount) > 1) {
    if (spins % 1000 == 0) {
      (void)sched_yield();
    }
    ++spins;
  }

  if (reusable->refCount != 1) {
    uError("timer refCount=%d not expected 1", reusable->refCount);
  }

  memset(reusable, 0, sizeof(*reusable));
  *pTmrId = (tmr_h)doStartTimer(reusable, fp, mseconds, param, ctrl);

  return stopped;
}
535

536
static int32_t taosTmrModuleInit(void) {
3,624✔
537
  tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
3,624!
538
  if (tmrCtrls == NULL) {
3,624!
539
    tmrError("failed to allocate memory for timer controllers.");
×
540
    return terrno;
×
541
  }
542

543
  memset(&timerMap, 0, sizeof(timerMap));
3,624✔
544

545
  for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
4,073,376✔
546
    tmr_ctrl_t* ctrl = tmrCtrls + i;
4,069,752✔
547
    ctrl->next = ctrl + 1;
4,069,752✔
548
  }
549
  (tmrCtrls + tsMaxTmrCtrl - 1)->next = NULL;
3,624✔
550
  unusedTmrCtrl = tmrCtrls;
3,624✔
551

552
  (void)taosThreadMutexInit(&tmrCtrlMutex, NULL);
3,624✔
553

554
  int64_t now = taosGetMonotonicMs();
3,624✔
555
  for (int32_t i = 0; i < tListLen(wheels); i++) {
14,496✔
556
    time_wheel_t* wheel = wheels + i;
10,872✔
557
    if (taosThreadMutexInit(&wheel->mutex, NULL) != 0) {
10,872!
558
      tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
×
559
      return terrno;
×
560
    }
561
    wheel->nextScanAt = now + wheel->resolution;
10,872✔
562
    wheel->index = 0;
10,872✔
563
    wheel->slots = (tmr_obj_t**)taosMemoryCalloc(wheel->size, sizeof(tmr_obj_t*));
10,872!
564
    if (wheel->slots == NULL) {
10,872!
565
      tmrError("failed to allocate wheel slots");
×
566
      return terrno;
×
567
    }
568
    timerMap.size += wheel->size;
10,872✔
569
  }
570

571
  timerMap.count = 0;
3,624✔
572
  timerMap.slots = (timer_list_t*)taosMemoryCalloc(timerMap.size, sizeof(timer_list_t));
3,624!
573
  if (timerMap.slots == NULL) {
3,624!
574
    tmrError("failed to allocate hash map");
×
575
    return terrno;
×
576
  }
577

578
  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
3,624✔
579
  if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
3,624!
580
    tmrError("failed to initialize timer");
×
581
  }
582

583
  tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
3,624!
584

585
  return 2;
3,624✔
586
}
587

588
static int32_t taosTmrInitModule(void) {
24,694✔
589
  if (atomic_load_32(&tmrModuleInit) == 2) {
24,694✔
590
    return 0;
21,069✔
591
  }
592

593
  if (atomic_load_32(&tmrModuleInit) < 0) {
3,623!
594
    return -1;
×
595
  }
596

597
  while (true) {
598
    if (0 == atomic_val_compare_exchange_32(&tmrModuleInit, 0, 1)) {
7,248✔
599
      atomic_store_32(&tmrModuleInit, taosTmrModuleInit());
3,624✔
600
    } else if (atomic_load_32(&tmrModuleInit) < 0) {
3,624!
601
      return -1;
×
602
    } else if (atomic_load_32(&tmrModuleInit) == 2) {
3,624!
603
      return 0;
3,624✔
604
    } else {
605
      taosMsleep(1);
×
606
    }
607
  }
608

609
  return -1;
610
}
611

612
void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, const char* label) {
24,693✔
613
  const char* ret = taosMonotonicInit();
24,693✔
614
  tmrDebug("ttimer monotonic clock source:%s", ret);
24,693!
615

616
  if (taosTmrInitModule() < 0) {
24,693!
617
    return NULL;
×
618
  }
619

620
  (void)taosThreadMutexLock(&tmrCtrlMutex);
24,692✔
621
  tmr_ctrl_t* ctrl = unusedTmrCtrl;
24,694✔
622
  if (ctrl != NULL) {
24,694!
623
    unusedTmrCtrl = ctrl->next;
24,694✔
624
    numOfTmrCtrl++;
24,694✔
625
  }
626
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
24,694✔
627

628
  if (ctrl == NULL) {
24,694!
629
    tmrError("%s too many timer controllers, failed to create timer controller.", label);
×
630
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
631
    return NULL;
×
632
  }
633

634
  tstrncpy(ctrl->label, label, sizeof(ctrl->label));
24,694✔
635

636
  tmrDebug("%s timer controller is initialized, number of timer controllers: %d.", label, numOfTmrCtrl);
24,694!
637
  return ctrl;
24,694✔
638
}
639

640
void taosTmrCleanUp(void* handle) {
22,716✔
641
  tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
22,716✔
642
  if (ctrl == NULL || ctrl->label[0] == 0) {
22,716!
643
    return;
×
644
  }
645

646
  tmrDebug("%s timer controller is cleaned up.", ctrl->label);
22,716!
647
  ctrl->label[0] = 0;
22,716✔
648

649
  (void)taosThreadMutexLock(&tmrCtrlMutex);
22,716✔
650
  ctrl->next = unusedTmrCtrl;
22,716✔
651
  numOfTmrCtrl--;
22,716✔
652
  unusedTmrCtrl = ctrl;
22,716✔
653
  (void)taosThreadMutexUnlock(&tmrCtrlMutex);
22,716✔
654

655
  tmrDebug("time controller's tmr ctrl size:  %d", numOfTmrCtrl);
22,716!
656
  if (numOfTmrCtrl <= 0) {
22,716✔
657
    taosUninitTimer();
1,647✔
658

659
    taosCleanUpScheduler(tmrQhandle);
1,647✔
660
    taosMemoryFreeClear(tmrQhandle);
1,647!
661

662
    for (int32_t i = 0; i < tListLen(wheels); i++) {
6,588✔
663
      time_wheel_t* wheel = wheels + i;
4,941✔
664
      (void)taosThreadMutexDestroy(&wheel->mutex);
4,941✔
665
      taosMemoryFree(wheel->slots);
4,941!
666
    }
667

668
    (void)taosThreadMutexDestroy(&tmrCtrlMutex);
1,647✔
669

670
    for (size_t i = 0; i < timerMap.size; i++) {
10,120,815✔
671
      timer_list_t* list = timerMap.slots + i;
10,119,168✔
672
      tmr_obj_t*    t = list->timers;
10,119,168✔
673
      while (t != NULL) {
10,119,709✔
674
        tmr_obj_t* next = t->mnext;
541✔
675
        taosMemoryFree(t);
541!
676
        t = next;
541✔
677
      }
678
    }
679
    taosMemoryFree(timerMap.slots);
1,647!
680
    taosMemoryFree(tmrCtrls);
1,647!
681

682
    tmrCtrls = NULL;
1,647✔
683
    unusedTmrCtrl = NULL;
1,647✔
684
    atomic_store_32(&tmrModuleInit, 0);
1,647✔
685
  }
686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc