• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.5
/source/os/src/osSemaphore.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define ALLOW_FORBID_FUNC
17
#define _DEFAULT_SOURCE
18
#include "os.h"
19
#include "pthread.h"
20
#include "tdef.h"
21

22
#ifdef WINDOWS
23

24
/*
25
 * windows implementation
26
 */
27

28
#include <windows.h>
29

30
/* Reports whether the thread handle carries a non-NULL `p` member. */
bool taosCheckPthreadValid(TdThread thread) {
  if (thread.p == NULL) {
    return false;
  }
  return true;
}
31

32
/* Clears the `p` member of the handle pointed to by `thread`; a NULL pointer is ignored. */
void taosResetPthread(TdThread* thread) {
  if (thread == NULL) {
    return;
  }
  thread->p = NULL;
}
37

38
/*
 * Returns a numeric identifier for `thread`.
 * When PTW32_VERSION is defined the value comes from pthread_getw32threadid_np();
 * otherwise the handle itself is cast to int64_t.
 */
int64_t taosGetPthreadId(TdThread thread) {
#ifdef PTW32_VERSION
  int64_t tid = pthread_getw32threadid_np(thread);
  return tid;
#else
  int64_t tid = (int64_t)thread;
  return tid;
#endif
}
45

46
/* Returns the identifier of the calling thread as reported by GetCurrentThreadId(). */
int64_t taosGetSelfPthreadId() {
  int64_t tid = GetCurrentThreadId();
  return tid;
}
47

48
/* Two handles compare equal when their `p` members are the same pointer. */
bool taosComparePthread(TdThread first, TdThread second) {
  bool same = (first.p == second.p);
  return same;
}
49

50
/* Returns the identifier of the current process as reported by GetCurrentProcessId(). */
int32_t taosGetPId() {
  int32_t pid = GetCurrentProcessId();
  return pid;
}
51

52
int32_t taosGetAppName(char* name, int32_t* len) {
53
  OS_PARAM_CHECK(name);
54
  char filepath[1024] = {0};
55

56
  if (GetModuleFileName(NULL, filepath, MAX_PATH) == 0) {
57
    terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
58
    return terrno;
59
  }
60
  char* sub = strrchr(filepath, '.');
61
  if (sub != NULL) {
62
    *sub = '\0';
63
  }
64
  char* end = strrchr(filepath, TD_DIRSEP[0]);
65
  if (end == NULL) {
66
    end = filepath;
67
  } else {
68
    end += 1;
69
  }
70

71
  tstrncpy(name, end, TSDB_APP_NAME_LEN);
72

73
  if (len != NULL) {
74
    *len = (int32_t)strlen(end);
75
  }
76

77
  return 0;
78
}
79

80
/* Process lookup by name is not implemented in this platform section; always returns -1. */
int32_t taosGetPIdByName(const char* name, int32_t* pPId) {
  (void)name;
  (void)pPId;
  return -1;
}
81

82
/*
 * Waits on the semaphore handle with no time limit (INFINITE).
 * Returns 0 when WaitForSingleObject() reports WAIT_OBJECT_0, otherwise the wrapped
 * GetLastError() value.
 */
int32_t tsem_wait(tsem_t* sem) {
  OS_PARAM_CHECK(sem);
  OS_PARAM_CHECK(*sem);

  if (WaitForSingleObject(*sem, INFINITE) != WAIT_OBJECT_0) {
    return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
  }
  return 0;
}
92

93
/*
 * Waits on the semaphore handle for at most `timeout_ms` milliseconds.
 * Returns 0 when acquired, TSDB_CODE_TIMEOUT_ERROR on WAIT_TIMEOUT, and the wrapped
 * GetLastError() value for any other result.
 * NOTE(review): `timeout_ms` is an int64_t handed to a DWORD parameter, so values outside
 * the DWORD range are narrowed implicitly - confirm callers stay within range.
 */
int32_t tsem_timewait(tsem_t* sem, int64_t timeout_ms) {
  OS_PARAM_CHECK(sem);
  OS_PARAM_CHECK(*sem);

  DWORD waitRc = WaitForSingleObject(*sem, timeout_ms);
  if (waitRc == WAIT_TIMEOUT) {
    return TSDB_CODE_TIMEOUT_ERROR;
  }
  if (waitRc != WAIT_OBJECT_0) {
    return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
  }
  return 0;
}
105

106
// Inter-process sharing is not currently supported. The pshared parameter is invalid.
107
int32_t tsem_init(tsem_t* sem, int pshared, unsigned int value) {
108
  OS_PARAM_CHECK(sem);
109
  *sem = CreateSemaphore(NULL, value, LONG_MAX, NULL);
110
  return (*sem != NULL) ? 0 : TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
111
}
112

113
/*
 * Releases the semaphore by one count via ReleaseSemaphore().
 * Returns 0 on success, otherwise the wrapped GetLastError() value.
 */
int32_t tsem_post(tsem_t* sem) {
  OS_PARAM_CHECK(sem);
  OS_PARAM_CHECK(*sem);

  if (!ReleaseSemaphore(*sem, 1, NULL)) {
    return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
  }
  return 0;
}
119

120
/*
 * Closes the semaphore handle with CloseHandle().
 * Returns 0 on success, otherwise the wrapped GetLastError() value.
 */
int32_t tsem_destroy(tsem_t* sem) {
  OS_PARAM_CHECK(sem);
  OS_PARAM_CHECK(*sem);

  if (!CloseHandle(*sem)) {
    return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
  }
  return 0;
}
126

127
#elif defined(_TD_DARWIN_64)
128

129
#include <libproc.h>
130

131
/*
 * Creates a dispatch semaphore with initial value `count` and stores it in *psem.
 * `flags` is not used. Returns 0 on success, or the wrapped ERRNO value when
 * dispatch_semaphore_create() yields NULL.
 */
int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
  OS_PARAM_CHECK(psem);

  *psem = dispatch_semaphore_create(count);
  if (*psem != NULL) {
    return 0;
  }
  return TAOS_SYSTEM_ERROR(ERRNO);
}
137

138
/*
 * No-op on this platform: the semaphore object is not released and *psem is left
 * untouched; the function always returns 0.
 * NOTE(review): a dispatch_release()-based teardown was previously disabled here;
 * the reason is not visible in this file - confirm before re-enabling.
 */
int32_t tsem_destroy(tsem_t *psem) {
  (void)psem;
  return 0;
}
144

145
/*
 * Signals the dispatch semaphore once.
 * Returns TSDB_CODE_INVALID_PARA when `psem` or `*psem` is NULL, otherwise 0.
 */
int32_t tsem_post(tsem_t *psem) {
  if (psem == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (*psem == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  (void)dispatch_semaphore_signal(*psem);
  return 0;
}
150

151
/*
 * Waits on the dispatch semaphore with DISPATCH_TIME_FOREVER.
 * Returns TSDB_CODE_INVALID_PARA when `psem` or `*psem` is NULL, otherwise 0.
 */
int32_t tsem_wait(tsem_t *psem) {
  if (psem == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (*psem == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  dispatch_semaphore_wait(*psem, DISPATCH_TIME_FOREVER);
  return 0;
}
156

157
/*
 * Waits on the dispatch semaphore for at most `milis` milliseconds.
 *
 * Returns TSDB_CODE_INVALID_PARA when `psem` or `*psem` is NULL, 0 when the semaphore
 * was acquired, and TSDB_CODE_TIMEOUT_ERROR when dispatch_semaphore_wait() returns non-zero.
 */
int32_t tsem_timewait(tsem_t *psem, int64_t milis) {
  if (psem == NULL || *psem == NULL) return TSDB_CODE_INVALID_PARA;

  // dispatch_time() takes its delta in nanoseconds, so the factor is nanoseconds per
  // millisecond. The former USEC_PER_SEC constant has the same numeric value but names
  // the wrong unit conversion.
  dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * NSEC_PER_MSEC));
  if (dispatch_semaphore_wait(*psem, time) != 0) {
    return TSDB_CODE_TIMEOUT_ERROR;
  }
  return 0;
}
166

167
/* A thread handle is treated as valid when it is not 0. */
bool taosCheckPthreadValid(TdThread thread) {
  if (thread != 0) {
    return true;
  }
  return false;
}
168

169
/* Returns the calling thread's handle, obtained from taosThreadSelf(), cast to int64_t. */
int64_t taosGetSelfPthreadId() {
  return (int64_t)taosThreadSelf();
}
173

174
/* Returns the thread handle cast to int64_t. */
int64_t taosGetPthreadId(TdThread thread) {
  int64_t tid = (int64_t)thread;
  return tid;
}
175

176
/* Sets *thread to NULL; a NULL `thread` pointer is ignored. */
void taosResetPthread(TdThread *thread) {
  if (thread == NULL) {
    return;
  }
  *thread = NULL;
}
181

182
/* Returns true when taosThreadEqual() reports the two handles as equal, false otherwise. */
bool taosComparePthread(TdThread first, TdThread second) {
  if (taosThreadEqual(first, second)) {
    return true;
  }
  return false;
}
183

184
/* Returns getpid() narrowed to int32_t. */
int32_t taosGetPId() {
  int32_t pid = (int32_t)getpid();
  return pid;
}
185

186
int32_t taosGetAppName(char *name, int32_t *len) {
187
  OS_PARAM_CHECK(name);
188
  char buf[PATH_MAX + 1];
189
  buf[0] = '\0';
190
  proc_name(getpid(), buf, sizeof(buf) - 1);
191
  buf[PATH_MAX] = '\0';
192
  size_t n = strlen(buf);
193
  if (len) *len = n;
194
  if (name) tstrncpy(name, buf, TSDB_APP_NAME_LEN);
195
  return 0;
196
}
197

198
/* Process lookup by name is not implemented in this platform section; always returns -1. */
int32_t taosGetPIdByName(const char* name, int32_t* pPId) {
  (void)name;
  (void)pPId;
  return -1;
}
199

200
#else
201

202
/*
203
 * linux implementation
204
 */
205
#ifndef TD_ASTRA
206
#include <sys/syscall.h>
207
#endif
208
#include <unistd.h>
209

210
/* A thread handle is treated as valid when it is not 0. */
bool taosCheckPthreadValid(TdThread thread) {
  if (thread != 0) {
    return true;
  }
  return false;
}
2,422,234✔
211

212
/*
 * Returns an identifier for the calling thread, cached in a thread-local variable
 * after the first call. Without TD_ASTRA the value is the result of the gettid
 * system call; with TD_ASTRA it is taosThreadSelf() cast to int64_t.
 */
int64_t taosGetSelfPthreadId() {
  static __thread int64_t cachedTid = 0;

  if (cachedTid == 0) {
#ifndef TD_ASTRA
    cachedTid = syscall(SYS_gettid);
#else
    cachedTid = (int64_t) taosThreadSelf();
#endif
  }
  return cachedTid;
}
222

223
/* Returns the thread handle cast to int64_t. */
int64_t taosGetPthreadId(TdThread thread) {
  int64_t tid = (int64_t)thread;
  return tid;
}
1,883✔
224
/* Sets *thread to 0; a NULL `thread` pointer is ignored. */
void taosResetPthread(TdThread* thread) {
  if (thread == NULL) {
    return;
  }
  *thread = 0;
}
2✔
229
/* Two handles compare equal when their values are identical. */
bool taosComparePthread(TdThread first, TdThread second) {
  bool same = (first == second);
  return same;
}
1✔
230

231
/*
 * Returns the process id, cached in a function-local static after the first call.
 * Without TD_ASTRA the value comes from getpid(); with TD_ASTRA it is
 * taosThreadSelf() cast to int32_t.
 */
int32_t taosGetPId() {
  static int32_t cachedPid;

  if (cachedPid == 0) {
#ifndef TD_ASTRA
    cachedPid = getpid();
#else
    cachedPid = (int32_t)taosThreadSelf(); // TD_ASTRA_TODO
#endif
  }
  return cachedPid;
}
241

242
int32_t taosGetAppName(char* name, int32_t* len) {
92,068✔
243
#ifndef TD_ASTRA
244
  OS_PARAM_CHECK(name);
92,068✔
245
  const char* self = "/proc/self/exe";
92,067✔
246
  char        path[PATH_MAX] = {0};
92,067✔
247

248
  if (-1 == readlink(self, path, PATH_MAX)) {
92,067!
249
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
250
    return terrno;
×
251
  }
252

253
  path[PATH_MAX - 1] = 0;
92,067✔
254
  char* end = strrchr(path, '/');
92,067✔
255
  if (end == NULL) {
92,067!
256
    end = path;
×
257
  } else {
258
    ++end;
92,067✔
259
  }
260

261
  tstrncpy(name, end, TSDB_APP_NAME_LEN);
92,067✔
262
#else
263
  tstrncpy(name, "tdastra", TSDB_APP_NAME_LEN); // TD_ASTRA_TODO
264
#endif
265
  if (len != NULL) {
92,067!
266
    *len = strlen(name);
×
267
  }
268

269
  return 0;
92,067✔
270
}
271

272
int32_t taosGetPIdByName(const char* name, int32_t* pPId) {
85✔
273
#ifndef TD_ASTRA
274
  OS_PARAM_CHECK(name);
85✔
275
  OS_PARAM_CHECK(pPId);
84✔
276
  DIR*           dir = NULL;
83✔
277
  struct dirent* ptr = NULL;
83✔
278
  FILE*          fp = NULL;
83✔
279
  char           filepath[512];
280
  char           bufx[50];
281
  char           buf[1024] = {0};
83✔
282

283
  *pPId = -1;
83✔
284
  dir = opendir("/proc");
83✔
285
  if (dir == NULL) {
83!
286
    return TAOS_SYSTEM_ERROR(ERRNO);
×
287
  }
288

289
  while ((ptr = readdir(dir)) != NULL) {
11,285✔
290
    if ((strcmp(ptr->d_name, ".") == 0) || (strcmp(ptr->d_name, "..") == 0)) {
11,202✔
291
      continue;
166✔
292
    }
293

294
    if (DT_DIR != ptr->d_type) {
11,036✔
295
      continue;
4,150✔
296
    }
297

298
    int32_t ret = tsnprintf(filepath, tListLen(filepath), "/proc/%s/status", ptr->d_name);
6,886✔
299
    if (ret == -1) {
6,886!
300
      continue;
×
301
    }
302

303
    fp = fopen(filepath, "r");
6,886✔
304
    if (NULL != fp) {
6,886✔
305
      if (fgets(buf, tListLen(buf) - 1, fp) == NULL) {
5,994!
306
        TAOS_UNUSED(fclose(fp));
×
307
        continue;
×
308
      }
309

310
      ret = sscanf(buf, "%*s %s", bufx);
5,994✔
311
      if (!strcmp(bufx, name)) {
5,994✔
312
        char* end = NULL;
44✔
313
        *pPId = taosStr2Int32(ptr->d_name, &end, 10);
44✔
314
      }
315
      TAOS_UNUSED(fclose(fp));
5,994✔
316
    }
317
  }
318

319
  TAOS_UNUSED(closedir(dir));
83✔
320

321
  if ((*pPId) == -1) {
83✔
322
    return TAOS_SYSTEM_ERROR(ESRCH);
39✔
323
  } else {
324
    return TSDB_CODE_SUCCESS;
44✔
325
  }
326
#else
327
  return TSDB_CODE_APP_ERROR;
328
#endif
329
}
330

331
/*
 * Initialises the semaphore with sem_init(psem, flags, count).
 *
 * Returns 0 on success. On failure terrno is set to the wrapped ERRNO value and that
 * value is returned. A NULL `psem` is rejected by OS_PARAM_CHECK, as in the other
 * tsem_* functions, instead of being handed to sem_init().
 */
int32_t tsem_init(tsem_t* psem, int flags, unsigned int count) {
  OS_PARAM_CHECK(psem);
  if (sem_init(psem, flags, count) == 0) {
    return 0;
  } else {
    return terrno = TAOS_SYSTEM_ERROR(ERRNO);
  }
}
338

339
/*
 * Waits on the semaphore for at most `ms` milliseconds.
 *
 * The deadline is the current CLOCK_REALTIME time plus `ms`; waits interrupted by a
 * signal (EINTR) are retried against the same deadline.
 *
 * Returns 0 when the semaphore was acquired and TSDB_CODE_TIMEOUT_ERROR when
 * sem_timedwait() fails with ETIMEDOUT. For any other failure, including a failing
 * clock_gettime(), terrno is set to the wrapped ERRNO value and that value is returned.
 */
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
  OS_PARAM_CHECK(sem);
  int ret = 0;

  struct timespec ts = {0};

  if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
    return terrno;
  }

  // Split into whole seconds and a sub-second remainder before scaling. Adding
  // ms * 1000000 directly to tv_nsec overflows where tv_nsec is a 32-bit long
  // (timeouts above ~2 s) and for very large `ms` everywhere.
  ts.tv_sec += ms / 1000;
  ts.tv_nsec += (ms % 1000) * 1000000;
  ts.tv_sec += ts.tv_nsec / 1000000000;
  ts.tv_nsec %= 1000000000;

  while ((ret = sem_timedwait(sem, &ts)) == -1) {
    if (ERRNO == EINTR) {
      continue;
    } else if (ERRNO == ETIMEDOUT) {
      return TSDB_CODE_TIMEOUT_ERROR;
    } else {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      return terrno;
    }
  }

  return 0;
}
367

368
/*
 * Blocks on the semaphore with sem_wait(), retrying whenever the call fails with EINTR.
 * Returns the final sem_wait() result on success; on any other failure terrno is set
 * to the wrapped ERRNO value and that value is returned.
 */
int32_t tsem_wait(tsem_t* sem) {
  OS_PARAM_CHECK(sem);

  int rc = 0;
  while ((rc = sem_wait(sem)) == -1 && ERRNO == EINTR) {
    // interrupted by a signal: wait again
  }

  if (rc == -1) {
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
    return terrno;
  }
  return rc;
}
382

383
/*
 * Sets up a mutex/condition-variable based semaphore with initial count `value`.
 * The condition variable is created with a CLOCK_MONOTONIC clock attribute.
 * `pshared` is not used.
 *
 * Returns 0 on success, otherwise the error code of the first step that failed;
 * pieces initialised before the failure are destroyed again.
 */
int tsem2_init(tsem2_t* sem, int pshared, unsigned int value) {
  OS_PARAM_CHECK(sem);

  int rc = taosThreadMutexInit(&sem->mutex, NULL);
  if (rc != 0) {
    return rc;
  }

  bool attrReady = false;
  rc = taosThreadCondAttrInit(&sem->attr);
  if (rc == 0) {
    attrReady = true;
    rc = taosThreadCondAttrSetclock(&sem->attr, CLOCK_MONOTONIC);
  }
  if (rc == 0) {
    rc = taosThreadCondInit(&sem->cond, &sem->attr);
  }

  if (rc != 0) {
    // Roll back in the same order as before: mutex first, then the attribute if it exists.
    (void)taosThreadMutexDestroy(&sem->mutex);
    if (attrReady) {
      (void)taosThreadCondAttrDestroy(&sem->attr);
    }
    return rc;
  }

  sem->count = value;
  return 0;
}
412

413
/*
 * Increments the semaphore with sem_post().
 * Returns 0 on success, otherwise the wrapped ERRNO value.
 */
int32_t tsem_post(tsem_t* psem) {
  OS_PARAM_CHECK(psem);

  if (sem_post(psem) != 0) {
    return TAOS_SYSTEM_ERROR(ERRNO);
  }
  return 0;
}
421

422
/*
 * Destroys the semaphore with sem_destroy().
 * Returns 0 on success, otherwise the wrapped ERRNO value.
 */
int32_t tsem_destroy(tsem_t* sem) {
  OS_PARAM_CHECK(sem);

  if (sem_destroy(sem) != 0) {
    return TAOS_SYSTEM_ERROR(ERRNO);
  }
  return 0;
}
430

431
/*
 * Increments the semaphore count under the mutex and signals one waiter.
 *
 * Returns 0 on success, otherwise the error code of the lock, signal or unlock call
 * that failed. The mutex is released on every path after it has been acquired.
 */
int tsem2_post(tsem2_t* sem) {
  OS_PARAM_CHECK(sem);
  int32_t code = taosThreadMutexLock(&sem->mutex);
  if (code) {
    return code;
  }

  sem->count++;
  code = taosThreadCondSignal(&sem->cond);
  if (code) {
    // Do not leave the mutex held on the error path; report the signal failure.
    (void)taosThreadMutexUnlock(&sem->mutex);
    return code;
  }

  code = taosThreadMutexUnlock(&sem->mutex);
  if (code) {
    return code;
  }

  return 0;
}
451

452
/*
 * Tears down the semaphore's mutex, condition variable and condition attribute, in
 * that order. Results of the individual destroy calls are ignored; returns 0.
 */
int tsem2_destroy(tsem2_t* sem) {
  OS_PARAM_CHECK(sem);

  (void)taosThreadMutexDestroy(&sem->mutex);
  (void)taosThreadCondDestroy(&sem->cond);
  (void)taosThreadCondAttrDestroy(&sem->attr);
  return 0;
}
460

461
/*
 * Blocks until the semaphore count is positive, then decrements it.
 *
 * Returns 0 on success, otherwise the error code of the failing lock, condition wait
 * or unlock call. When the condition wait fails, the mutex is unlocked before returning.
 */
int32_t tsem2_wait(tsem2_t* sem) {
  OS_PARAM_CHECK(sem);

  int32_t code = taosThreadMutexLock(&sem->mutex);
  if (code != 0) {
    return code;
  }

  while (sem->count <= 0) {
    int waitRc = taosThreadCondWait(&sem->cond, &sem->mutex);
    if (waitRc != 0) {
      (void)taosThreadMutexUnlock(&sem->mutex);
      return waitRc;
    }
  }
  sem->count--;

  code = taosThreadMutexUnlock(&sem->mutex);
  return (code != 0) ? code : 0;
}
486

487
/*
 * Waits up to `ms` milliseconds for the semaphore count to become positive, then
 * decrements it.
 *
 * When the count is already positive no deadline is computed. Otherwise the deadline
 * is the current CLOCK_MONOTONIC time plus `ms`, and the condition variable is waited
 * on until the count is positive or the timed wait returns non-zero.
 *
 * Returns the result of the final mutex unlock on success. A non-zero lock or timed-wait
 * result is returned as is. If clock_gettime() fails, terrno is set to the wrapped
 * ERRNO value and that value is returned.
 */
int32_t tsem2_timewait(tsem2_t* sem, int64_t ms) {
  OS_PARAM_CHECK(sem);

  int32_t code = taosThreadMutexLock(&sem->mutex);
  if (code) {
    return code;
  }

  if (sem->count <= 0) {
    struct timespec deadline = {0};
    if (clock_gettime(CLOCK_MONOTONIC, &deadline) == -1) {
      // Capture the error before unlocking, which could otherwise overwrite it.
      code = TAOS_SYSTEM_ERROR(ERRNO);
      (void)taosThreadMutexUnlock(&sem->mutex);
      terrno = code;
      return code;
    }

    deadline.tv_sec += ms / 1000;
    deadline.tv_nsec += (ms % 1000) * 1000000;
    deadline.tv_sec += deadline.tv_nsec / 1000000000;
    deadline.tv_nsec %= 1000000000;

    // The count is known to be non-positive here, so the first wait is unconditional.
    do {
      code = taosThreadCondTimedWait(&sem->cond, &sem->mutex, &deadline);
      if (code != 0) {
        (void)taosThreadMutexUnlock(&sem->mutex);
        return code;
      }
    } while (sem->count <= 0);
  }

  sem->count--;

  return taosThreadMutexUnlock(&sem->mutex);
}
524

525
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc