• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.66
/source/dnode/vnode/src/sma/smaEnv.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17

18
typedef struct SSmaStat SSmaStat;
19

20
#define SMA_MGMT_REF_NUM 10240
21

22
extern SSmaMgmt smaMgmt;
23

24
// declaration of static functions
25

26
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
27
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
28
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
29
static int32_t tdRsmaStartExecutor(const SSma *pSma);
30
static int32_t tdRsmaStopExecutor(const SSma *pSma);
31
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
32
static void   *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
33
static void    tdDestroyRSmaStat(void *pRSmaStat);
34

35
/**
36
 * @brief rsma init
37
 *
38
 * @return int32_t
39
 */
40
// implementation
41
int32_t smaInit() {
17✔
42
  int32_t code = 0;
17✔
43
  int8_t  old;
44
  int32_t nLoops = 0;
17✔
45
  while (1) {
46
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
17✔
47
    if (old != 2) break;
17!
UNCOV
48
    if (++nLoops > 1000) {
×
49
      TAOS_UNUSED(sched_yield());
×
50
      nLoops = 0;
×
51
    }
52
  }
53

54
  if (old == 0) {
17✔
55
    // init tref rset
56
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
14✔
57

58
    if (smaMgmt.rsetId < 0) {
14!
59
      atomic_store_8(&smaMgmt.inited, 0);
×
60
      code = terrno;
×
61
      smaError("failed to init sma rset since %s", tstrerror(code));
×
62
      TAOS_RETURN(code);
×
63
    }
64

65
    int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
14✔
66
    smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
14✔
67
    // init fetch timer handle
68
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
14✔
69

70
    if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
14!
71
      code = terrno;
×
72
      taosCloseRef(smaMgmt.rsetId);
×
73
      if (smaMgmt.refHash) {
×
74
        taosHashCleanup(smaMgmt.refHash);
×
75
        smaMgmt.refHash = NULL;
×
76
      }
77
      atomic_store_8(&smaMgmt.inited, 0);
×
78
      smaError("failed to init sma tmr handle since %s", tstrerror(code));
×
79
      TAOS_RETURN(code);
×
80
    }
81

82
    atomic_store_8(&smaMgmt.inited, 1);
14✔
83
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
14!
84
  }
85

86
  TAOS_RETURN(code);
17✔
87
}
88

89
/**
90
 * @brief rsma cleanup
91
 *
92
 */
93
void smaCleanUp() {
2,125✔
94
  int8_t  old;
95
  int32_t nLoops = 0;
2,125✔
96
  while (1) {
97
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
2,125✔
98
    if (old != 2) break;
2,125!
99
    if (++nLoops > 1000) {
×
100
      TAOS_UNUSED(sched_yield());
×
101
      nLoops = 0;
×
102
    }
103
  }
104

105
  if (old == 1) {
2,125✔
106
    taosCloseRef(smaMgmt.rsetId);
14✔
107
    taosHashCleanup(smaMgmt.refHash);
14✔
108
    smaMgmt.refHash = NULL;
14✔
109
    taosTmrCleanUp(smaMgmt.tmrHandle);
14✔
110
    smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
14!
111
    atomic_store_8(&smaMgmt.inited, 0);
14✔
112
  }
113
}
2,125✔
114

115
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
17✔
116
  int32_t  code = 0;
17✔
117
  SSmaEnv *pEnv = NULL;
17✔
118

119
  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
17!
120
  if (!pEnv) {
17!
121
    return terrno;
×
122
  }
123
  *ppEnv = pEnv;
17✔
124

125
  SMA_ENV_TYPE(pEnv) = smaType;
17✔
126

127
  taosInitRWLatch(&(pEnv->lock));
17✔
128

UNCOV
129
  (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv)
×
130
                                        : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
17!
131

132
  if ((code = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) {
17!
133
    TAOS_UNUSED(tdFreeSmaEnv(pEnv));
×
134
    *ppEnv = NULL;
×
135
    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL)
×
136
                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
×
137
    TAOS_RETURN(code);
×
138
  }
139

140
  TAOS_RETURN(code);
17✔
141
}
142

143
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
17✔
144
  if (!(*ppEnv)) {
17!
145
    TAOS_CHECK_RETURN(tdNewSmaEnv(pSma, smaType, ppEnv));
17!
146
  }
147

148
  TAOS_RETURN(TSDB_CODE_SUCCESS);
17✔
149
}
150

151
/**
152
 * @brief Release resources allocated for its member fields, not including itself.
153
 *
154
 * @param pSmaEnv
155
 * @return int32_t
156
 */
157
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
17✔
158
  if (pSmaEnv) {
17!
159
    pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
17✔
160
  }
161
}
17✔
162

163
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
23,777✔
164
  if (pSmaEnv) {
23,777✔
165
    tdDestroySmaEnv(pSmaEnv);
17✔
166
    taosMemoryFreeClear(pSmaEnv);
17!
167
  }
168
  return NULL;
23,771✔
169
}
170

171
static void tRSmaInfoHashFreeNode(void *data) {
17✔
172
  SRSmaInfo     *pRSmaInfo = NULL;
17✔
173
  SRSmaInfoItem *pItem = NULL;
17✔
174

175
  if ((pRSmaInfo = *(SRSmaInfo **)data)) {
17!
176
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 0)) && pItem->level) {
17!
177
      if (TSDB_CODE_SUCCESS != taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES)) {
17!
178
        smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
×
179
      }
180
    }
181
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
17!
182
      if (TSDB_CODE_SUCCESS != taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES)) {
17!
183
        smaError("failed to hash remove %s:%d", __FUNCTION__, __LINE__);
×
184
      }
185
    }
186
    TAOS_UNUSED(tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo));
17✔
187
  }
188
}
17✔
189

190
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
17✔
191
  int32_t code = 0;
17✔
192
  int32_t lino = 0;
17✔
193

194
  if (*pSmaStat) {      // no lock
17!
195
    TAOS_RETURN(code);  // success, return directly
×
196
  }
197

198
  /**
199
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
200
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
201
   * tdInitSmaStat invoked in other multithread environment later.
202
   */
203
  if (!(*pSmaStat)) {
17!
204
    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads);
17!
205
    if (!(*pSmaStat)) {
17!
206
      code = terrno;
×
207
      TAOS_CHECK_GOTO(code, &lino, _exit);
×
208
    }
209

210
    if (smaType == TSDB_SMA_TYPE_ROLLUP) {
17!
211
      SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
17✔
212
      pRSmaStat->pSma = (SSma *)pSma;
17✔
213
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
17✔
214
      if (tsem_init(&pRSmaStat->notEmpty, 0, 0) != 0) {
17!
215
        code = terrno;
×
216
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
217
      }
218
      if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
17!
219
        code = terrno;
×
220
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
221
      }
222
      SSDataBlock datablock = {.info.type = STREAM_CHECKPOINT};
17✔
223
      TSDB_CHECK_NULL(taosArrayPush(pRSmaStat->blocks, &datablock), code, lino, _exit, terrno);
34!
224

225
      // init smaMgmt
226
      TAOS_CHECK_GOTO(smaInit(), &lino, _exit);
17!
227

228
      int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
17✔
229
      if (refId < 0) {
17!
230
        code = terrno;
×
231
        smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
×
232
                 refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(code));
233
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
234
      } else {
235
        smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
17!
236
                 smaMgmt.rsetId, SMA_MGMT_REF_NUM);
237
      }
238
      pRSmaStat->refId = refId;
17✔
239

240
      // init hash
241
      RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
17✔
242
          RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
243
      if (!RSMA_INFO_HASH(pRSmaStat)) {
17!
244
        code = terrno;
×
245
        TAOS_CHECK_GOTO(code, &lino, _exit);
×
246
      }
247
      taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);
17✔
248

249
      TAOS_CHECK_GOTO(tdRsmaStartExecutor(pSma), &lino, _exit);
17!
250

251
      taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
17✔
252
    } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
253
      // TODO
254
    }
255
  }
UNCOV
256
_exit:
×
257
  if (code) {
17!
258
    smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
×
259
  } else {
260
    smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
17!
261
  }
262
  TAOS_RETURN(code);
17✔
263
}
264

UNCOV
265
static void tdDestroyTSmaStat(STSmaStat *pStat) {
×
UNCOV
266
  if (pStat) {
×
UNCOV
267
    smaDebug("destroy tsma stat");
×
UNCOV
268
    tDestroyTSma(pStat->pTSma);
×
UNCOV
269
    taosMemoryFreeClear(pStat->pTSma);
×
UNCOV
270
    taosMemoryFreeClear(pStat->pTSchema);
×
271
  }
UNCOV
272
}
×
273

274
static void tdDestroyRSmaStat(void *pRSmaStat) {
17✔
275
  if (pRSmaStat) {
17!
276
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
17✔
277
    SSma      *pSma = pStat->pSma;
17✔
278
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
17!
279
    // step 1: set rsma trigger stat cancelled
280
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
17✔
281

282
    // step 2: wait for all triggered fetch tasks to finish
283
    int32_t nLoops = 0;
17✔
284
    while (1) {
285
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
17!
286
        smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
17!
287
        break;
17✔
288
      } else {
289
        smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
×
290
      }
291
      TD_SMA_LOOPS_CHECK(nLoops, 1000);
×
292
    }
293

294
    // step 3:
295
    TAOS_UNUSED(tdRsmaStopExecutor(pSma));
17✔
296

297
    // step 4: destroy the rsma info and associated fetch tasks
298
    taosHashCleanup(RSMA_INFO_HASH(pStat));
17✔
299

300
    // step 5: free pStat
301
    if (tsem_destroy(&(pStat->notEmpty)) != 0) {
17!
302
      smaError("vgId:%d, failed to destroy notEmpty semaphore for rsma stat:%p since %s", SMA_VID(pSma), pRSmaStat,
×
303
               tstrerror(terrno));
304
    }
305
    taosArrayDestroy(pStat->blocks);
17✔
306
    taosMemoryFreeClear(pStat);
17!
307
  }
308
}
17✔
309

310
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
17✔
311
  TAOS_UNUSED(tdDestroySmaState(pSmaStat, smaType));
17✔
312
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
17!
UNCOV
313
    taosMemoryFreeClear(pSmaStat);
×
314
  }
315
  // tref used to free rsma stat
316

317
  return NULL;
17✔
318
}
319

320
/**
321
 * @brief Release resources allocated for its member fields, not including itself.
322
 *
323
 * @param pSmaStat
324
 * @return int32_t
325
 */
326

327
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
17✔
328
  if (pSmaStat) {
17!
329
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
17!
UNCOV
330
      tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
×
331
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
17!
332
      SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
17✔
333
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
17✔
334
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
17✔
335
      if (taosRemoveRef(smaMgmt.rsetId, refId) < 0) {
17!
336
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
×
337
                 smaMgmt.rsetId, terrstr());
338
      } else {
339
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
17!
340
      }
341
    } else {
342
      smaError("%s failed at line %d since Unknown type", __func__, __LINE__);
×
343
    }
344
  }
345
  return 0;
17✔
346
}
347

348
int32_t tdLockSma(SSma *pSma) {
17✔
349
  int errCode = taosThreadMutexLock(&pSma->mutex);
17✔
350
  if (errCode != 0) {
17!
351
    int32_t code = TAOS_SYSTEM_ERROR(errCode);
×
352
    smaError("vgId:%d, failed to lock since %s", SMA_VID(pSma), tstrerror(code));
×
353
    TAOS_RETURN(code);
×
354
  }
355
  pSma->locked = true;
17✔
356
  TAOS_RETURN(0);
17✔
357
}
358

359
int32_t tdUnLockSma(SSma *pSma) {
17✔
360
  pSma->locked = false;
17✔
361
  int errCode = taosThreadMutexUnlock(&pSma->mutex);
17✔
362
  if (errCode != 0) {
17!
363
    int32_t code = TAOS_SYSTEM_ERROR(errCode);
×
364
    smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(code));
×
365
    TAOS_RETURN(code);
×
366
  }
367
  TAOS_RETURN(TSDB_CODE_SUCCESS);
17✔
368
}
369

370
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
17✔
371
  int32_t  code = 0;
17✔
372
  SSmaEnv *pEnv = NULL;
17✔
373

374
  switch (smaType) {
17!
UNCOV
375
    case TSDB_SMA_TYPE_TIME_RANGE:
×
UNCOV
376
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
×
377
        TAOS_RETURN(TSDB_CODE_SUCCESS);
×
378
      }
UNCOV
379
      break;
×
380
    case TSDB_SMA_TYPE_ROLLUP:
17✔
381
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
17!
382
        TAOS_RETURN(TSDB_CODE_SUCCESS);
×
383
      }
384
      break;
17✔
385
    default:
×
386
      smaError("vgId:%d, undefined smaType:%" PRIi8, SMA_VID(pSma), smaType);
×
387
      TAOS_RETURN(TSDB_CODE_INVALID_PARA);
×
388
  }
389

390
  // init sma env
391
  TAOS_UNUSED(tdLockSma(pSma));
17✔
392
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
17!
393
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
17✔
394
  if (!pEnv) {
17!
395
    if ((code = tdInitSmaEnv(pSma, smaType, &pEnv)) < 0) {
17!
396
      TAOS_UNUSED(tdUnLockSma(pSma));
×
397
      TAOS_RETURN(code);
×
398
    }
399
  }
400
  TAOS_UNUSED(tdUnLockSma(pSma));
17✔
401

402
  TAOS_RETURN(TSDB_CODE_SUCCESS);
17✔
403
}
404

405
void *tdRSmaExecutorFunc(void *param) {
680✔
406
  setThreadName("vnode-rsma");
680✔
407

408
  if (tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW) < 0) {
680!
409
    smaError("vgId:%d, failed to process rsma exec", SMA_VID((SSma *)param));
×
410
  }
411
  return NULL;
680✔
412
}
413

414
static int32_t tdRsmaStartExecutor(const SSma *pSma) {
17✔
415
  int32_t      code = 0;
17✔
416
  TdThreadAttr thAttr = {0};
17✔
417
  (void)taosThreadAttrInit(&thAttr);
17✔
418
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
17✔
419

420
  SSmaEnv  *pEnv = SMA_RSMA_ENV(pSma);
17✔
421
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
17✔
422
  TdThread *pthread = (TdThread *)&pStat->data;
17✔
423

424
  for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
697✔
425
    if (taosThreadCreate(&pthread[i], &thAttr, tdRSmaExecutorFunc, (void *)pSma) != 0) {
680!
426
      code = TAOS_SYSTEM_ERROR(ERRNO);
×
427
      smaError("vgId:%d, failed to create pthread for rsma since %s", SMA_VID(pSma), tstrerror(code));
×
428
      TAOS_RETURN(code);
×
429
    }
430
    smaDebug("vgId:%d, success to create pthread for rsma", SMA_VID(pSma));
680!
431
  }
432

433
  (void)taosThreadAttrDestroy(&thAttr);
17✔
434
  TAOS_RETURN(code);
17✔
435
}
436

437
static int32_t tdRsmaStopExecutor(const SSma *pSma) {
17✔
438
  if (pSma && VND_IS_RSMA(pSma->pVnode)) {
17!
439
    SSmaEnv   *pEnv = NULL;
17✔
440
    SSmaStat  *pStat = NULL;
17✔
441
    SRSmaStat *pRSmaStat = NULL;
17✔
442
    TdThread  *pthread = NULL;
17✔
443

444
    if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = SMA_ENV_STAT(pEnv))) {
17!
445
      TAOS_RETURN(0);
×
446
    }
447

448
    pEnv->flag |= SMA_ENV_FLG_CLOSE;
17✔
449
    pRSmaStat = (SRSmaStat *)pStat;
17✔
450
    pthread = (TdThread *)&pStat->data;
17✔
451

452
    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
697✔
453
      if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
680!
454
        smaError("vgId:%d, failed to post notEmpty semaphore for rsma since %s", SMA_VID(pSma), tstrerror(terrno));
×
455
      }
456
    }
457

458
    for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
697✔
459
      if (taosCheckPthreadValid(pthread[i])) {
680!
460
        smaDebug("vgId:%d, start to join pthread for rsma:%" PRId64 "", SMA_VID(pSma), taosGetPthreadId(pthread[i]));
680!
461
        (void)taosThreadJoin(pthread[i], NULL);
680✔
462
      }
463
    }
464

465
    smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads);
17!
466
  }
467
  TAOS_RETURN(0);
17✔
468
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc