• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.77
/source/os/src/osThread.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define ALLOW_FORBID_FUNC
17
#include <pthread.h>
18
#include "os.h"
19

20
/**
 * Start a new thread that runs start(arg) and store its handle in *tid.
 *
 * @param tid    receives the new thread handle; passed to taosThreadClear() if creation fails
 * @param attr   optional creation attributes, may be NULL
 * @param start  thread entry point
 * @param arg    opaque argument forwarded to start
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and that value is returned
 */
int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) {
  OS_PARAM_CHECK(tid);
  OS_PARAM_CHECK(start);
#ifdef TD_ASTRA
  int32_t code = 0;
  if (!attr) {
    // No attributes supplied: use a temporary set that only carries the default stack size.
    pthread_attr_t threadAttr;
    code = pthread_attr_init(&threadAttr);
    if (code == 0) {
      (void)pthread_attr_setstacksize(&threadAttr, STACK_SIZE_DEFAULT);
      code = pthread_create(tid, &threadAttr, start, arg);
      (void)pthread_attr_destroy(&threadAttr);
    }
  } else {
    // pthread_attr_getstacksize() stores a size_t; a narrower variable would be
    // overrun on platforms where size_t is wider than int32_t.
    size_t stackSize = 0;
    (void)pthread_attr_getstacksize(attr, &stackSize);
    if (stackSize == 0) {
      // NOTE(review): this writes through the caller's const attr, exactly as the
      // previous code did implicitly; the cast only makes that explicit.
      (void)pthread_attr_setstacksize((pthread_attr_t *)attr, STACK_SIZE_DEFAULT);
    }
    code = pthread_create(tid, attr, start, arg);
  }
#else
  int32_t code = pthread_create(tid, attr, start, arg);
#endif
  if (code) {
    // Do not leave a half-initialised handle behind on failure.
    taosThreadClear(tid);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }
  return code;
}
48

49
/**
 * Destroy a thread attribute object through pthread_attr_destroy().
 *
 * @param attr attribute object to destroy; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrDestroy(TdThreadAttr *attr) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_destroy(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
57

58
int32_t taosThreadAttrGetDetachState(const TdThreadAttr *attr, int32_t *detachstate) {
3✔
59
  OS_PARAM_CHECK(attr);
3✔
60
  OS_PARAM_CHECK(detachstate);
2✔
61
  int32_t code = pthread_attr_getdetachstate(attr, detachstate);
1✔
62
  if (code) {
1!
63
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
64
  }
65
  return code;
1✔
66
}
67

68
int32_t taosThreadAttrGetInheritSched(const TdThreadAttr *attr, int32_t *inheritsched) {
3✔
69
  OS_PARAM_CHECK(attr);
3✔
70
  OS_PARAM_CHECK(inheritsched);
2✔
71
  int32_t code = pthread_attr_getinheritsched(attr, inheritsched);
1✔
72
  if (code) {
1!
73
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
74
  }
75
  return code;
1✔
76
}
77

78
int32_t taosThreadAttrGetSchedParam(const TdThreadAttr *attr, struct sched_param *param) {
3✔
79
  OS_PARAM_CHECK(attr);
3✔
80
  OS_PARAM_CHECK(param);
2✔
81
  int32_t code = pthread_attr_getschedparam(attr, param);
1✔
82
  if (code) {
1!
83
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
84
  }
85
  return code;
1✔
86
}
87

88
int32_t taosThreadAttrGetSchedPolicy(const TdThreadAttr *attr, int32_t *policy) {
3✔
89
  OS_PARAM_CHECK(attr);
3✔
90
  OS_PARAM_CHECK(policy);
2✔
91
  int32_t code = pthread_attr_getschedpolicy(attr, policy);
1✔
92
  if (code) {
1!
93
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
94
  }
95
  return code;
1✔
96
}
97

98
int32_t taosThreadAttrGetScope(const TdThreadAttr *attr, int32_t *contentionscope) {
3✔
99
  OS_PARAM_CHECK(attr);
3✔
100
  OS_PARAM_CHECK(contentionscope);
2✔
101
  int32_t code = pthread_attr_getscope(attr, contentionscope);
1✔
102
  if (code) {
1!
103
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
104
  }
105
  return code;
1✔
106
}
107

108
int32_t taosThreadAttrGetStackSize(const TdThreadAttr *attr, size_t *stacksize) {
3✔
109
  OS_PARAM_CHECK(attr);
3✔
110
  OS_PARAM_CHECK(stacksize);
2✔
111
  int32_t code = pthread_attr_getstacksize(attr, stacksize);
1✔
112
  if (code) {
1!
113
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
114
  }
115
  return code;
1✔
116
}
117

118
/**
 * Initialise a thread attribute object through pthread_attr_init().
 *
 * @param attr attribute object to initialise; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrInit(TdThreadAttr *attr) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_init(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
126

127
/**
 * Store a detach state in a thread attribute object.
 *
 * @param attr        attribute object to modify; validated with OS_PARAM_CHECK
 * @param detachstate value forwarded unchanged to pthread_attr_setdetachstate()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetDetachState(TdThreadAttr *attr, int32_t detachstate) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setdetachstate(attr, detachstate);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
135

136
/**
 * Store an inherit-scheduler setting in a thread attribute object.
 *
 * @param attr         attribute object to modify; validated with OS_PARAM_CHECK
 * @param inheritsched value forwarded unchanged to pthread_attr_setinheritsched()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetInheritSched(TdThreadAttr *attr, int32_t inheritsched) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setinheritsched(attr, inheritsched);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
144

145
/**
 * Store scheduling parameters in a thread attribute object.
 *
 * Only attr is validated here; param is handed to pthread_attr_setschedparam() as is.
 *
 * @param attr  attribute object to modify; validated with OS_PARAM_CHECK
 * @param param scheduling parameters to store
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetSchedParam(TdThreadAttr *attr, const struct sched_param *param) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setschedparam(attr, param);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
153

154
/**
 * Store a scheduling policy in a thread attribute object.
 *
 * @param attr   attribute object to modify; validated with OS_PARAM_CHECK
 * @param policy value forwarded unchanged to pthread_attr_setschedpolicy()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetSchedPolicy(TdThreadAttr *attr, int32_t policy) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setschedpolicy(attr, policy);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
162

163
/**
 * Store a contention scope in a thread attribute object.
 *
 * @param attr            attribute object to modify; validated with OS_PARAM_CHECK
 * @param contentionscope value forwarded unchanged to pthread_attr_setscope()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetScope(TdThreadAttr *attr, int32_t contentionscope) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setscope(attr, contentionscope);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
171

172
/**
 * Store a stack size in a thread attribute object.
 *
 * @param attr      attribute object to modify; validated with OS_PARAM_CHECK
 * @param stacksize value forwarded unchanged to pthread_attr_setstacksize()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadAttrSetStackSize(TdThreadAttr *attr, size_t stacksize) {
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_attr_setstacksize(attr, stacksize);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
180

181
/**
 * Request cancellation of a thread through pthread_cancel().
 *
 * @param thread handle of the thread to cancel
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCancel(TdThread thread) {
  const int32_t rc = pthread_cancel(thread);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
188

189
/**
 * Destroy a condition variable.
 *
 * With __USE_WIN_THREAD nothing is released and 0 is returned; otherwise
 * pthread_cond_destroy() is called.
 *
 * @param cond condition variable; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondDestroy(TdThreadCond *cond) {
  OS_PARAM_CHECK(cond);
#ifdef __USE_WIN_THREAD
  return 0;
#else
  const int32_t rc = pthread_cond_destroy(cond);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
201

202
/**
 * Initialise a condition variable.
 *
 * With __USE_WIN_THREAD the attr argument is not used and
 * InitializeConditionVariable() is called; otherwise pthread_cond_init() is used.
 *
 * @param cond condition variable to initialise; validated with OS_PARAM_CHECK
 * @param attr attributes passed to pthread_cond_init()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondInit(TdThreadCond *cond, const TdThreadCondAttr *attr) {
  OS_PARAM_CHECK(cond);
#ifdef __USE_WIN_THREAD
  InitializeConditionVariable(cond);
  return 0;
#else
  const int32_t rc = pthread_cond_init(cond, attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
215

216
/**
 * Signal a condition variable (WakeConditionVariable() with __USE_WIN_THREAD,
 * pthread_cond_signal() otherwise).
 *
 * @param cond condition variable; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondSignal(TdThreadCond *cond) {
  OS_PARAM_CHECK(cond);
#ifdef __USE_WIN_THREAD
  WakeConditionVariable(cond);
  return 0;
#else
  const int32_t rc = pthread_cond_signal(cond);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
229

230
/**
 * Broadcast on a condition variable (WakeAllConditionVariable() with
 * __USE_WIN_THREAD, pthread_cond_broadcast() otherwise).
 *
 * @param cond condition variable; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondBroadcast(TdThreadCond *cond) {
  OS_PARAM_CHECK(cond);
#ifdef __USE_WIN_THREAD
  WakeAllConditionVariable(cond);
  return 0;
#else
  const int32_t rc = pthread_cond_broadcast(cond);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
243

244
/**
 * Block on a condition variable with the given mutex.
 *
 * With __USE_WIN_THREAD the wait uses SleepConditionVariableCS() with an
 * INFINITE timeout and reports failure as a plain EINVAL (terrno untouched);
 * otherwise pthread_cond_wait() is used.
 *
 * @param cond  condition variable; validated with OS_PARAM_CHECK
 * @param mutex mutex associated with the wait; validated with OS_PARAM_CHECK
 * @return 0 on success; on the pthread path terrno is set to
 *         TAOS_SYSTEM_ERROR(code) and returned on failure
 */
int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) {
  OS_PARAM_CHECK(cond);
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  if (SleepConditionVariableCS(cond, mutex, INFINITE)) {
    return 0;
  }
  return EINVAL;
#else
  const int32_t rc = pthread_cond_wait(cond, mutex);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
260

261
/**
 * Wait on a condition variable with a time limit.
 *
 * A NULL abstime returns 0 immediately, before cond and mutex are validated
 * and without waiting. Unlike most wrappers in this file, this function never
 * assigns terrno.
 *
 * NOTE(review): with __USE_WIN_THREAD the timespec is converted to milliseconds
 * and passed as the SleepConditionVariableCS() timeout, whereas the pthread
 * path hands it to pthread_cond_timedwait() unchanged — confirm callers pass
 * the right kind of value on each platform.
 *
 * @param cond    condition variable; validated with OS_PARAM_CHECK
 * @param mutex   mutex associated with the wait; validated with OS_PARAM_CHECK
 * @param abstime time limit, or NULL to return at once
 * @return 0 on success, TSDB_CODE_TIMEOUT_ERROR on timeout, otherwise a
 *         TAOS_SYSTEM_ERROR / TAOS_SYSTEM_WINAPI_ERROR code
 */
int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) {
  if (abstime == NULL) {
    return 0;
  }
  OS_PARAM_CHECK(cond);
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  const DWORD waitMs = (DWORD)(abstime->tv_sec * 1e3 + abstime->tv_nsec / 1e6);
  if (SleepConditionVariableCS(cond, mutex, waitMs)) {
    return 0;
  }
  const DWORD lastErr = GetLastError();
  if (lastErr != ERROR_TIMEOUT) {
    return TAOS_SYSTEM_WINAPI_ERROR(lastErr);
  }
  return TSDB_CODE_TIMEOUT_ERROR;
#else
  const int32_t rc = pthread_cond_timedwait(cond, mutex, abstime);
  switch (rc) {
    case 0:
      return 0;
    case ETIMEDOUT:
      return TSDB_CODE_TIMEOUT_ERROR;
    default:
      return TAOS_SYSTEM_ERROR(rc);
  }
#endif
}
283

284
/**
 * Destroy a condition-variable attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0 (attr is not even
 * validated); otherwise pthread_condattr_destroy() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondAttrDestroy(TdThreadCondAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_condattr_destroy(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
296

297
int32_t taosThreadCondAttrGetPshared(const TdThreadCondAttr *attr, int32_t *pshared) {
3✔
298
  OS_PARAM_CHECK(attr);
3✔
299
  OS_PARAM_CHECK(pshared);
2✔
300
#ifdef __USE_WIN_THREAD
301
  if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE;
302
  return 0;
303
#else
304
  OS_PARAM_CHECK(attr);
1!
305
  int32_t code = pthread_condattr_getpshared(attr, pshared);
1✔
306
  if (code) {
1!
307
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
308
  }
309
  return code;
1✔
310
#endif
311
}
312

313
/**
 * Initialise a condition-variable attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_condattr_init() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondAttrInit(TdThreadCondAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_condattr_init(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
325

326
/**
 * Select the clock used by condition variables created from attr.
 *
 * With __USE_WIN_THREAD or on __APPLE__ builds this is a no-op returning 0;
 * otherwise pthread_condattr_setclock() is called.
 *
 * @param attr    attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @param clockId clock identifier forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondAttrSetclock(TdThreadCondAttr *attr, int clockId) {
#ifdef __USE_WIN_THREAD
  return 0;
#elif defined(__APPLE__)
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_condattr_setclock(attr, clockId);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
340

341
/**
 * Set the process-shared setting of a condition-variable attribute object.
 *
 * With __USE_WIN_THREAD the value is ignored and 0 is returned; otherwise
 * pthread_condattr_setpshared() is called.
 *
 * @param attr    attribute object; validated with OS_PARAM_CHECK
 * @param pshared value forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadCondAttrSetPshared(TdThreadCondAttr *attr, int32_t pshared) {
  OS_PARAM_CHECK(attr);
#ifdef __USE_WIN_THREAD
  return 0;
#else
  const int32_t rc = pthread_condattr_setpshared(attr, pshared);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
353

354
/**
 * Detach a thread through pthread_detach().
 *
 * @param thread handle of the thread to detach
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadDetach(TdThread thread) {
  const int32_t rc = pthread_detach(thread);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
361

362
/**
 * Compare two thread handles.
 *
 * @return the raw result of pthread_equal(t1, t2); terrno is not touched
 */
int32_t taosThreadEqual(TdThread t1, TdThread t2) {
  const int32_t same = pthread_equal(t1, t2);
  return same;
}
1✔
363

364
/**
 * Terminate the calling thread with valuePtr as its exit value.
 *
 * A NULL valuePtr is a no-op: the function simply returns to the caller
 * instead of ending the thread.
 *
 * @param valuePtr exit value handed to pthread_exit(); NULL means "do nothing"
 */
void taosThreadExit(void *valuePtr) {
  if (valuePtr == NULL) {
    return;
  }
  // ISO C does not allow `return <expr>;` in a void function, so pthread_exit()
  // is invoked as a plain statement.
  pthread_exit(valuePtr);
}
367

368
/**
 * Query the scheduling policy and parameters of a thread.
 *
 * @param thread thread to query
 * @param policy output location for the policy; validated with OS_PARAM_CHECK
 * @param param  output location for the parameters; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadGetSchedParam(TdThread thread, int32_t *policy, struct sched_param *param) {
  OS_PARAM_CHECK(policy);
  OS_PARAM_CHECK(param);
  const int32_t rc = pthread_getschedparam(thread, policy, param);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
377

378
/**
 * Fetch the calling thread's value for a thread-specific key.
 *
 * @return whatever pthread_getspecific(key) returns
 */
void *taosThreadGetSpecific(TdThreadKey key) {
  void *stored = pthread_getspecific(key);
  return stored;
}
2✔
379

380
/**
 * Join a thread through pthread_join().
 *
 * @param thread   handle of the thread to join
 * @param valuePtr forwarded unchanged to pthread_join()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadJoin(TdThread thread, void **valuePtr) {
  const int32_t rc = pthread_join(thread, valuePtr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
387

388
/**
 * Create a thread-specific data key through pthread_key_create().
 *
 * @param key        receives the new key; validated with OS_PARAM_CHECK
 * @param destructor forwarded unchanged to pthread_key_create()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadKeyCreate(TdThreadKey *key, void (*destructor)(void *)) {
  OS_PARAM_CHECK(key);
  const int32_t rc = pthread_key_create(key, destructor);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
396

397
/**
 * Delete a thread-specific data key through pthread_key_delete().
 *
 * @param key key to delete
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadKeyDelete(TdThreadKey key) {
  const int32_t rc = pthread_key_delete(key);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
404

405
/**
 * Send a signal to a thread through pthread_kill().
 *
 * @param thread target thread
 * @param sig    signal number forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadKill(TdThread thread, int32_t sig) {
  const int32_t rc = pthread_kill(thread, sig);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
412

413
// int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
414
//   return pthread_mutex_consistent(mutex);
415
// }
416

417
/**
 * Destroy a mutex (DeleteCriticalSection() with __USE_WIN_THREAD,
 * pthread_mutex_destroy() otherwise).
 *
 * @param mutex mutex to destroy; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) {
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  DeleteCriticalSection(mutex);
  return 0;
#else
  const int32_t rc = pthread_mutex_destroy(mutex);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
430

431
/**
 * Initialise a mutex.
 *
 * With __USE_WIN_THREAD the attr argument is not used and
 * InitializeCriticalSection() is called; otherwise pthread_mutex_init() is used.
 *
 * @param mutex mutex to initialise; validated with OS_PARAM_CHECK
 * @param attr  attributes passed to pthread_mutex_init()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) {
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  /**
   * Windows Server 2003 and Windows XP: in low memory situations
   * InitializeCriticalSection can raise a STATUS_NO_MEMORY exception.
   * Starting with Windows Vista this exception was eliminated and
   * InitializeCriticalSection always succeeds, even in low memory situations.
   */
  InitializeCriticalSection(mutex);
  return 0;
#else
  const int32_t rc = pthread_mutex_init(mutex, attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
449

450
/**
 * Acquire a mutex (EnterCriticalSection() with __USE_WIN_THREAD,
 * pthread_mutex_lock() otherwise).
 *
 * @param mutex mutex to lock; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexLock(TdThreadMutex *mutex) {
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  EnterCriticalSection(mutex);
  return 0;
#else
  const int32_t rc = pthread_mutex_lock(mutex);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
463

464
// int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
465
//   return pthread_mutex_timedlock(mutex, abstime);
466
// }
467

468
/**
 * Try to acquire a mutex without blocking.
 *
 * EBUSY is returned as a plain errno value; any other pthread failure is
 * wrapped with TAOS_SYSTEM_ERROR(). terrno is not assigned by this function.
 *
 * @param mutex mutex to lock; validated with OS_PARAM_CHECK
 * @return 0 when the lock was taken, EBUSY when it is held elsewhere,
 *         otherwise a TAOS_SYSTEM_ERROR code
 */
int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) {
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  return TryEnterCriticalSection(mutex) ? 0 : EBUSY;
#else
  const int32_t rc = pthread_mutex_trylock(mutex);
  if (rc == 0 || rc == EBUSY) {
    return rc;
  }
  return TAOS_SYSTEM_ERROR(rc);
#endif
}
481

482
/**
 * Release a mutex (LeaveCriticalSection() with __USE_WIN_THREAD,
 * pthread_mutex_unlock() otherwise).
 *
 * @param mutex mutex to unlock; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) {
  OS_PARAM_CHECK(mutex);
#ifdef __USE_WIN_THREAD
  LeaveCriticalSection(mutex);
  return 0;
#else
  const int32_t rc = pthread_mutex_unlock(mutex);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
495

496
/**
 * Destroy a mutex attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_mutexattr_destroy() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_mutexattr_destroy(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
508

509
int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr *attr, int32_t *pshared) {
3✔
510
  OS_PARAM_CHECK(pshared);
3✔
511
#ifdef __USE_WIN_THREAD
512
  if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE;
513
  return 0;
514
#else
515
  OS_PARAM_CHECK(attr);
2✔
516
  int32_t code = pthread_mutexattr_getpshared(attr, pshared);
1✔
517
  if (code) {
1!
518
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
519
  }
520
  return code;
1✔
521
#endif
522
}
523

524
// int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust) {
525
//   return pthread_mutexattr_getrobust(attr, robust);
526
// }
527

528
int32_t taosThreadMutexAttrGetType(const TdThreadMutexAttr *attr, int32_t *kind) {
3✔
529
  OS_PARAM_CHECK(kind);
3✔
530
#ifdef __USE_WIN_THREAD
531
  if (kind) *kind = PTHREAD_MUTEX_NORMAL;
532
  return 0;
533
#else
534
  OS_PARAM_CHECK(attr);
2✔
535
  int32_t code = pthread_mutexattr_gettype(attr, kind);
1✔
536
  if (code) {
1!
537
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
538
  }
539
  return code;
1✔
540
#endif
541
}
542

543
/**
 * Initialise a mutex attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_mutexattr_init() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexAttrInit(TdThreadMutexAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_mutexattr_init(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
555

556
/**
 * Set the process-shared setting of a mutex attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_mutexattr_setpshared() is called.
 *
 * @param attr    attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @param pshared value forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr *attr, int32_t pshared) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_mutexattr_setpshared(attr, pshared);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
568

569
// int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust) {
570
//   return pthread_mutexattr_setrobust(attr, robust);
571
// }
572

573
/**
 * Set the mutex type stored in a mutex attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_mutexattr_settype() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @param kind value forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadMutexAttrSetType(TdThreadMutexAttr *attr, int32_t kind) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_mutexattr_settype(attr, kind);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
585

586
/**
 * Run an initialisation routine through pthread_once().
 *
 * Neither argument is validated here; both go straight to pthread_once().
 *
 * @param onceControl once-control object
 * @param initRoutine routine to run
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadOnce(TdThreadOnce *onceControl, void (*initRoutine)(void)) {
  const int32_t rc = pthread_once(onceControl, initRoutine);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
593

594
/**
 * Destroy a read-write lock.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_rwlock_destroy() is called.
 *
 * @param rwlock lock to destroy; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockDestroy(TdThreadRwlock *rwlock) {
#ifdef __USE_WIN_THREAD
  /* SRWLock does not need explicit destruction so long as there are no waiting threads
   * See: https://docs.microsoft.com/windows/win32/api/synchapi/nf-synchapi-initializesrwlock#remarks
   */
  return 0;
#else
  OS_PARAM_CHECK(rwlock);
  const int32_t rc = pthread_rwlock_destroy(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
609

610
/**
 * Initialise a read-write lock.
 *
 * With __USE_WIN_THREAD the whole TdThreadRwlock is zeroed and its SRW lock
 * member initialised (attr is not used); otherwise pthread_rwlock_init() is called.
 *
 * @param rwlock lock to initialise; validated with OS_PARAM_CHECK
 * @param attr   attributes passed to pthread_rwlock_init()
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockInit(TdThreadRwlock *rwlock, const TdThreadRwlockAttr *attr) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  memset(rwlock, 0, sizeof(*rwlock));
  InitializeSRWLock(&rwlock->lock);
  return 0;
#else
  const int32_t rc = pthread_rwlock_init(rwlock, attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
624

625
/**
 * Acquire a read-write lock for reading (AcquireSRWLockShared() with
 * __USE_WIN_THREAD, pthread_rwlock_rdlock() otherwise).
 *
 * @param rwlock lock to acquire; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockRdlock(TdThreadRwlock *rwlock) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  AcquireSRWLockShared(&rwlock->lock);
  return 0;
#else
  const int32_t rc = pthread_rwlock_rdlock(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
638

639
// int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
640
//   return pthread_rwlock_timedrdlock(rwlock, abstime);
641
// }
642

643
// int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
644
//   return pthread_rwlock_timedwrlock(rwlock, abstime);
645
// }
646

647
/**
 * Try to acquire a read-write lock for reading without blocking.
 *
 * NOTE(review): the two builds report contention differently — with
 * __USE_WIN_THREAD a plain EBUSY is returned, while the pthread path wraps
 * every non-zero result (EBUSY included) in TAOS_SYSTEM_ERROR() and stores it
 * in terrno.
 *
 * @param rwlock lock to acquire; validated with OS_PARAM_CHECK
 * @return 0 when the lock was taken; see the note above for failure values
 */
int32_t taosThreadRwlockTryRdlock(TdThreadRwlock *rwlock) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  if (TryAcquireSRWLockShared(&rwlock->lock)) {
    return 0;
  }
  return EBUSY;
#else
  const int32_t rc = pthread_rwlock_tryrdlock(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
660

661
/**
 * Try to acquire a read-write lock for writing without blocking.
 *
 * With __USE_WIN_THREAD a successful acquisition also sets rwlock->excl to 1;
 * contention is reported as a plain EBUSY. The pthread path wraps every
 * non-zero result in TAOS_SYSTEM_ERROR() and stores it in terrno.
 *
 * @param rwlock lock to acquire; validated with OS_PARAM_CHECK
 * @return 0 when the lock was taken; otherwise an error value as described above
 */
int32_t taosThreadRwlockTryWrlock(TdThreadRwlock *rwlock) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  if (TryAcquireSRWLockExclusive(&rwlock->lock)) {
    atomic_store_8(&rwlock->excl, 1);
    return 0;
  }
  return EBUSY;
#else
  const int32_t rc = pthread_rwlock_trywrlock(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
675

676
/**
 * Release a read-write lock.
 *
 * With __USE_WIN_THREAD the excl flag is atomically swapped from 1 to 0; when
 * the swap observed 1 the exclusive lock is released, otherwise the shared
 * lock is released. The pthread path calls pthread_rwlock_unlock().
 *
 * @param rwlock lock to release; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockUnlock(TdThreadRwlock *rwlock) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  if (1 != atomic_val_compare_exchange_8(&rwlock->excl, 1, 0)) {
    ReleaseSRWLockShared(&rwlock->lock);
  } else {
    ReleaseSRWLockExclusive(&rwlock->lock);
  }
  return 0;
#else
  const int32_t rc = pthread_rwlock_unlock(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
693

694
/**
 * Acquire a read-write lock for writing.
 *
 * With __USE_WIN_THREAD the SRW lock is taken exclusively and rwlock->excl is
 * then set to 1; otherwise pthread_rwlock_wrlock() is called.
 *
 * @param rwlock lock to acquire; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockWrlock(TdThreadRwlock *rwlock) {
  OS_PARAM_CHECK(rwlock);
#ifdef __USE_WIN_THREAD
  AcquireSRWLockExclusive(&rwlock->lock);
  atomic_store_8(&rwlock->excl, 1);
  return 0;
#else
  const int32_t rc = pthread_rwlock_wrlock(rwlock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
708

709
/**
 * Destroy a read-write lock attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_rwlockattr_destroy() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockAttrDestroy(TdThreadRwlockAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_rwlockattr_destroy(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
721

722
int32_t taosThreadRwlockAttrGetPshared(const TdThreadRwlockAttr *attr, int32_t *pshared) {
3✔
723
  OS_PARAM_CHECK(attr);
3✔
724
  OS_PARAM_CHECK(pshared);
2✔
725
#ifdef __USE_WIN_THREAD
726
  if (pshared) *pshared = PTHREAD_PROCESS_PRIVATE;
727
  return 0;
728
#else
729
  int32_t code = pthread_rwlockattr_getpshared(attr, pshared);
1✔
730
  if (code) {
1!
731
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
732
  }
733
  return code;
1✔
734
#endif
735
}
736

737
/**
 * Initialise a read-write lock attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_rwlockattr_init() is called.
 *
 * @param attr attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockAttrInit(TdThreadRwlockAttr *attr) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_rwlockattr_init(attr);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
749

750
/**
 * Set the process-shared setting of a read-write lock attribute object.
 *
 * With __USE_WIN_THREAD this is a no-op returning 0; otherwise
 * pthread_rwlockattr_setpshared() is called.
 *
 * @param attr    attribute object; validated with OS_PARAM_CHECK on the pthread path
 * @param pshared value forwarded unchanged
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadRwlockAttrSetPshared(TdThreadRwlockAttr *attr, int32_t pshared) {
#ifdef __USE_WIN_THREAD
  return 0;
#else
  OS_PARAM_CHECK(attr);
  const int32_t rc = pthread_rwlockattr_setpshared(attr, pshared);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
762

763
/** Return the calling thread's handle as reported by pthread_self(). */
TdThread taosThreadSelf(void) {
  TdThread me = pthread_self();
  return me;
}
3✔
764

765
int32_t taosThreadSetCancelState(int32_t state, int32_t *oldstate) {
2✔
766
  int32_t code = pthread_setcancelstate(state, oldstate);
2✔
767
  if (code) {
2✔
768
    return (terrno = TAOS_SYSTEM_ERROR(code));
1✔
769
  }
770
  return code;
1✔
771
}
772

773
int32_t taosThreadSetCancelType(int32_t type, int32_t *oldtype) {
2✔
774
  int32_t code = pthread_setcanceltype(type, oldtype);
2✔
775
  if (code) {
2✔
776
    return (terrno = TAOS_SYSTEM_ERROR(code));
1✔
777
  }
778
  return code;
1✔
779
}
780

781
/**
 * Set the scheduling policy and parameters of a thread.
 *
 * @param thread thread to modify
 * @param policy scheduling policy, forwarded unchanged
 * @param param  scheduling parameters; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadSetSchedParam(TdThread thread, int32_t policy, const struct sched_param *param) {
  OS_PARAM_CHECK(param);
  const int32_t rc = pthread_setschedparam(thread, policy, param);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
789

790
/**
 * Associate a value with a thread-specific key for the calling thread.
 *
 * NOTE(review): value itself goes through OS_PARAM_CHECK, so this wrapper
 * cannot be used to store a NULL value for the key — confirm that is intended.
 *
 * @param key   thread-specific data key
 * @param value value to store; validated with OS_PARAM_CHECK
 * @return 0 on success; otherwise terrno is set to TAOS_SYSTEM_ERROR(code) and returned
 */
int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) {
  OS_PARAM_CHECK(value);
  const int32_t rc = pthread_setspecific(key, value);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
}
798

799
/**
 * Destroy a spinlock.
 *
 * With TD_USE_SPINLOCK_AS_MUTEX the lock is treated as a pthread mutex and the
 * raw pthread_mutex_destroy() result is returned (terrno untouched); otherwise
 * pthread_spin_destroy() is used.
 *
 * @param lock spinlock to destroy; validated with OS_PARAM_CHECK
 * @return 0 on success; on the spinlock path terrno is set to
 *         TAOS_SYSTEM_ERROR(code) and returned on failure
 */
int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) {
  OS_PARAM_CHECK(lock);
#ifdef TD_USE_SPINLOCK_AS_MUTEX
  return pthread_mutex_destroy((pthread_mutex_t *)lock);
#else
  const int32_t rc = pthread_spin_destroy((pthread_spinlock_t *)lock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
811

812
/**
 * Initialise a spinlock.
 *
 * With TD_USE_SPINLOCK_AS_MUTEX only pshared == 0 is accepted (anything else
 * yields TSDB_CODE_INVALID_PARA) and the lock is initialised as a pthread
 * mutex with default attributes, returning the raw pthread result; otherwise
 * pthread_spin_init() is used.
 *
 * @param lock    spinlock to initialise; validated with OS_PARAM_CHECK
 * @param pshared process-shared setting
 * @return 0 on success; on the spinlock path terrno is set to
 *         TAOS_SYSTEM_ERROR(code) and returned on failure
 */
int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) {
  OS_PARAM_CHECK(lock);
#ifdef TD_USE_SPINLOCK_AS_MUTEX
  if (pshared == 0) {
    return pthread_mutex_init((pthread_mutex_t *)lock, NULL);
  }
  return TSDB_CODE_INVALID_PARA;
#else
  const int32_t rc = pthread_spin_init((pthread_spinlock_t *)lock, pshared);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
825

826
/**
 * Acquire a spinlock.
 *
 * With TD_USE_SPINLOCK_AS_MUTEX the raw pthread_mutex_lock() result is
 * returned (terrno untouched); otherwise pthread_spin_lock() is used.
 *
 * @param lock spinlock to acquire; validated with OS_PARAM_CHECK
 * @return 0 on success; on the spinlock path terrno is set to
 *         TAOS_SYSTEM_ERROR(code) and returned on failure
 */
int32_t taosThreadSpinLock(TdThreadSpinlock *lock) {
  OS_PARAM_CHECK(lock);
#ifdef TD_USE_SPINLOCK_AS_MUTEX
  return pthread_mutex_lock((pthread_mutex_t *)lock);
#else
  const int32_t rc = pthread_spin_lock((pthread_spinlock_t *)lock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
838

839
/**
 * Try to acquire a spinlock without blocking.
 *
 * With TD_USE_SPINLOCK_AS_MUTEX the raw pthread_mutex_trylock() result is
 * returned. Otherwise EBUSY is passed through as a plain errno value and any
 * other failure is wrapped with TAOS_SYSTEM_ERROR(). terrno is never assigned.
 *
 * @param lock spinlock to acquire; validated with OS_PARAM_CHECK
 * @return 0 when the lock was taken, EBUSY when it is held elsewhere,
 *         otherwise an error code as described above
 */
int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) {
  OS_PARAM_CHECK(lock);
#ifdef TD_USE_SPINLOCK_AS_MUTEX
  return pthread_mutex_trylock((pthread_mutex_t *)lock);
#else
  const int32_t rc = pthread_spin_trylock((pthread_spinlock_t *)lock);
  if (rc == 0 || rc == EBUSY) {
    return rc;
  }
  return TAOS_SYSTEM_ERROR(rc);
#endif
}
851

852
/**
 * Release a spinlock.
 *
 * With TD_USE_SPINLOCK_AS_MUTEX the raw pthread_mutex_unlock() result is
 * returned (terrno untouched); otherwise pthread_spin_unlock() is used.
 *
 * @param lock spinlock to release; validated with OS_PARAM_CHECK
 * @return 0 on success; on the spinlock path terrno is set to
 *         TAOS_SYSTEM_ERROR(code) and returned on failure
 */
int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) {
  OS_PARAM_CHECK(lock);
#ifdef TD_USE_SPINLOCK_AS_MUTEX
  return pthread_mutex_unlock((pthread_mutex_t *)lock);
#else
  const int32_t rc = pthread_spin_unlock((pthread_spinlock_t *)lock);
  if (rc == 0) {
    return 0;
  }
  return (terrno = TAOS_SYSTEM_ERROR(rc));
#endif
}
864

865
/** Forward to pthread_testcancel() for the calling thread. */
void taosThreadTestCancel(void) {
  // ISO C does not allow `return <expr>;` in a void function, so the call is a plain statement.
  pthread_testcancel();
}
1✔
866

867
/**
 * Zero-fill a thread handle.
 *
 * @param thread handle to clear; a NULL pointer is ignored
 */
void taosThreadClear(TdThread *thread) {
  if (thread != NULL) {
    (void)memset(thread, 0, sizeof(*thread));
  }
}
871

872
#ifdef WINDOWS
/**
 * Report whether the calling thread is the first thread that a system thread
 * snapshot lists for the current process.
 *
 * NOTE(review): this treats the first snapshot entry owned by the process as
 * the main thread — confirm that enumeration order guarantees this.
 *
 * @return true when the calling thread's id equals that first entry's id;
 *         false otherwise, including when the snapshot cannot be created or read
 */
bool taosThreadIsMain() {
  const DWORD ownPid = GetCurrentProcessId();
  const DWORD ownTid = GetCurrentThreadId();
  DWORD       firstTid = -1;  // stays at this sentinel when no entry of this process is found

  HANDLE snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
  if (snapshot == INVALID_HANDLE_VALUE) {
    return false;
  }

  THREADENTRY32 entry;
  entry.dwSize = sizeof(THREADENTRY32);

  BOOL more = Thread32First(snapshot, &entry);
  if (!more) {
    CloseHandle(snapshot);
    return false;
  }

  // Walk the snapshot until the first thread owned by this process shows up.
  while (more) {
    if (entry.th32OwnerProcessID == ownPid) {
      firstTid = entry.th32ThreadID;
      break;
    }
    more = Thread32Next(snapshot, &entry);
  }

  CloseHandle(snapshot);

  return ownTid == firstTid;
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc