• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.64
/source/dnode/vnode/src/sma/smaEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17

18
typedef struct SSmaStat SSmaStat;
19

20
#define SMA_MGMT_REF_NUM 10240
21

22
extern SSmaMgmt smaMgmt;
23

24
// declaration of static functions
25

26
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
27
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
28
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
29
static int32_t tdRsmaStartExecutor(const SSma *pSma);
30
static int32_t tdRsmaStopExecutor(const SSma *pSma);
31
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
32
static void   *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
33
static void    tdDestroyRSmaStat(void *pRSmaStat);
34

35
/**
36
 * @brief rsma init
37
 *
38
 * @return int32_t
39
 */
40
// implementation
41
int32_t smaInit() {
17✔
42
  int32_t code = 0;
17✔
43
  int8_t  old;
44
  int32_t nLoops = 0;
17✔
45
  while (1) {
46
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
17✔
47
    if (old != 2) break;
17!
UNCOV
48
    if (++nLoops > 1000) {
×
UNCOV
49
      TAOS_UNUSED(sched_yield());
×
UNCOV
50
      nLoops = 0;
×
51
    }
52
  }
53

54
  if (old == 0) {
17✔
55
    // init tref rset
56
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
14✔
57

58
    if (smaMgmt.rsetId < 0) {
14!
59
      atomic_store_8(&smaMgmt.inited, 0);
×
60
      code = terrno;
×
61
      smaError("failed to init sma rset since %s", tstrerror(code));
×
62
      TAOS_RETURN(code);
×
63
    }
64

65
    int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
14✔
66
    smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
14✔
67
    // init fetch timer handle
68
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
14✔
69

70
    if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
14!
71
      code = terrno;
×
72
      taosCloseRef(smaMgmt.rsetId);
×
73
      if (smaMgmt.refHash) {
×
74
        taosHashCleanup(smaMgmt.refHash);
×
75
        smaMgmt.refHash = NULL;
×
76
      }
77
      atomic_store_8(&smaMgmt.inited, 0);
×
78
      smaError("failed to init sma tmr handle since %s", tstrerror(code));
×
79
      TAOS_RETURN(code);
×
80
    }
81

82
    atomic_store_8(&smaMgmt.inited, 1);
14✔
83
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
14!
84
  }
85

86
  TAOS_RETURN(code);
17✔
87
}
88

89
/**
90
 * @brief rsma cleanup
91
 *
92
 */
93
void smaCleanUp() {
2,406✔
94
  int8_t  old;
95
  int32_t nLoops = 0;
2,406✔
96
  while (1) {
97
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
2,406✔
98
    if (old != 2) break;
2,406!
99
    if (++nLoops > 1000) {
×
100
      TAOS_UNUSED(sched_yield());
×
101
      nLoops = 0;
×
102
    }
103
  }
104

105
  if (old == 1) {
2,406✔
106
    taosCloseRef(smaMgmt.rsetId);
14✔
107
    taosHashCleanup(smaMgmt.refHash);
14✔
108
    smaMgmt.refHash = NULL;
14✔
109
    taosTmrCleanUp(smaMgmt.tmrHandle);
14✔
110
    smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
14!
111
    atomic_store_8(&smaMgmt.inited, 0);
14✔
112
  }
113
}
2,406✔
114

115
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
21✔
116
  int32_t  code = 0;
21✔
117
  SSmaEnv *pEnv = NULL;
21✔
118

119
  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
21✔
120
  if (!pEnv) {
21!
121
    return terrno;
×
122
  }
123
  *ppEnv = pEnv;
21✔
124

125
  SMA_ENV_TYPE(pEnv) = smaType;
21✔
126

127
  taosInitRWLatch(&(pEnv->lock));
21✔
128

129
  (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv)
4✔
130
                                        : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
21✔
131

132
  if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
21!
133
    TAOS_UNUSED(tdFreeSmaEnv(pEnv));
×
134
    *ppEnv = NULL;
×
135
    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
×
136
                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
×
137
    TAOS_RETURN(code);
×
138
  }
139

140
  TAOS_RETURN(code);
21✔
141
}
142

143
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
21✔
144
  if (!(*ppEnv)) {
21!
145
    TAOS_CHECK_RETURN(tdNewSmaEnv(pSma, smaType, ppEnv));
21!
146
  }
147

148
  TAOS_RETURN(TSDB_CODE_SUCCESS);
21✔
149
}
150

151
/**
152
 * @brief Release resources allocated for its member fields, not including itself.
153
 *
154
 * @param pSmaEnv
155
 * @return int32_t
156
 */
157
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
21✔
158
  if (pSmaEnv) {
21!
159
    pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
21✔
160
  }
161
}
21✔
162

163
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
27,754✔
164
  if (pSmaEnv) {
27,754✔
165
    tdDestroySmaEnv(pSmaEnv);
21✔
166
    taosMemoryFreeClear(pSmaEnv);
21!
167
  }
168
  return NULL;
27,754✔
169
}
170

171
static void tRSmaInfoHashFreeNode(void *data) {
17✔
172
  SRSmaInfo     *pRSmaInfo = NULL;
17✔
173
  SRSmaInfoItem *pItem = NULL;
17✔
174

175
  if ((pRSmaInfo = *(SRSmaInfo **)data)) {
17!
176
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 0)) && pItem->level) {
17!
177
      if (TSDB_CODE_SUCCESS != taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES)) {
17!
178
        smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
×
179
      }
180
    }
181
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
17!
182
      if (TSDB_CODE_SUCCESS != taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES)) {
17!
183
        smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
×
184
      }
185
    }
186
    TAOS_UNUSED(tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo));
17✔
187
  }
188
}
17✔
189

190
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
21✔
191
  int32_t code = 0;
21✔
192
  int32_t lino = 0;
21✔
193

194
  if (*pSmaStat) {      // no lock
21!
195
    TAOS_RETURN(code);  // success, return directly
×
196
  }
197

198
  /**
199
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
200
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
201
   * tdInitSmaStat invoked in other multithread environment later.
202
   */
203
  if (!(*pSmaStat)) {
21!
204
    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads);
21✔
205
    if (!(*pSmaStat)) {
21!
206
      code = terrno;
×
207
      TAOS_CHECK_GOTO(code, &lino, _exit);
×
208
    }
209

210
    if (smaType == TSDB_SMA_TYPE_ROLLUP) {
21✔
211
      SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
17✔
212
      pRSmaStat->pSma = (SSma *)pSma;
17✔
213
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
17✔
214
      if (tsem_init(&pRSmaStat->notEmpty, 0, 0) != 0) {
17!
215
        code = terrno;
×
216
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
217
      }
218
      if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
17!
219
        code = terrno;
×
220
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
221
      }
222
      SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT};
17✔
223
      TSDB_CHECK_NULL(taosArrayPush(pRSmaStat->blocks, &datablock), code, lino, _exit, terrno);
34!
224

225
      // init smaMgmt
226
      TAOS_CHECK_GOTO(smaInit(), &lino, _exit);
17!
227

228
      int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
17✔
229
      if (refId < 0) {
17!
230
        code = terrno;
×
231
        smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
×
232
                 refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(code));
233
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
234
      } else {
235
        smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
17!
236
                 smaMgmt.rsetId, SMA_MGMT_REF_NUM);
237
      }
238
      pRSmaStat->refId = refId;
17✔
239

240
      // init hash
241
      RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
17✔
242
          RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
243
      if (!RSMA_INFO_HASH(pRSmaStat)) {
17!
244
        code = terrno;
×
245
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
246
      }
247
      taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);
17✔
248

249
      TAOS_CHECK_GOTO(tdRsmaStartExecutor(pSma), &lino, _exit);
17!
250

251
      taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
17✔
252
    } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
253
      // TODO
254
    }
255
  }
256
_exit:
4✔
257
  if (code) {
21!
258
    smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
×
259
  } else {
260
    smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
21✔
261
  }
262
  TAOS_RETURN(code);
21✔
263
}
264

265
static void tdDestroyTSmaStat(STSmaStat *pStat) {
4✔
266
  if (pStat) {
4!
267
    smaDebug("destroy tsma stat");
4✔
268
    tDestroyTSma(pStat->pTSma);
4!
269
    taosMemoryFreeClear(pStat->pTSma);
4!
270
    taosMemoryFreeClear(pStat->pTSchema);
4!
271
  }
272
}
4✔
273

274
static void tdDestroyRSmaStat(void *pRSmaStat) {
17✔
275
  if (pRSmaStat) {
17!
276
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
17✔
277
    SSma      *pSma = pStat->pSma;
17✔
278
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
17!
279
    // step 1: set rsma trigger stat cancelled
280
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
17✔
281

282
    // step 2: wait for all triggered fetch tasks to finish
283
    int32_t nLoops = 0;
17✔
284
    while (1) {
285
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
17!
286
        smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
17!
287
        break;
17✔
288
      } else {
289
        smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
×
290
      }
291
      TD_SMA_LOOPS_CHECK(nLoops, 1000);
×
292
    }
293

294
    // step 3:
295
    TAOS_UNUSED(tdRsmaStopExecutor(pSma));
17✔
296

297
    // step 4: destroy the rsma info and associated fetch tasks
298
    taosHashCleanup(RSMA_INFO_HASH(pStat));
17✔
299

300
    // step 5: free pStat
301
    if (tsem_destroy(&(pStat->notEmpty)) != 0) {
17!
302
      smaError("vgId:%d, failed to destroy notEmpty semaphore for rsma stat:%p since %s", SMA_VID(pSma), pRSmaStat,
×
303
               tstrerror(terrno));
304
    }
305
    taosArrayDestroy(pStat->blocks);
17✔
306
    taosMemoryFreeClear(pStat);
17!
307
  }
308
}
17✔
309

310
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
21✔
311
  TAOS_UNUSED(tdDestroySmaState(pSmaStat, smaType));
21✔
312
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
21✔
313
    taosMemoryFreeClear(pSmaStat);
4!
314
  }
315
  // tref used to free rsma stat
316

317
  return NULL;
21✔
318
}
319

320
/**
321
 * @brief Release resources allocated for its member fields, not including itself.
322
 *
323
 * @param pSmaStat
324
 * @return int32_t
325
 */
326

327
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
21✔
328
  if (pSmaStat) {
21!
329
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
21✔
330
      tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
4✔
331
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
17!
332
      SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
17✔
333
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
17✔
334
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
17✔
335
      if (taosRemoveRef(smaMgmt.rsetId, refId) < 0) {
17!
336
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
×
337
                 smaMgmt.rsetId, terrstr());
338
      } else {
339
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
17!
340
      }
341
    } else {
342
      smaError("%s failed at line %d since Unknown type", __func__, __LINE__);
×
343
    }
344
  }
345
  return 0;
21✔
346
}
347

348
int32_t tdLockSma(SSma *pSma) {
21✔
349
  int errCode = taosThreadMutexLock(&pSma->mutex);
21✔
350
  if (errCode != 0) {
21!
351
    int32_t code = TAOS_SYSTEM_ERROR(errCode);
×
352
    smaError("vgId:%d, failed to lock since %s", SMA_VID(pSma), tstrerror(code));
×
353
    TAOS_RETURN(code);
×
354
  }
355
  pSma->locked = true;
21✔
356
  TAOS_RETURN(0);
21✔
357
}
358

359
int32_t tdUnLockSma(SSma *pSma) {
21✔
360
  pSma->locked = false;
21✔
361
  int errCode = taosThreadMutexUnlock(&pSma->mutex);
21✔
362
  if (errCode != 0) {
21!
363
    int32_t code = TAOS_SYSTEM_ERROR(errCode);
×
364
    smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(code));
×
365
    TAOS_RETURN(code);
×
366
  }
367
  TAOS_RETURN(TSDB_CODE_SUCCESS);
21✔
368
}
369

370
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
22✔
371
  int32_t  code = 0;
22✔
372
  SSmaEnv *pEnv = NULL;
22✔
373

374
  switch (smaType) {
22!
375
    case TSDB_SMA_TYPE_TIME_RANGE:
5✔
376
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
5✔
377
        TAOS_RETURN(TSDB_CODE_SUCCESS);
1✔
378
      }
379
      break;
4✔
380
    case TSDB_SMA_TYPE_ROLLUP:
17✔
381
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
17!
382
        TAOS_RETURN(TSDB_CODE_SUCCESS);
×
383
      }
384
      break;
17✔
385
    default:
×
386
      smaError("vgId:%d, undefined smaType:%" PRIi8, SMA_VID(pSma), smaType);
×
387
      TAOS_RETURN(TSDB_CODE_INVALID_PARA);
×
388
  }
389

390
  // init sma env
391
  TAOS_UNUSED(tdLockSma(pSma));
21✔
392
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
21✔
393
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
17✔
394
  if (!pEnv) {
21!
395
    if ((code = tdInitSmaEnv(pSma, smaType, &pEnv)) < 0) {
21!
396
      TAOS_UNUSED(tdUnLockSma(pSma));
×
397
      TAOS_RETURN(code);
×
398
    }
399
  }
400
  TAOS_UNUSED(tdUnLockSma(pSma));
21✔
401

402
  TAOS_RETURN(TSDB_CODE_SUCCESS);
21✔
403
}
404

405
void *tdRSmaExecutorFunc(void *param) {
680✔
406
  setThreadName("vnode-rsma");
680✔
407

408
  if (tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW) < 0) {
680!
409
    smaError("vgId:%d, failed to process rsma exec", SMA_VID((SSma *)param));
×
410
  }
411
  return NULL;
680✔
412
}
413

414
static int32_t tdRsmaStartExecutor(const SSma *pSma) {
17✔
415
  int32_t      code = 0;
17✔
416
  TdThreadAttr thAttr = {0};
17✔
417
  (void)taosThreadAttrInit(&thAttr);
17✔
418
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
17✔
419

420
  SSmaEnv  *pEnv = SMA_RSMA_ENV(pSma);
17✔
421
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
17✔
422
  TdThread *pthread = (TdThread *)&pStat->data;
17✔
423

424
  for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
697✔
425
    if (taosThreadCreate(&pthread[i], &thAttr, tdRSmaExecutorFunc, (void *)pSma) != 0) {
680!
426
      code = TAOS_SYSTEM_ERROR(errno);
×
427
      smaError("vgId:%d, failed to create pthread for rsma since %s", SMA_VID(pSma), tstrerror(code));
×
428
      TAOS_RETURN(code);
×
429
    }
430
    smaDebug("vgId:%d, success to create pthread for rsma", SMA_VID(pSma));
680!
431
  }
432

433
  (void)taosThreadAttrDestroy(&thAttr);
17✔
434
  TAOS_RETURN(code);
17✔
435
}
436

437
static int32_t tdRsmaStopExecutor(const SSma *pSma) {
17✔
438
  if (pSma && VND_IS_RSMA(pSma->pVnode)) {
17!
439
    SSmaEnv   *pEnv = NULL;
17✔
440
    SSmaStat  *pStat = NULL;
17✔
441
    SRSmaStat *pRSmaStat = NULL;
17✔
442
    TdThread  *pthread = NULL;
17✔
443

444
    if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = SMA_ENV_STAT(pEnv))) {
17!
445
      TAOS_RETURN(0);
×
446
    }
447

448
    pEnv->flag |= SMA_ENV_FLG_CLOSE;
17✔
449
    pRSmaStat = (SRSmaStat *)pStat;
17✔
450
    pthread = (TdThread *)&pStat->data;
17✔
451

452
    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
696✔
453
      if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
679!
454
        smaError("vgId:%d, failed to post notEmpty semaphore for rsma since %s", SMA_VID(pSma), tstrerror(terrno));
×
455
      }
456
    }
457

458
    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
697✔
459
      if (taosCheckPthreadValid(pthread[i])) {
680!
460
        smaDebug("vgId:%d, start to join pthread for rsma:%" PRId64 "", SMA_VID(pSma), taosGetPthreadId(pthread[i]));
680!
461
        (void)taosThreadJoin(pthread[i], NULL);
680✔
462
      }
463
    }
464

465
    smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads);
17!
466
  }
467
  TAOS_RETURN(0);
17✔
468
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc