• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.86
/source/dnode/vnode/src/sma/smaRollup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
// Divisor applied to the number of buffered items to derive the back-pressure sleep in tdExecuteRSmaImplAsync
#define RSMA_EXEC_SMOOTH_SIZE   (100)     // cnt
21
#define RSMA_EXEC_BATCH_SIZE    (1024)    // cnt
22
#define RSMA_FETCH_DELAY_MAX    (120000)  // ms
23
#define RSMA_FETCH_ACTIVE_MAX   (1000)    // ms
24
// Initial period handed to taosTmrReset when a per-level fetch timer is armed
#define RSMA_FETCH_INTERVAL     (5000)    // ms
25
// Marker string copied into SStreamTask.exec.qmsg for rsma-owned stream tasks
#define RSMA_EXEC_TASK_FLAG     "rsma"
26
// Queue item layout: [type:int8_t][len:int32_t][version:int64_t][body: len bytes]; header = 1 + 4 + 8 = 13 bytes
#define RSMA_EXEC_MSG_HLEN      (13)  // type(int8_t) + len(int32_t) + version(int64_t)
27
// Accessors for the packed header fields above; `msg` points at the start of the queue item
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
28
#define RSMA_EXEC_MSG_LEN(msg)  (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
29
#define RSMA_EXEC_MSG_VER(msg)  (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
30
// Start of the payload that follows the 13-byte header
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))
31

32
// True when either of the two rollup level items of `r` has a non-zero fetchLevel
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
33

34
// File-scope SMA management state shared by all vnodes in this translation unit.
// Its refHash and tmrHandle members are used when rsma fetch timers are registered.
// NOTE(review): the meaning of rsetId == -1 (presumably "no ref set opened yet") is not visible here - confirm.
SSmaMgmt smaMgmt = {
35
    .inited = 0,
36
    .rsetId = -1,
37
};
38

39
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
40

41
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
42
static void    tdUidStoreDestory(STbUidStore *pStore);
43
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
44
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
45
                                       int8_t idx);
46
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
47
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
48
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
49
static void    tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
50
static void    tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
51
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
52
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
53
                                         int32_t execType, int8_t *streamFlushed);
54
static void    tdRSmaFetchTrigger(void *param, void *tmrId);
55
static void    tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
56
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
57
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
58
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
59

60
// Record describing one qTaskInfo entry.
// NOTE(review): not referenced by any function in the visible part of this file; field meanings below are
// inferred from names only (len: presumably payload length, type: presumably rollup level) - confirm before relying.
struct SRSmaQTaskInfoItem {
61
  int32_t len;
62
  int8_t  type;
63
  int64_t suid;
64
  void   *qTaskInfo;
65
};
66

67
/**
 * @brief Detach a qTaskInfo handle from its slot and destroy it, tolerating concurrent free/kill.
 *
 * @param taskHandle address of the slot holding the handle; may be NULL or point to a NULL handle
 * @param vgId       vnode group id, used for logging only
 * @param level      rollup level (1-based), used for logging only
 */
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
  // Note: free/kill may run concurrently (race condition), hence the atomic detach below
  if (!taskHandle || !(*taskHandle)) return;

  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (!otaskHandle) return;

  // The val-CAS yields the value that was in the slot before the call (assumed semantics of
  // atomic_val_compare_exchange_ptr). Only when that value equals our snapshot did we win the
  // exchange; a different non-NULL value means another thread replaced the handle and the
  // snapshot is no longer ours to destroy.
  if (atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL) == otaskHandle) {
    smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
    qDestroyTask(otaskHandle);
  }
}
76

77
/**
78
 * @brief general function to free rsmaInfo
79
 *
80
 * @param pSma
81
 * @param pInfo
82
 * @return void*
83
 */
84
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
17✔
85
  if (pInfo) {
17!
86
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
51✔
87
      SRSmaInfoItem *pItem = &pInfo->items[i];
34✔
88

89
      if (pItem->tmrId) {
34!
90
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
34!
91
                 pInfo->suid, i + 1);
92
        if (!taosTmrStopA(&pItem->tmrId)) {
34!
93
          smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
×
94
                   i + 1);
95
        }
96
      }
97

98
      if (pItem->pStreamState) {
34!
99
        streamStateClose(pItem->pStreamState, false);
34✔
100
      }
101

102
      if (pItem->pStreamTask) {
34!
103
        tFreeStreamTask(pItem->pStreamTask);
34✔
104
      }
105
      taosArrayDestroy(pItem->pResList);
34✔
106
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
34✔
107
    }
108

109
    taosMemoryFreeClear(pInfo->pTSchema);
17!
110

111
    if (pInfo->queue) {
17!
112
      taosCloseQueue(pInfo->queue);
17✔
113
      pInfo->queue = NULL;
17✔
114
    }
115
    if (pInfo->qall) {
17!
116
      taosFreeQall(pInfo->qall);
17✔
117
      pInfo->qall = NULL;
17✔
118
    }
119

120
    taosMemoryFree(pInfo);
17!
121
  }
122

123
  return NULL;
17✔
124
}
125

126
/**
 * @brief Allocate a zero-initialized STbUidStore.
 *
 * @param pStore out: receives the new store, or NULL when allocation fails
 * @return int32_t TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
  STbUidStore *pNew = taosMemoryCalloc(1, sizeof(STbUidStore));

  *pStore = pNew;
  if (pNew != NULL) {
    return TSDB_CODE_SUCCESS;
  }
  return terrno;
}
134

135
/**
 * @brief Add or remove child table uids in the stream scanners of one rollup super table.
 *
 * Looks up the rsma info of `*suid` and forwards `tbUids` to the stream scanner of every level
 * that has a qTaskInfo.
 *
 * @param pSma   SMA object
 * @param suid   super table uid; must not be NULL
 * @param tbUids array of child table uids; must not be NULL, an empty array is a no-op
 * @param isAdd  true to add the uids to the scanners, false to remove them
 * @return int32_t TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PTR on NULL arguments, or the error of the failing call
 */
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
  SRSmaInfo *pRSmaInfo = NULL;
  int32_t    code = 0;

  if (!suid || !tbUids) {
    code = TSDB_CODE_INVALID_PTR;
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
             tstrerror(code));
    TAOS_RETURN(code);
  }

  int32_t nTables = taosArrayGetSize(tbUids);

  if (0 == nTables) {
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
    return TSDB_CODE_SUCCESS;
  }

  code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);

  if (code != 0) {
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (pRSmaInfo->taskInfo[i]) {
      if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
        // drop our reference before bailing out
        tdReleaseRSmaInfo(pSma, pRSmaInfo);
        smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
                 tstrerror(code));
        TAOS_RETURN(code);
      }
      // nTables > 0 is guaranteed above, so element 0 exists
      smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
               " nTables:%d level %d",
               SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i);
    }
  }

  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  TAOS_RETURN(code);
}
177

178
/**
 * @brief Apply every suid/uids pair collected in a STbUidStore to the rsma stream scanners.
 *
 * The primary pair (pStore->suid / pStore->tbUids) is handled first, followed by the pairs
 * kept in pStore->uidHash.
 *
 * @param pSma   SMA object
 * @param pStore collected uids; NULL or an empty primary uid array is a no-op
 * @param isAdd  true to add the uids, false to remove them
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
  if (pStore == NULL) {
    return TSDB_CODE_SUCCESS;
  }
  if (taosArrayGetSize(pStore->tbUids) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));

  for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
    tb_uid_t *pKeySuid = (tb_uid_t *)taosHashGetKey(pEntry, NULL);
    SArray   *pUidList = *(SArray **)pEntry;

    int32_t ret = tdUpdateTbUidListImpl(pSma, pKeySuid, pUidList, isAdd);
    if (ret != TSDB_CODE_SUCCESS) {
      // abandon the iteration explicitly before leaving the loop early
      taosHashCancelIterate(pStore->uidHash, pEntry);
      TAOS_RETURN(ret);
    }
  }

  return TSDB_CODE_SUCCESS;
}
198

199
/**
200
 * @brief fetch suid/uids when create child tables of rollup SMA
201
 *
202
 * @param pTsdb
203
 * @param ppStore
204
 * @param suid
205
 * @param uid
206
 * @return int32_t
207
 */
208
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
113,783✔
209
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
113,783✔
210
  int32_t  code = 0;
113,783✔
211

212
  // only applicable to rollup SMA ctables
213
  if (!pEnv) {
113,783✔
214
    return TSDB_CODE_SUCCESS;
113,778✔
215
  }
216

217
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
5✔
218
  SHashObj  *infoHash = NULL;
5✔
219
  if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
5!
UNCOV
220
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
221
  }
222

223
  // info cached when create rsma stable and return directly for non-rsma ctables
224
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
7!
225
    return TSDB_CODE_SUCCESS;
×
226
  }
227

228
  if (!(*ppStore)) {
7!
229
    TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
7!
230
  }
231

232
  if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
7!
233
    *ppStore = tdUidStoreFree(*ppStore);
×
234
    TAOS_RETURN(code);
×
235
  }
236

237
  return TSDB_CODE_SUCCESS;
7✔
238
}
239

240
/**
 * @brief Seed an rsma item with the versions recorded on an already registered stream task.
 *
 * When the task identified by `pId` exists in `pMeta`, its checkpoint version becomes the item's
 * submitReqVer and its delaySchedParam becomes the item's fetchResultVer. When it does not
 * exist the item is left untouched.
 *
 * @param pMeta stream meta to look the task up in (read-locked for the duration of the call)
 * @param pItem rsma item to update
 * @param pId   stream/task id to look up
 */
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
  SStreamTask *pFound = NULL;
  STaskId      key = {.streamId = pId->streamId, .taskId = pId->taskId};

  streamMetaRLock(pMeta);

  if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pFound) == 0) {
    pItem->submitReqVer = pFound->chkInfo.checkpointVer;
    pItem->fetchResultVer = pFound->info.delaySchedParam;
    streamMetaReleaseTask(pMeta, pFound);
  }

  streamMetaRUnLock(pMeta);
}
34✔
255

256
/**
 * @brief Unregister an rsma stream task from the stream meta and persist the change.
 *
 * Failures are logged but not propagated; the function is best-effort.
 *
 * @param pMeta    stream meta (write-locked for the duration of the update)
 * @param streamId stream id of the task
 * @param taskId   task id of the task
 */
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
  streamMetaWLock(pMeta);

  int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
  if (code != 0) {
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
  }
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

  // persist to disk; a failed commit used to be silently ignored, report it instead
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d failed to commit stream meta since %s", pMeta->vgId, streamId, taskId,
             tstrerror(code));
  }
  streamMetaWUnLock(pMeta);
  smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
}
34✔
270

271
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
34✔
272
                                       int8_t idx) {
273
  int32_t code = 0;
34✔
274
  if ((param->qmsgLen > 0) && param->qmsg[idx]) {
34!
275
    SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
33✔
276
    SRetention    *pRetention = SMA_RETENTION(pSma);
33✔
277
    STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
33✔
278
    SVnode        *pVnode = pSma->pVnode;
33✔
279
    char           taskInfDir[TSDB_FILENAME_LEN] = {0};
33✔
280
    void          *pStreamState = NULL;
33✔
281

282
    // set the backend of stream state
283
    tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
33✔
284

285
    if (!taosCheckExistFile(taskInfDir)) {
34✔
286
      char *s = taosStrdup(taskInfDir);
12!
287
      if (!s) {
12!
288
        TAOS_RETURN(terrno);
×
289
      }
290
      if (taosMulMkDir(s) != 0) {
12!
291
        code = TAOS_SYSTEM_ERROR(ERRNO);
×
292
        taosMemoryFree(s);
×
293
        TAOS_RETURN(code);
×
294
      }
295
      taosMemoryFree(s);
12!
296
    }
297

298
    SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
33!
299
    if (!pStreamTask) {
34!
300
      return terrno;
×
301
    }
302
    pItem->pStreamTask = pStreamTask;
34✔
303
    pStreamTask->id.taskId = 0;
34✔
304
    pStreamTask->id.streamId = pRSmaInfo->suid + idx;
34✔
305
    pStreamTask->chkInfo.startTs = taosGetTimestampMs();
34✔
306
    pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
34✔
307
    pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1);
34!
308
    if (!pStreamTask->exec.qmsg) {
34!
309
      TAOS_RETURN(terrno);
×
310
    }
311
    TAOS_UNUSED(snprintf(pStreamTask->exec.qmsg, strlen(RSMA_EXEC_TASK_FLAG) + 1, "%s", RSMA_EXEC_TASK_FLAG));
34✔
312
    pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
34✔
313
    tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
34✔
314

315
    TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));
34!
316

317
    TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));
34!
318

319
    pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
34✔
320
    if (!pStreamState) {
34!
321
      TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
×
322
    }
323
    pItem->pStreamState = pStreamState;
34✔
324

325
    tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
34✔
326

327
    SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
34✔
328
    initStorageAPI(&handle.api);
34✔
329

330
    code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
34✔
331
    if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
34!
332
      TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
×
333
    }
334

335
    if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
34!
336
      TAOS_RETURN(terrno);
×
337
    }
338

339
    if (pItem->fetchResultVer < pItem->submitReqVer) {
34✔
340
      // fetch the data when reboot
341
      pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
4✔
342
    }
343

344
    if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
34✔
345
      int64_t msInterval = -1;
12✔
346
      TAOS_CHECK_RETURN(convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision,
12!
347
                                                       TIME_UNIT_MILLISECOND, &msInterval));
348
      pItem->maxDelay = (int32_t)msInterval;
12✔
349
    } else {
350
      pItem->maxDelay = (int32_t)param->maxdelay[idx];
22✔
351
    }
352
    if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
34!
353
      pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
×
354
    }
355

356
    pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
34✔
357

358
    SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
34✔
359
    if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
34!
360
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
361
    }
362

363
    bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
34✔
364
    if (!ret) {
34!
365
      smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
34!
366
               idx + 1);
367
    }
368

369
    smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
34!
370
            ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
371
            ", finally maxdelay:%" PRIi32,
372
            TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
373
            pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
374
  }
375
  TAOS_RETURN(TSDB_CODE_SUCCESS);
35✔
376
}
377

378
/**
379
 * @brief for rsam create or restore
380
 *
381
 * @param pSma
382
 * @param param
383
 * @param suid
384
 * @param tbName
385
 * @return int32_t
386
 */
387
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
17✔
388
  int32_t code = 0;
17✔
389
  int32_t lino = 0;
17✔
390

391
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
17!
392
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
×
393
    return TSDB_CODE_SUCCESS;
×
394
  }
395

396
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
17✔
397
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
17✔
398
  SRSmaInfo *pRSmaInfo = NULL;
17✔
399

400
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
17✔
401
  if (pRSmaInfo) {
17!
402
    smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
×
403
    return TSDB_CODE_SUCCESS;
×
404
  }
405

406
  // from write queue: single thead
407
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
17!
408
  if (!pRSmaInfo) {
17!
409
    return terrno;
×
410
  }
411

412
  STSchema *pTSchema;
413
  code = metaGetTbTSchemaNotNull(SMA_META(pSma), suid, -1, 1, &pTSchema);
17✔
414
  TAOS_CHECK_EXIT(code);
17!
415
  pRSmaInfo->pSma = pSma;
17✔
416
  pRSmaInfo->pTSchema = pTSchema;
17✔
417
  pRSmaInfo->suid = suid;
17✔
418
  T_REF_INIT_VAL(pRSmaInfo, 1);
17✔
419

420
  TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
17!
421

422
  TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
17!
423

424
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
17!
425
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
17!
426

427
  TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
17!
428

429
_exit:
17✔
430
  if (code != 0) {
17!
431
    TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
×
432
  } else {
433
    smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
17!
434
  }
435
  TAOS_RETURN(code);
17✔
436
}
437

438
/**
439
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
440
 *
441
 * @param pSma
442
 * @param pReq
443
 * @return int32_t
444
 */
445
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
27,831✔
446
  SVnode *pVnode = pSma->pVnode;
27,831✔
447
  if (!pReq->rollup) {
27,831!
448
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
27,878✔
449
             pReq->suid);
450
    return TSDB_CODE_SUCCESS;
27,878✔
451
  }
452

453
  if (!VND_IS_RSMA(pVnode)) {
×
454
    smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
×
455
            pReq->suid);
456
    return TSDB_CODE_SUCCESS;
×
457
  }
458

459
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
×
460
}
461

462
/**
463
 * @brief drop cache for stb
464
 *
465
 * @param pSma
466
 * @param pReq
467
 * @return int32_t
468
 */
469
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
2,518✔
470
  SVnode *pVnode = pSma->pVnode;
2,518✔
471
  if (!VND_IS_RSMA(pVnode)) {
2,518!
472
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
2,518✔
473
             pReq->suid);
474
    return TSDB_CODE_SUCCESS;
2,518✔
475
  }
476

477
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
×
478
  if (!pSmaEnv) {
×
479
    return TSDB_CODE_SUCCESS;
×
480
  }
481

482
  int32_t    code = 0;
×
483
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
×
484
  SRSmaInfo *pRSmaInfo = NULL;
×
485

486
  code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
×
487

488
  if (code != 0) {
×
489
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
×
490
            pReq->suid);
491
    return TSDB_CODE_SUCCESS;
×
492
  }
493

494
  // set del flag for data in mem
495
  atomic_store_8(&pRSmaStat->delFlag, 1);
×
496
  RSMA_INFO_SET_DEL(pRSmaInfo);
×
497
  tdUnRefRSmaInfo(pSma, pRSmaInfo);
×
498

499
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
500

501
  // no need to save to file as triggered by dropping stable
502
  smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
×
503
  return TSDB_CODE_SUCCESS;
×
504
}
505

506
/**
507
 * @brief store suid/[uids], prefer to use array and then hash
508
 *
509
 * @param pStore
510
 * @param suid
511
 * @param uid
512
 * @return int32_t
513
 */
514
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
319✔
515
  // prefer to store suid/uids in array
516
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
319!
517
    if (pStore->suid == 0) {
319!
518
      pStore->suid = suid;
319✔
519
    }
520
    if (uid) {
319✔
521
      if (!pStore->tbUids) {
7!
522
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
7!
523
          TAOS_RETURN(terrno);
×
524
        }
525
      }
526
      if (!taosArrayPush(pStore->tbUids, uid)) {
14!
527
        TAOS_RETURN(terrno);
×
528
      }
529
    }
530
  } else {
531
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
532
    if (!pStore->uidHash) {
×
533
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
534
      if (!pStore->uidHash) {
×
535
        TAOS_RETURN(terrno);
×
536
      }
537
    }
538
    if (uid) {
×
539
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
×
540
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
×
541
        if (!taosArrayPush(uidArray, uid)) {
×
542
          taosArrayDestroy(uidArray);
×
543
          TAOS_RETURN(terrno);
×
544
        }
545
      } else {
546
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
×
547
        if (!pUidArray) {
×
548
          TAOS_RETURN(terrno);
×
549
        }
550
        if (!taosArrayPush(pUidArray, uid)) {
×
551
          taosArrayDestroy(pUidArray);
×
552
          TAOS_RETURN(terrno);
×
553
        }
554
        TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
×
555
      }
556
    } else {
557
      TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
×
558
    }
559
  }
560
  return TSDB_CODE_SUCCESS;
319✔
561
}
562

563
/**
 * @brief Release the contents of a STbUidStore (not the store object itself).
 *
 * @param pStore store to clear; NULL is accepted
 */
static void tdUidStoreDestory(STbUidStore *pStore) {
  if (!pStore) {
    return;
  }

  SHashObj *pHash = pStore->uidHash;
  if (pHash) {
    // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
    if (pStore->tbUids) {
      for (void *pEntry = taosHashIterate(pHash, NULL); pEntry != NULL; pEntry = taosHashIterate(pHash, pEntry)) {
        taosArrayDestroy(*(SArray **)pEntry);
      }
    }
    taosHashCleanup(pHash);
  }

  taosArrayDestroy(pStore->tbUids);
}
329✔
579

580
/**
 * @brief Destroy a STbUidStore together with its contents.
 *
 * @param pStore store to free; NULL is accepted
 * @return void* always NULL, so callers can write `p = tdUidStoreFree(p)`
 */
void *tdUidStoreFree(STbUidStore *pStore) {
  if (pStore == NULL) {
    return NULL;
  }

  tdUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}
587

588
/**
589
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
590
 * would be freed when close Vnode, thus lock should be used if with race condition.
591
 * @param pTsdb
592
 * @param version
593
 * @param pReq
594
 * @return int32_t
595
 */
596
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
624✔
597
  if (pReq) {
624!
598
    SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
624✔
599
    // spin lock for race condition during insert data
600
    TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
624!
601
  }
602

603
  return TSDB_CODE_SUCCESS;
624✔
604
}
605

606
/**
 * @brief Record the suid of every table block of a submit request in the uid store (suid only, no uids).
 *
 * @param pMsg   submit request; NULL is treated as a request without table data
 * @param pStore store that receives the suids
 * @return int32_t 0 on success or the error of tdUidStorePut()
 */
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
  SArray *pTbDataList = NULL;
  if (pMsg) {
    pTbDataList = pMsg->aSubmitTbData;
  }

  int32_t nBlocks = taosArrayGetSize(pTbDataList);
  int32_t pos = 0;
  while (pos < nBlocks) {
    SSubmitTbData *pTbData = TARRAY_GET_ELEM(pTbDataList, pos);
    TAOS_CHECK_RETURN(tdUidStorePut(pStore, pTbData->suid, NULL));
    ++pos;
  }

  return 0;
}
617

618
/**
619
 * @brief retention of rsma1/rsma2
620
 *
621
 * @param pSma
622
 * @param now
623
 * @return int32_t
624
 */
625
int32_t smaRetention(SSma *pSma, int64_t now) {
×
626
  int32_t code = TSDB_CODE_SUCCESS;
×
627
  if (!VND_IS_RSMA(pSma->pVnode)) {
×
628
    return code;
×
629
  }
630

631
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
632
    if (pSma->pRSmaTsdb[i]) {
633
      // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
634
      // if (code) goto _end;
635
    }
636
  }
637

638
_end:
×
639
  TAOS_RETURN(code);
×
640
}
641

642
/**
 * @brief Encode a batch delete request and post it to the vnode write queue.
 *
 * pDelReq->deleteReqs is destroyed on every path, so the caller must not touch it afterwards.
 *
 * @param pSma    SMA object
 * @param suid    super table uid, used for logging only
 * @param level   rollup level, used for logging only
 * @param pDelReq delete request to send; an empty deleteReqs array sends nothing
 * @return int32_t 0 on success or an error code
 */
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  bodyLen = 0;
  void    *pCont = NULL;
  SRpcMsg  delMsg = {0};
  SEncoder enc;

  if (!(taosArrayGetSize(pDelReq->deleteReqs) > 0)) {
    goto _exit;
  }

  // first pass: compute the encoded size
  tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, bodyLen, code);
  TSDB_CHECK_CODE(code, lino, _exit);

  pCont = rpcMallocCont(bodyLen + sizeof(SMsgHead));
  if (pCont == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // second pass: encode right behind the message head
  tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMsgHead)), bodyLen);
  code = tEncodeSBatchDeleteReq(&enc, pDelReq);
  if (code < 0) {
    tEncoderClear(&enc);
    rpcFreeCont(pCont);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  tEncoderClear(&enc);

  ((SMsgHead *)pCont)->vgId = TD_VID(pSma->pVnode);

  delMsg.msgType = TDMT_VND_BATCH_DEL;
  delMsg.pCont = pCont;
  delMsg.contLen = bodyLen + sizeof(SMsgHead);
  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  taosArrayDestroy(pDelReq->deleteReqs);
  if (code) {
    smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
             SMA_VID(pSma), lino, suid, level, tstrerror(code));
  }

  TAOS_RETURN(code);
}
682

683
/**
 * @brief Run the stream executor of one rollup level until it yields no more results and write
 *        every result block into the sink tsdb of that level.
 *
 * Checkpoint blocks only raise *streamFlushed; delete-result blocks are turned into a batch
 * delete request; all other blocks are converted to a submit request and inserted.
 *
 * @param pSma          SMA object
 * @param taskInfo      stream executor of the level
 * @param pItem         rsma item of the level (result list, versions, level)
 * @param pInfo         rsma info of the super table (schema, suid)
 * @param execType      stream input type that triggered the execution (e.g. STREAM_GET_ALL)
 * @param streamFlushed out, optional: set to 1 when a checkpoint block is seen
 * @return int32_t 0 on success or an error code
 */
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
                                         int32_t execType, int8_t *streamFlushed) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock *output = NULL;
  SArray      *pResList = pItem->pResList;
  STSchema    *pTSchema = pInfo->pTSchema;
  int64_t      suid = pInfo->suid;

  while (1) {
    uint64_t ts;
    bool     hasMore = false;
    code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL, false);
    if (code == TSDB_CODE_QRY_IN_EXEC) {
      // the task is already being executed elsewhere; not an error for us
      code = 0;
      break;
    }
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pResList) == 0) {
      break;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
      output = taosArrayGetP(pResList, i);
      if (output->info.type == STREAM_CHECKPOINT) {
        if (streamFlushed) *streamFlushed = 1;
        continue;
      } else if (output->info.type == STREAM_DELETE_RESULT) {
        SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
        deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
        if (!deleteReq.deleteReqs) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
        if (code) {
          // tdRSmaProcessDelReq (which releases the array) is never reached on this path
          taosArrayDestroy(deleteReq.deleteReqs);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        // tdRSmaProcessDelReq destroys deleteReq.deleteReqs on every path
        code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
               ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
               SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
               pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);

      if (STREAM_GET_ALL == execType) {
        /**
         * 1. reset the output version when reboot
         * 2. delete msg version not updated from the result
         */
        if (output->info.version < pItem->submitReqVer) {
          // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
          output->info.version = pItem->submitReqVer;
        } else if (output->info.version == pItem->fetchResultVer) {
          smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
                  ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
                  ", rows:%" PRIi64,
                  SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
                  pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
          continue;
        }
      }

      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
      SSubmitReq2 *pReq = NULL;

      TAOS_CHECK_EXIT(
          buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));

      if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
        if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
          // TODO: reconfigure SSubmitReq2
        }
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFreeClear(pReq);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (STREAM_GET_ALL == execType) {
        // remember the last fetched version so that a duplicate can be skipped next time
        atomic_store_64(&pItem->fetchResultVer, output->info.version);
      }

      smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
               ", execType:%d, ver:%" PRIi64,
               SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);

      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }
    }
  }
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
             ", ver:%" PRIi64,
             SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
             output ? output->info.version : -1);
  } else {
    smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
  }
  qCleanExecTaskBlockBuf(taskInfo);
  return code;
}
789

790
/**
791
 * @brief Copy msg to rsmaQueueBuffer for batch process
792
 *
793
 * @param pSma
794
 * @param version
795
 * @param pMsg
796
 * @param len
797
 * @param inputType
798
 * @param pInfo
799
 * @param suid
800
 * @return int32_t
801
 */
802
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
803
                                      SRSmaInfo *pInfo, tb_uid_t suid) {
804
  int32_t code = 0;
341✔
805
  int32_t lino = 0;
341✔
806
  int32_t size = RSMA_EXEC_MSG_HLEN + len;  // header + payload
341✔
807
  void   *qItem;
808

809
  TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
341!
810

811
  void *pItem = qItem;
341✔
812

813
  *(int8_t *)pItem = (int8_t)inputType;
341✔
814
  pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
341✔
815
  *(int32_t *)pItem = len;
341✔
816
  pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
341✔
817
  *(int64_t *)pItem = version;
341✔
818
  (void)memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
341✔
819

820
  TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
341!
821

822
  pInfo->lastRecv = taosGetTimestampMs();
341✔
823

824
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
341✔
825

826
  int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
341✔
827

828
  if (atomic_load_8(&pInfo->assigned) == 0) {
341✔
829
    if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
138!
830
      smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
×
831
               tstrerror(terrno));
832
    }
833
  }
834

835
  // smoothing consume
836
  int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
341✔
837
  if (n > 1) {
341!
838
    if (n > 10) {
×
839
      n = 10;
×
840
    }
841
    taosMsleep(n << 3);
×
842
    if (n > 5) {
×
843
      smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
×
844
              taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
845
    }
846
  }
847

848
  return TSDB_CODE_SUCCESS;
341✔
849
}
850

851
#if 0
// Disabled debug helper: walks every block of a submit request and logs each row.
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlkIter blkIter = {0};

  TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));
  for (;;) {
    SSubmitBlk *pBlock = NULL;
    TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
    if (pBlock == NULL) {
      break;
    }
    tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
    for (STSRow *row = tGetSubmitBlkNext(&blkIter); row != NULL; row = tGetSubmitBlkNext(&blkIter)) {
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
    }
  }
  return 0;
}
#endif
870

871
/**
872
 * @brief sync mode
873
 *
874
 * @param pSma
875
 * @param pMsg
876
 * @param msgSize
877
 * @param inputType
878
 * @param pInfo
879
 * @param type
880
 * @param level
881
 * @return int32_t
882
 */
883
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
640✔
884
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
885
  int32_t        code = 0;
640✔
886
  int32_t        idx = level - 1;
640✔
887
  void          *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
640✔
888
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
640✔
889

890
  if (!qTaskInfo) {
640!
891
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
×
892
             pInfo->suid);
893
    return TSDB_CODE_SUCCESS;
×
894
  }
895
  if (!pInfo->pTSchema) {
640!
896
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
×
897
    TAOS_RETURN(TSDB_CODE_INVALID_PTR);
×
898
  }
899

900
  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
640!
901
           ", inputType:%d",
902
           SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
903

904
  if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
640!
905
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
×
906
    TAOS_RETURN(TSDB_CODE_FAILED);
×
907
  }
908

909
  atomic_store_64(&pItem->submitReqVer, version);
640✔
910

911
  TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
640!
912

913
  TAOS_RETURN(code);
640✔
914
}
915

916
/**
917
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
918
 *
919
 * @param pSma
920
 * @param suid
921
 */
922
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
546✔
923
  int32_t    code = 0;
546✔
924
  int32_t    lino = 0;
546✔
925
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
546✔
926
  SRSmaStat *pStat = NULL;
546✔
927
  SRSmaInfo *pRSmaInfo = NULL;
546✔
928

929
  *ppRSmaInfo = NULL;
546✔
930

931
  if (!pEnv) {
546!
932
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
×
933
  }
934

935
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
546✔
936
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
546!
937
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
938
  }
939

940
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
546✔
941
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
546✔
942
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
546!
943
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
546!
944
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
945
      TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
946
    }
947

948
    tdRefRSmaInfo(pSma, pRSmaInfo);
949
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
546✔
950
    if (pRSmaInfo->suid != suid) {
546!
951
      TAOS_RETURN(TSDB_CODE_APP_ERROR);
×
952
    }
953
    *ppRSmaInfo = pRSmaInfo;
546✔
954
    TAOS_RETURN(code);
546✔
955
  }
956
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
957

958
  TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
959
}
960

961
/** Calls tdUnRefRSmaInfo on pInfo; a NULL pInfo is ignored. */
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  tdUnRefRSmaInfo(pSma, pInfo);
}
546✔
966

967
/**
968
 * @brief async mode
969
 *
970
 * @param pSma
971
 * @param version
972
 * @param pMsg
973
 * @param inputType
974
 * @param suid
975
 * @return int32_t
976
 */
977
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
978
                                  tb_uid_t suid) {
979
  int32_t    code = 0;
341✔
980
  SRSmaInfo *pRSmaInfo = NULL;
341✔
981

982
  code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
341✔
983
  if (code != 0) {
341!
984
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
×
985
    TAOS_RETURN(TSDB_CODE_SUCCESS);  // return success
×
986
  }
987

988
  if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
341!
989
    if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
341!
990
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
991
      TAOS_RETURN(code);
×
992
    }
993
    if (smaMgmt.tmrHandle) {
341!
994
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
341✔
995
      if (pItem->level > 0) {
341!
996
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
997
      }
998
      pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
341✔
999
      if (pItem->level > 0) {
341!
1000
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
1001
      }
1002
    }
1003
  } else {
1004
    code = TSDB_CODE_APP_ERROR;
×
1005
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1006
    smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
×
1007
             tstrerror(code), inputType);
1008
    TAOS_RETURN(code);
×
1009
  }
1010

1011
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
341!
1012
  return TSDB_CODE_SUCCESS;
341✔
1013
}
1014

1015
/**
 * @brief Queue a submit message on the rsma info of every super table the request touches.
 *
 * @param pSma
 * @param version version forwarded with the queued message
 * @param pReq    submit request from which the suids are collected (tdFetchSubmitReqSuids)
 * @param pMsg    message queued for asynchronous rsma execution
 * @param len     length of pMsg
 * @return int32_t TSDB_CODE_SUCCESS when there is no rsma env or everything was queued; otherwise the
 *                 stored execStat or the first failing code
 */
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  // Declared and zeroed before the first `goto _exit`: the cleanup at _exit always runs
  // tdUidStoreDestory(&uidStore), so the store must never be reached uninitialized.
  STbUidStore uidStore = {0};

  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma submit since invalid exec code:%s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
    smaError("vgId:%d, failed to process rsma submit fetch suid since %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if (uidStore.suid != 0) {
    if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
      smaError("vgId:%d, failed to process rsma submit exec 1 since %s", SMA_VID(pSma), tstrerror(code));
      goto _exit;
    }

    // further suids are the keys of uidStore.uidHash
    void *pIter = NULL;
    while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
      tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
        smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
        taosHashCancelIterate(uidStore.uidHash, pIter);
        goto _exit;
      }
    }
  }
_exit:
  tdUidStoreDestory(&uidStore);
  TAOS_RETURN(code);
}
1051

1052
/**
 * @brief Queue a delete message on the rsma info of the super table named in the delete result.
 *
 * @param pSma
 * @param version version forwarded with the queued message
 * @param pReq    read as an SDeleteRes; only its suid is used here
 * @param pMsg    message queued for asynchronous rsma execution
 * @param len     length of pMsg
 * @return int32_t TSDB_CODE_SUCCESS when there is no rsma env or the message was queued; otherwise the
 *                 stored execStat or the failing code
 */
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma delete since invalid exec code:%s", SMA_VID(pSma), tstrerror(code));
    TAOS_RETURN(code);
  }

  SDeleteRes *pDelRes = pReq;
  if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) {
    // this is the delete path; the message used to say "submit" and was indistinguishable from tdProcessRSmaSubmit
    smaError("vgId:%d, failed to process rsma delete exec since %s", SMA_VID(pSma), tstrerror(code));
  }

  TAOS_RETURN(code);
}
1069

1070
/**
1071
 * @brief retrieve rsma meta and init
1072
 *
1073
 * @param pSma
1074
 * @param nTables number of tables of rsma
1075
 * @return int32_t
1076
 */
1077
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
17✔
1078
  int32_t     code = 0;
17✔
1079
  int32_t     lino = 0;
17✔
1080
  SVnode     *pVnode = pSma->pVnode;
17✔
1081
  SArray     *suidList = NULL;
17✔
1082
  STbUidStore uidStore = {0};
17✔
1083
  SMetaReader mr = {0};
17✔
1084
  tb_uid_t    suid = 0;
17✔
1085

1086
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
17!
1087
    code = terrno;
×
1088
    TSDB_CHECK_CODE(code, lino, _exit);
×
1089
  }
1090

1091
  TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
17!
1092

1093
  int64_t arrSize = taosArrayGetSize(suidList);
17✔
1094

1095
  if (arrSize == 0) {
17✔
1096
    if (nTables) {
7!
1097
      *nTables = 0;
7✔
1098
    }
1099
    taosArrayDestroy(suidList);
7✔
1100
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
7!
1101
    return TSDB_CODE_SUCCESS;
7✔
1102
  }
1103

1104
  int64_t nRsmaTables = 0;
10✔
1105
  metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK);
10✔
1106
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
10!
1107
    code = terrno;
×
1108
    TSDB_CHECK_CODE(code, lino, _exit);
×
1109
  }
1110

1111
  for (int64_t i = 0; i < arrSize; ++i) {
20✔
1112
    suid = *(tb_uid_t *)taosArrayGet(suidList, i);
10✔
1113
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
10!
1114
    if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
10!
1115
      code = terrno;
×
1116
      TSDB_CHECK_CODE(code, lino, _exit);
×
1117
    }
1118
    tDecoderClear(&mr.coder);
10✔
1119
    if (mr.me.type != TSDB_SUPER_TABLE) {
10!
1120
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1121
      TSDB_CHECK_CODE(code, lino, _exit);
×
1122
    }
1123
    if (mr.me.uid != suid) {
10!
1124
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1125
      TSDB_CHECK_CODE(code, lino, _exit);
×
1126
    }
1127
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
10!
1128
      ++nRsmaTables;
10✔
1129
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
10✔
1130
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
30✔
1131
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
20!
1132
                 " qmsgLen:%" PRIi32,
1133
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1134
      }
1135
      TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
10!
1136
#if 0
1137
      // reload all ctbUids for suid
1138
      uidStore.suid = suid;
1139
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
1140
        code = terrno;
1141
        TSDB_CHECK_CODE(code, lino, _exit);
1142
      }
1143

1144
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
1145
        code = terrno;
1146
        TSDB_CHECK_CODE(code, lino, _exit);
1147
      }
1148

1149
      taosArrayClear(uidStore.tbUids);
1150
#endif
1151
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
10!
1152
    }
1153
  }
1154

1155
  if (nTables) {
10!
1156
    *nTables = nRsmaTables;
10✔
1157
  }
1158
_exit:
×
1159
  if (code) {
10!
1160
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
×
1161
             __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
1162
  }
1163
  metaReaderClear(&mr);
10✔
1164
  taosArrayDestroy(suidList);
10✔
1165
  tdUidStoreDestory(&uidStore);
10✔
1166
  TAOS_RETURN(code);
10✔
1167
}
1168

1169
/**
1170
 * N.B. the data would be restored from the unified WAL replay procedure
1171
 */
1172
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
17✔
1173
  int32_t code = 0;
17✔
1174
  int32_t lino = 0;
17✔
1175
  int64_t nTables = 0;
17✔
1176

1177
  // step 1: init env
1178
  TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
17!
1179

1180
  // step 2: iterate all stables to restore the rsma env
1181
  TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
17!
1182

1183
_exit:
17✔
1184
  if (code) {
17!
1185
    smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
×
1186
             qtaskFileVer, tstrerror(code));
1187
  } else {
1188
    smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed, nTables:%" PRIi64, SMA_VID(pSma),
17!
1189
            type, qtaskFileVer, nTables);
1190
  }
1191

1192
  TAOS_RETURN(code);
17✔
1193
}
1194

1195
/**
 * @brief Checkpoint the stream state of every rsma task found in pInfoHash.
 *
 * Runs in three phases, each skipping entries marked as deleted:
 *  1. feed STREAM_INPUT__CHECKPOINT to every existing task and clear its streamFlushed flag;
 *  2. execute the tasks with STREAM_CHECKPOINT repeatedly (sleeping 10 us between rounds) until every
 *     task triggered in phase 1 has reported streamFlushed;
 *  3. stamp the stream tasks with a shared checkpoint id, build the checkpoint once, save each task in
 *     the stream meta and commit the meta.
 *
 * @param pRSmaStat rsma stat; supplies pSma and the input blocks passed to qSetSMAInput
 * @param pInfoHash hash of SRSmaInfo pointers to checkpoint
 * @return int32_t  TSDB_CODE_SUCCESS when the hash is empty, nothing had to be triggered, or all phases
 *                  succeeded; otherwise the first failing code
 */
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nTaskInfo = 0;
  SSma   *pSma = pRSmaStat->pSma;
  SVnode *pVnode = pSma->pVnode;

  if (taosHashGetSize(pInfoHash) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stream state: trigger checkpoint
  do {
    void *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        if (pRSmaInfo->taskInfo[i]) {
          code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
          if (code) {
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          pRSmaInfo->items[i].streamFlushed = 0;
          ++nTaskInfo;
        }
      }
    }
  } while (0);

  // No task was triggered (every entry is deleted or has no task): the wait loop below only terminates
  // when a task reports streamFlushed, so entering it here would spin forever.
  if (nTaskInfo <= 0) {
    smaDebug("vgId:%d, rsma commit, no task to checkpoint", TD_VID(pVnode));
    goto _exit;
  }

  // stream state: wait checkpoint ready in async mode
  do {
    int32_t nStreamFlushed = 0;
    int32_t nSleep = 0;
    void   *infoHash = NULL;
    while (true) {
      while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
        if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
          continue;
        }
        for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
          if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
            int8_t streamFlushed = 0;
            code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
                                             STREAM_CHECKPOINT, &streamFlushed);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }

            if (streamFlushed) {
              pRSmaInfo->items[i].streamFlushed = 1;
              if (++nStreamFlushed >= nTaskInfo) {
                smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total:%d/%d", TD_VID(pVnode),
                        nSleep * 10, nStreamFlushed, nTaskInfo);
                taosHashCancelIterate(pInfoHash, infoHash);
                goto _checkpoint;
              }
            }
          }
        }
      }
      taosUsleep(10);
      ++nSleep;
      smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total:%d/%d", TD_VID(pVnode),
               nSleep * 10, nStreamFlushed, nTaskInfo);
    }
  } while (0);

_checkpoint:
  // stream state: build checkpoint in backend
  do {
    SStreamMeta *pMeta = NULL;
    int64_t      checkpointId = taosGetTimestampNs();
    bool         checkpointBuilt = false;
    void        *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
        if (pItem && pItem->pStreamTask) {
          SStreamTask *pTask = pItem->pStreamTask;
          // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
          TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));

          pTask->chkInfo.checkpointId = checkpointId;  // 1pTask->checkpointingId;
          pTask->chkInfo.checkpointVer = pItem->submitReqVer;
          pTask->info.delaySchedParam = pItem->fetchResultVer;
          pTask->info.taskLevel = TASK_LEVEL_SMA;

          if (!checkpointBuilt) {
            // the stream states share one checkpoint
            code = streamTaskBuildCheckpoint(pTask);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }
            // pMeta is taken from the first task and reused for all later tasks
            pMeta = pTask->pMeta;
            checkpointBuilt = true;
          }

          streamMetaWLock(pMeta);
          if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
            streamMetaWUnLock(pMeta);
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          streamMetaWUnLock(pMeta);
          smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
                   ", table:%" PRIi64 ", level:%d",
                   TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
        }
      }
    }
    if (pMeta) {
      streamMetaWLock(pMeta);
      if ((code = streamMetaCommit(pMeta)) != 0) {
        streamMetaWUnLock(pMeta);
        if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      streamMetaWUnLock(pMeta);
    }
    if (checkpointBuilt) {
      smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
    }
  } while (0);
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1337

1338
/**
1339
 * @brief trigger to get rsma result in async mode
1340
 *
1341
 * @param param
1342
 * @param tmrId
1343
 */
1344
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
198✔
1345
  SRSmaRef      *pRSmaRef = NULL;
198✔
1346
  SSma          *pSma = NULL;
198✔
1347
  SRSmaStat     *pStat = NULL;
198✔
1348
  SRSmaInfo     *pRSmaInfo = NULL;
198✔
1349
  SRSmaInfoItem *pItem = NULL;
198✔
1350
  int32_t        code = 0;
198✔
1351

1352
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
198!
1353
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
×
1354
             smaMgmt.refHash, smaMgmt.rsetId);
UNCOV
1355
    return;
×
1356
  }
1357

1358
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
198!
1359
    smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1360
            pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1361
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1362
    return;
×
1363
  }
1364

1365
  pSma = pStat->pSma;
198✔
1366

1367
  if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
198!
1368
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1369
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1370
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1371
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1372
    return;
×
1373
  }
1374

1375
  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
198!
1376
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1377
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1378
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1379
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1380
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1381
    return;
×
1382
  }
1383

1384
  pItem = *(SRSmaInfoItem **)&param;
198✔
1385

1386
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
1387
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
198✔
1388
  switch (rsmaTriggerStat) {
198!
UNCOV
1389
    case TASK_TRIGGER_STAT_PAUSED:
×
1390
    case TASK_TRIGGER_STAT_CANCELLED: {
UNCOV
1391
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
×
1392
               ", rsetId:%d refId:%" PRIi64,
1393
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
UNCOV
1394
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
×
UNCOV
1395
        bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
UNCOV
1396
        if (!ret) {
×
UNCOV
1397
          smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
×
1398
                  " since tmr reset failed, rsetId:%d refId:%" PRIi64,
1399
                  SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
1400
        }
1401
      }
UNCOV
1402
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
UNCOV
1403
      TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
UNCOV
1404
      return;
×
1405
    }
1406
    default:
198✔
1407
      break;
198✔
1408
  }
1409

1410
  int8_t fetchTriggerStat =
1411
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
198✔
1412
  switch (fetchTriggerStat) {
198!
1413
    case TASK_TRIGGER_STAT_ACTIVE: {
40✔
1414
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
40!
1415
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1416
      // async process
1417
      atomic_store_8(&pItem->fetchLevel, 1);
40✔
1418

1419
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
40✔
1420
        if (tsem_post(&(pStat->notEmpty)) != 0) {
39!
1421
          smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
×
1422
                   SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1423
        }
1424
      }
1425
    } break;
40✔
1426
    case TASK_TRIGGER_STAT_INACTIVE: {
114✔
1427
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
114!
1428
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1429
    } break;
114✔
1430
    case TASK_TRIGGER_STAT_INIT: {
44✔
1431
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
44!
1432
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1433
    } break;
44✔
1434
    default: {
×
1435
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
×
1436
               " is unknown",
1437
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
1438
    } break;
×
1439
  }
1440

1441
_end:
198✔
1442
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
198✔
1443
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
198!
1444
  TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
198✔
1445
}
1446

1447
/**
 * @brief Release what each packed item in pItems refers to, then clear the array.
 *
 * @param pItems array of SPackedData
 * @param type   STREAM_INPUT__MERGED_SUBMIT: each msgStr points RSMA_EXEC_MSG_HLEN bytes into a queue item,
 *               which is freed; STREAM_INPUT__REF_DATA_BLOCK: each pDataBlock is destroyed; any other type
 *               only logs a warning. The array is cleared in every case.
 */
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
  int32_t nItems = taosArrayGetSize(pItems);

  if (type != STREAM_INPUT__MERGED_SUBMIT && type != STREAM_INPUT__REF_DATA_BLOCK) {
    smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
  } else {
    for (int32_t idx = 0; idx < nItems; ++idx) {
      SPackedData *pPacked = TARRAY_GET_ELEM(pItems, idx);
      if (type == STREAM_INPUT__MERGED_SUBMIT) {
        // step back over the exec-msg header to reach the start of the queue item
        taosFreeQitem(POINTER_SHIFT(pPacked->msgStr, -RSMA_EXEC_MSG_HLEN));
      } else {
        blockDataDestroy(pPacked->pDataBlock);
      }
    }
  }

  taosArrayClear(pItems);
}
320✔
1464

1465
/**
1466
 * @brief fetch rsma result(consider the efficiency and functionality)
1467
 *
1468
 * @param pSma
1469
 * @param pInfo
1470
 * @return int32_t
1471
 */
1472
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
40✔
1473
  int32_t     code = 0;
40✔
1474
  int32_t     lino = 0;
40✔
1475
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
40✔
1476
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
120✔
1477
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
80✔
1478

1479
    if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
80✔
1480
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
40✔
1481
      if (!taskInfo) {
40!
1482
        continue;
×
1483
      }
1484

1485
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
40!
1486
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
×
1487
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1488
      } else {
1489
        int64_t curMs = taosGetTimestampMs();
40✔
1490
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
40✔
1491
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
12!
1492
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1493
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
12✔
1494
          continue;
12✔
1495
        } else {
1496
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
28!
1497
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1498
        }
1499
      }
1500

1501
      pItem->nScanned = 0;
28✔
1502

1503
      TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
28!
1504
      if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
28!
1505
        atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
×
1506
        goto _exit;
×
1507
      }
1508

1509
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
28!
1510
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1511
    } else {
1512
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
40!
1513
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
1514
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
1515
    }
1516
  }
1517

1518
_exit:
40✔
1519
  TAOS_RETURN(code);
40✔
1520
}
1521

1522
/**
 * @brief Consume every message currently held in `qall` and run it through the rsma tasks of `pInfo`.
 *
 * Messages are taken one at a time with taosGetQitem(). A STREAM_INPUT__DATA_SUBMIT message is packed
 * (len/ver/body) into `pSubmitArr`; a STREAM_INPUT__REF_DATA_BLOCK message is first converted with
 * tqExtractDelDataBlock() and the resulting block, if any, is pushed into `pSubmitArr`. After each
 * message that produced something to execute, tdExecuteRSmaImpl() is called for levels
 * 1..TSDB_RETENTION_L2 and the array items are released with tdFreeRSmaSubmitItems().
 *
 * @param pSma        sma handle; used for the vgId in logs and to reach the rsma stat
 * @param pInfo       rsma info the messages belong to
 * @param qall        batch of queue items to consume; fully drained on return (also on failure)
 * @param pSubmitArr  scratch array of SPackedData; cleared on entry
 * @param type        exec type forwarded to tdExecuteRSmaImpl()
 * @return int32_t    TSDB_CODE_SUCCESS once qall is drained, otherwise the failing code; on failure the
 *                    code is also stored in the rsma stat's execStat
 */
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
  int32_t     nSubmit = 0;
  int32_t     nDelete = 0;
  int64_t     version = 0;
  int32_t     code = 0;
  int32_t     lino = 0;
  SPackedData packData = {0};

  taosArrayClear(pSubmitArr);

  // the submitReq/deleteReq msg may exist alternately in the msg queue, consume them sequentially
  while (1) {
    void *msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (msg == NULL) {
      break;  // qall is drained
    }

    int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
    if (inputType == STREAM_INPUT__DATA_SUBMIT) {
      packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
      packData.ver = RSMA_EXEC_MSG_VER(msg);
      packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
      version = packData.ver;
      if (!taosArrayPush(pSubmitArr, &packData)) {
        taosFreeQitem(msg);
        TAOS_CHECK_EXIT(terrno);
      }
      // msg is not freed here: packData.msgStr still points into it while it sits in pSubmitArr
      ++nSubmit;
    } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
      version = RSMA_EXEC_MSG_VER(msg);
      packData.pDataBlock = NULL;  // never read a stale/uninitialized block pointer after the call
      code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1,
                                   STREAM_DELETE_DATA);
      // the raw message is released exactly once on every path, success or failure
      taosFreeQitem(msg);
      TAOS_CHECK_EXIT(code);

      // packData.pDataBlock is NULL if delete affects 0 row
      if (packData.pDataBlock) {
        if (!taosArrayPush(pSubmitArr, &packData)) {
          // NOTE(review): the extracted block is not in the array yet, so it is not released by the
          // cleanup below - confirm which destroy routine should be used for it here.
          TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
        }
        ++nDelete;
      }
    } else {
      smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, inputType);
      taosFreeQitem(msg);  // drop the unrecognized item instead of leaking it
      continue;
    }

    if (nSubmit == 0 && nDelete == 0) {
      // nothing to execute for this message (delete affected 0 row); keep consuming the batch
      continue;
    }

    int32_t size = TARRAY_SIZE(pSubmitArr);
    int32_t execInput = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
    for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
      TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, execInput, pInfo, type, i));
    }
    tdFreeRSmaSubmitItems(pSubmitArr, execInput);
    nSubmit = 0;
    nDelete = 0;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  // report the code that actually failed; terrno is not guaranteed to match it
  atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
  smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
           type, (int32_t)taosArrayGetSize(pSubmitArr), tstrerror(code));
  tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
  // release whatever is still left in the batch
  while (1) {
    void *rest = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&rest));
    if (rest == NULL) {
      break;
    }
    taosFreeQitem(rest);
  }
  TAOS_RETURN(code);
}
1612

1613
/**
1614
 * @brief
1615
 *
1616
 * @param pSma
1617
 * @param type
1618
 * @return int32_t
1619
 */
1620

1621
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
679✔
1622
  int32_t    code = 0;
679✔
1623
  int32_t    lino = 0;
679✔
1624
  SVnode    *pVnode = pSma->pVnode;
679✔
1625
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
679✔
1626
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
679✔
1627
  SHashObj  *infoHash = NULL;
679✔
1628
  SArray    *pSubmitArr = NULL;
679✔
1629
  bool       isFetchAll = false;
679✔
1630

1631
  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
679!
1632
    code = TSDB_CODE_RSMA_INVALID_STAT;
×
1633
    TSDB_CHECK_CODE(code, lino, _exit);
×
1634
  }
1635

1636
  if (!(pSubmitArr =
679!
1637
            taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
679✔
1638
    code = terrno;
×
1639
    TSDB_CHECK_CODE(code, lino, _exit);
×
1640
  }
1641

1642
  while (true) {
1643
    // step 1: rsma exec - consume data in buffer queue for all suids
1644
    if (type == RSMA_EXEC_OVERFLOW) {
857!
1645
      void *pIter = NULL;
857✔
1646
      while ((pIter = taosHashIterate(infoHash, pIter))) {
1,034✔
1647
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
177✔
1648
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
177!
1649
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
177!
1650
            int32_t batchCnt = -1;
177✔
1651
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
177✔
1652
            bool    occupied = (batchMax <= 1);
177✔
1653
            if (batchMax > 1) {
177!
1654
              batchMax = 100 / batchMax;
×
1655
              batchMax = TMAX(batchMax, 4);
×
1656
            }
1657
            while (occupied || (++batchCnt < batchMax)) {                 // greedy mode
413!
1658
              TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall));  // queue has mutex lock
413✔
1659
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
413✔
1660
              if (qallItemSize > 0) {
413✔
1661
                if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
236!
1662
                  taosHashCancelIterate(infoHash, pIter);
×
1663
                  TSDB_CHECK_CODE(code, lino, _exit);
×
1664
                }
1665
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
236!
1666
              }
1667

1668
              if (RSMA_NEED_FETCH(pInfo)) {
413✔
1669
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
40✔
1670
                if (oldStat == 0 ||
40!
1671
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
×
1672
                  int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
40✔
1673

1674
                  if (oldVal < 0) {
40!
1675
                    code = TSDB_CODE_APP_ERROR;
×
1676
                    taosHashCancelIterate(infoHash, pIter);
×
1677
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1678
                  }
1679

1680
                  if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
40!
1681
                    taosHashCancelIterate(infoHash, pIter);
×
1682
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1683
                  }
1684

1685
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
40!
1686
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
40✔
1687
                  }
1688
                }
1689
              }
1690

1691
              if (qallItemSize > 0) {
413✔
1692
                TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
236✔
1693
                continue;
236✔
1694
              }
1695
              if (RSMA_NEED_FETCH(pInfo)) {
177!
UNCOV
1696
                continue;
×
1697
              }
1698

1699
              break;
177✔
1700
            }
1701
          }
1702
          TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
177✔
1703
        }
1704
      }
1705
    } else {
1706
      smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
×
1707
      code = TSDB_CODE_APP_ERROR;
×
1708
      TSDB_CHECK_CODE(code, lino, _exit);
×
1709
    }
1710

1711
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
856!
1712
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
856!
1713
        break;
×
1714
      }
1715

1716
      if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
856!
1717
        smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
×
1718
      }
1719

1720
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
848!
1721
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
670!
1722
                 atomic_load_64(&pRSmaStat->nBufItems));
1723
        break;
680✔
1724
      }
1725
    }
1726

1727
  }  // end of while(true)
1728

1729
_exit:
680✔
1730
  taosArrayDestroy(pSubmitArr);
680✔
1731
  if (code) {
680!
1732
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1733
  }
1734
  TAOS_RETURN(code);
680✔
1735
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc