• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.85
/source/dnode/vnode/src/sma/smaRollup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
#define RSMA_EXEC_SMOOTH_SIZE   (100)     // cnt
21
#define RSMA_EXEC_BATCH_SIZE    (1024)    // cnt
22
#define RSMA_FETCH_DELAY_MAX    (120000)  // ms
23
#define RSMA_FETCH_ACTIVE_MAX   (1000)    // ms
24
#define RSMA_FETCH_INTERVAL     (5000)    // ms
25
#define RSMA_EXEC_TASK_FLAG     "rsma"
26
#define RSMA_EXEC_MSG_HLEN      (13)  // type(int8_t) + len(int32_t) + version(int64_t)
27
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
28
#define RSMA_EXEC_MSG_LEN(msg)  (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
29
#define RSMA_EXEC_MSG_VER(msg)  (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
30
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))
31

32
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
33

34
SSmaMgmt smaMgmt = {
35
    .inited = 0,
36
    .rsetId = -1,
37
};
38

39
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
40

41
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
42
static void    tdUidStoreDestory(STbUidStore *pStore);
43
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
44
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
45
                                       int8_t idx);
46
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
47
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
48
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
49
static void    tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
50
static void    tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
51
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
52
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
53
                                         int32_t execType, int8_t *streamFlushed);
54
static void    tdRSmaFetchTrigger(void *param, void *tmrId);
55
static void    tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
56
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
57
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
58
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
59

60
struct SRSmaQTaskInfoItem {
61
  int32_t len;
62
  int8_t  type;
63
  int64_t suid;
64
  void   *qTaskInfo;
65
};
66

UNCOV
67
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
×
68
  // Note: free/kill may in RC
UNCOV
69
  if (!taskHandle || !(*taskHandle)) return;
×
UNCOV
70
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
×
UNCOV
71
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
×
UNCOV
72
    smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
×
UNCOV
73
    qDestroyTask(otaskHandle);
×
74
  }
75
}
76

77
/**
78
 * @brief general function to free rsmaInfo
79
 *
80
 * @param pSma
81
 * @param pInfo
82
 * @return void*
83
 */
UNCOV
84
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
×
UNCOV
85
  if (pInfo) {
×
UNCOV
86
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
87
      SRSmaInfoItem *pItem = &pInfo->items[i];
×
88

UNCOV
89
      if (pItem->tmrId) {
×
UNCOV
90
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
×
91
                 pInfo->suid, i + 1);
UNCOV
92
        if (!taosTmrStopA(&pItem->tmrId)) {
×
93
          smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
×
94
                   i + 1);
95
        }
96
      }
97

UNCOV
98
      if (pItem->pStreamState) {
×
UNCOV
99
        streamStateClose(pItem->pStreamState, false);
×
100
      }
101

UNCOV
102
      if (pItem->pStreamTask) {
×
UNCOV
103
        tFreeStreamTask(pItem->pStreamTask);
×
104
      }
UNCOV
105
      taosArrayDestroy(pItem->pResList);
×
UNCOV
106
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
×
107
    }
108

UNCOV
109
    taosMemoryFreeClear(pInfo->pTSchema);
×
110

UNCOV
111
    if (pInfo->queue) {
×
UNCOV
112
      taosCloseQueue(pInfo->queue);
×
UNCOV
113
      pInfo->queue = NULL;
×
114
    }
UNCOV
115
    if (pInfo->qall) {
×
UNCOV
116
      taosFreeQall(pInfo->qall);
×
UNCOV
117
      pInfo->qall = NULL;
×
118
    }
119

UNCOV
120
    taosMemoryFree(pInfo);
×
121
  }
122

UNCOV
123
  return NULL;
×
124
}
125

126
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
UNCOV
127
  *pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
×
UNCOV
128
  if (*pStore == NULL) {
×
129
    return terrno;
×
130
  }
131

UNCOV
132
  return TSDB_CODE_SUCCESS;
×
133
}
134

UNCOV
135
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
×
UNCOV
136
  SRSmaInfo *pRSmaInfo = NULL;
×
UNCOV
137
  int32_t    code = 0;
×
138

UNCOV
139
  if (!suid || !tbUids) {
×
140
    code = TSDB_CODE_INVALID_PTR;
×
141
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
×
142
             tstrerror(code));
143
    TAOS_RETURN(code);
×
144
  }
145

UNCOV
146
  int32_t nTables = taosArrayGetSize(tbUids);
×
147

UNCOV
148
  if (0 == nTables) {
×
149
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
×
150
    return TSDB_CODE_SUCCESS;
×
151
  }
152

UNCOV
153
  code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);
×
154

UNCOV
155
  if (code != 0) {
×
156
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
×
157
    TAOS_RETURN(code);
×
158
  }
159

UNCOV
160
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
161
    if (pRSmaInfo->taskInfo[i]) {
×
UNCOV
162
      if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
×
163
        tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
164
        smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
×
165
                 tstrerror(code));
166
        TAOS_RETURN(code);
×
167
      }
UNCOV
168
      smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
×
169
               "nTables:%d level %d",
170
               SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i);
171
    }
172
  }
173

UNCOV
174
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
UNCOV
175
  TAOS_RETURN(code);
×
176
}
177

178
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
566✔
179
  int32_t code = 0;
566✔
180
  if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
566!
181
    return TSDB_CODE_SUCCESS;
566✔
182
  }
183

UNCOV
184
  TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));
×
185

UNCOV
186
  void *pIter = NULL;
×
UNCOV
187
  while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
×
188
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
×
189
    SArray   *pTbUids = *(SArray **)pIter;
×
190

191
    if ((code = tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd)) != TSDB_CODE_SUCCESS) {
×
192
      taosHashCancelIterate(pStore->uidHash, pIter);
×
193
      TAOS_RETURN(code);
×
194
    }
195
  }
UNCOV
196
  return TSDB_CODE_SUCCESS;
×
197
}
198

199
/**
200
 * @brief fetch suid/uids when create child tables of rollup SMA
201
 *
202
 * @param pTsdb
203
 * @param ppStore
204
 * @param suid
205
 * @param uid
206
 * @return int32_t
207
 */
208
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
566✔
209
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
566✔
210
  int32_t  code = 0;
566✔
211

212
  // only applicable to rollup SMA ctables
213
  if (!pEnv) {
566!
214
    return TSDB_CODE_SUCCESS;
566✔
215
  }
216

UNCOV
217
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
UNCOV
218
  SHashObj  *infoHash = NULL;
×
UNCOV
219
  if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
×
220
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
221
  }
222

223
  // info cached when create rsma stable and return directly for non-rsma ctables
UNCOV
224
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
×
225
    return TSDB_CODE_SUCCESS;
×
226
  }
227

UNCOV
228
  if (!(*ppStore)) {
×
UNCOV
229
    TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
×
230
  }
231

UNCOV
232
  if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
×
233
    *ppStore = tdUidStoreFree(*ppStore);
×
234
    TAOS_RETURN(code);
×
235
  }
236

UNCOV
237
  return TSDB_CODE_SUCCESS;
×
238
}
239

UNCOV
240
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
×
UNCOV
241
  STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
×
UNCOV
242
  SStreamTask *pTask = NULL;
×
243

UNCOV
244
  streamMetaRLock(pMeta);
×
245

UNCOV
246
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
×
UNCOV
247
  if (code == 0) {
×
UNCOV
248
    pItem->submitReqVer = pTask->chkInfo.checkpointVer;
×
UNCOV
249
    pItem->fetchResultVer = pTask->info.delaySchedParam;
×
UNCOV
250
    streamMetaReleaseTask(pMeta, pTask);
×
251
  }
252

UNCOV
253
  streamMetaRUnLock(pMeta);
×
UNCOV
254
}
×
255

UNCOV
256
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
×
UNCOV
257
  streamMetaWLock(pMeta);
×
258

UNCOV
259
  int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
×
UNCOV
260
  if (code != 0) {
×
261
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
×
262
  }
UNCOV
263
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
×
UNCOV
264
  if (streamMetaCommit(pMeta) < 0) {
×
265
    // persist to disk
266
  }
UNCOV
267
  streamMetaWUnLock(pMeta);
×
UNCOV
268
  smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
×
UNCOV
269
}
×
270

UNCOV
271
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
×
272
                                       int8_t idx) {
UNCOV
273
  int32_t code = 0;
×
UNCOV
274
  if ((param->qmsgLen > 0) && param->qmsg[idx]) {
×
UNCOV
275
    SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
×
UNCOV
276
    SRetention    *pRetention = SMA_RETENTION(pSma);
×
UNCOV
277
    STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
×
UNCOV
278
    SVnode        *pVnode = pSma->pVnode;
×
UNCOV
279
    char           taskInfDir[TSDB_FILENAME_LEN] = {0};
×
UNCOV
280
    void          *pStreamState = NULL;
×
281

282
    // set the backend of stream state
UNCOV
283
    tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
×
284

UNCOV
285
    if (!taosCheckExistFile(taskInfDir)) {
×
UNCOV
286
      char *s = taosStrdup(taskInfDir);
×
UNCOV
287
      if (!s) {
×
288
        TAOS_RETURN(terrno);
×
289
      }
UNCOV
290
      if (taosMulMkDir(s) != 0) {
×
291
        code = TAOS_SYSTEM_ERROR(errno);
×
292
        taosMemoryFree(s);
×
293
        TAOS_RETURN(code);
×
294
      }
UNCOV
295
      taosMemoryFree(s);
×
296
    }
297

UNCOV
298
    SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
×
UNCOV
299
    if (!pStreamTask) {
×
300
      return terrno;
×
301
    }
UNCOV
302
    pItem->pStreamTask = pStreamTask;
×
UNCOV
303
    pStreamTask->id.taskId = 0;
×
UNCOV
304
    pStreamTask->id.streamId = pRSmaInfo->suid + idx;
×
UNCOV
305
    pStreamTask->chkInfo.startTs = taosGetTimestampMs();
×
UNCOV
306
    pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
×
UNCOV
307
    pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1);
×
UNCOV
308
    if (!pStreamTask->exec.qmsg) {
×
309
      TAOS_RETURN(terrno);
×
310
    }
UNCOV
311
    TAOS_UNUSED(snprintf(pStreamTask->exec.qmsg, strlen(RSMA_EXEC_TASK_FLAG) + 1, "%s", RSMA_EXEC_TASK_FLAG));
×
UNCOV
312
    pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
×
UNCOV
313
    tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
×
314

UNCOV
315
    TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));
×
316

UNCOV
317
    TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));
×
318

UNCOV
319
    pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
×
UNCOV
320
    if (!pStreamState) {
×
321
      TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
×
322
    }
UNCOV
323
    pItem->pStreamState = pStreamState;
×
324

UNCOV
325
    tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
×
326

UNCOV
327
    SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
×
UNCOV
328
    initStorageAPI(&handle.api);
×
329

UNCOV
330
    code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
×
UNCOV
331
    if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
×
332
      TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
×
333
    }
334

UNCOV
335
    if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
×
336
      TAOS_RETURN(terrno);
×
337
    }
338

UNCOV
339
    if (pItem->fetchResultVer < pItem->submitReqVer) {
×
340
      // fetch the data when reboot
UNCOV
341
      pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
×
342
    }
343

UNCOV
344
    if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
×
UNCOV
345
      int64_t msInterval = -1;
×
UNCOV
346
      TAOS_CHECK_RETURN(convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision,
×
347
                                                       TIME_UNIT_MILLISECOND, &msInterval));
UNCOV
348
      pItem->maxDelay = (int32_t)msInterval;
×
349
    } else {
UNCOV
350
      pItem->maxDelay = (int32_t)param->maxdelay[idx];
×
351
    }
UNCOV
352
    if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
×
353
      pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
×
354
    }
355

UNCOV
356
    pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
×
357

UNCOV
358
    SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
×
UNCOV
359
    if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
×
360
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
361
    }
362

UNCOV
363
    bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
UNCOV
364
    if (!ret) {
×
UNCOV
365
      smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
×
366
               idx + 1);
367
    }
368

UNCOV
369
    smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
×
370
            ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
371
            ", finally maxdelay:%" PRIi32,
372
            TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
373
            pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
374
  }
UNCOV
375
  TAOS_RETURN(TSDB_CODE_SUCCESS);
×
376
}
377

378
/**
379
 * @brief for rsam create or restore
380
 *
381
 * @param pSma
382
 * @param param
383
 * @param suid
384
 * @param tbName
385
 * @return int32_t
386
 */
UNCOV
387
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
×
UNCOV
388
  int32_t code = 0;
×
UNCOV
389
  int32_t lino = 0;
×
390

UNCOV
391
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
×
392
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
×
393
    return TSDB_CODE_SUCCESS;
×
394
  }
395

UNCOV
396
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
UNCOV
397
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
UNCOV
398
  SRSmaInfo *pRSmaInfo = NULL;
×
399

UNCOV
400
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
×
UNCOV
401
  if (pRSmaInfo) {
×
402
    smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
×
403
    return TSDB_CODE_SUCCESS;
×
404
  }
405

406
  // from write queue: single thead
UNCOV
407
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
×
UNCOV
408
  if (!pRSmaInfo) {
×
409
    return terrno;
×
410
  }
411

412
  STSchema *pTSchema;
UNCOV
413
  code = metaGetTbTSchemaNotNull(SMA_META(pSma), suid, -1, 1, &pTSchema);
×
UNCOV
414
  TAOS_CHECK_EXIT(code);
×
UNCOV
415
  pRSmaInfo->pSma = pSma;
×
UNCOV
416
  pRSmaInfo->pTSchema = pTSchema;
×
UNCOV
417
  pRSmaInfo->suid = suid;
×
UNCOV
418
  T_REF_INIT_VAL(pRSmaInfo, 1);
×
419

UNCOV
420
  TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
×
421

UNCOV
422
  TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
×
423

UNCOV
424
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
×
UNCOV
425
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
×
426

UNCOV
427
  TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
×
428

UNCOV
429
_exit:
×
UNCOV
430
  if (code != 0) {
×
431
    TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
×
432
  } else {
UNCOV
433
    smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
×
434
  }
UNCOV
435
  TAOS_RETURN(code);
×
436
}
437

438
/**
439
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
440
 *
441
 * @param pSma
442
 * @param pReq
443
 * @return int32_t
444
 */
445
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
460✔
446
  SVnode *pVnode = pSma->pVnode;
460✔
447
  if (!pReq->rollup) {
460!
448
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
460!
449
             pReq->suid);
450
    return TSDB_CODE_SUCCESS;
460✔
451
  }
452

453
  if (!VND_IS_RSMA(pVnode)) {
×
454
    smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
×
455
            pReq->suid);
456
    return TSDB_CODE_SUCCESS;
×
457
  }
458

459
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
×
460
}
461

462
/**
463
 * @brief drop cache for stb
464
 *
465
 * @param pSma
466
 * @param pReq
467
 * @return int32_t
468
 */
469
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
2✔
470
  SVnode *pVnode = pSma->pVnode;
2✔
471
  if (!VND_IS_RSMA(pVnode)) {
2!
472
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
2!
473
             pReq->suid);
474
    return TSDB_CODE_SUCCESS;
2✔
475
  }
476

477
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
×
478
  if (!pSmaEnv) {
×
479
    return TSDB_CODE_SUCCESS;
×
480
  }
481

482
  int32_t    code = 0;
×
483
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
×
484
  SRSmaInfo *pRSmaInfo = NULL;
×
485

486
  code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
×
487

488
  if (code != 0) {
×
489
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
×
490
            pReq->suid);
491
    return TSDB_CODE_SUCCESS;
×
492
  }
493

494
  // set del flag for data in mem
495
  atomic_store_8(&pRSmaStat->delFlag, 1);
×
496
  RSMA_INFO_SET_DEL(pRSmaInfo);
×
497
  tdUnRefRSmaInfo(pSma, pRSmaInfo);
×
498

499
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
500

501
  // no need to save to file as triggered by dropping stable
502
  smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
×
503
  return TSDB_CODE_SUCCESS;
×
504
}
505

506
/**
507
 * @brief store suid/[uids], prefer to use array and then hash
508
 *
509
 * @param pStore
510
 * @param suid
511
 * @param uid
512
 * @return int32_t
513
 */
UNCOV
514
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
×
515
  // prefer to store suid/uids in array
UNCOV
516
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
×
UNCOV
517
    if (pStore->suid == 0) {
×
UNCOV
518
      pStore->suid = suid;
×
519
    }
UNCOV
520
    if (uid) {
×
UNCOV
521
      if (!pStore->tbUids) {
×
UNCOV
522
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
523
          TAOS_RETURN(terrno);
×
524
        }
525
      }
UNCOV
526
      if (!taosArrayPush(pStore->tbUids, uid)) {
×
527
        TAOS_RETURN(terrno);
×
528
      }
529
    }
530
  } else {
531
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
532
    if (!pStore->uidHash) {
×
533
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
534
      if (!pStore->uidHash) {
×
535
        TAOS_RETURN(terrno);
×
536
      }
537
    }
538
    if (uid) {
×
539
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
×
540
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
×
541
        if (!taosArrayPush(uidArray, uid)) {
×
542
          taosArrayDestroy(uidArray);
×
543
          TAOS_RETURN(terrno);
×
544
        }
545
      } else {
546
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
×
547
        if (!pUidArray) {
×
548
          TAOS_RETURN(terrno);
×
549
        }
550
        if (!taosArrayPush(pUidArray, uid)) {
×
551
          taosArrayDestroy(pUidArray);
×
552
          TAOS_RETURN(terrno);
×
553
        }
554
        TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
×
555
      }
556
    } else {
557
      TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
×
558
    }
559
  }
UNCOV
560
  return TSDB_CODE_SUCCESS;
×
561
}
562

UNCOV
563
static void tdUidStoreDestory(STbUidStore *pStore) {
×
UNCOV
564
  if (pStore) {
×
UNCOV
565
    if (pStore->uidHash) {
×
566
      if (pStore->tbUids) {
×
567
        // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
568
        void *pIter = NULL;
×
569
        while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
×
570
          SArray *arr = *(SArray **)pIter;
×
571
          taosArrayDestroy(arr);
×
572
        }
573
      }
574
      taosHashCleanup(pStore->uidHash);
×
575
    }
UNCOV
576
    taosArrayDestroy(pStore->tbUids);
×
577
  }
UNCOV
578
}
×
579

580
void *tdUidStoreFree(STbUidStore *pStore) {
566✔
581
  if (pStore) {
566!
UNCOV
582
    tdUidStoreDestory(pStore);
×
UNCOV
583
    taosMemoryFree(pStore);
×
584
  }
585
  return NULL;
566✔
586
}
587

588
/**
589
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
590
 * would be freed when close Vnode, thus lock should be used if with race condition.
591
 * @param pTsdb
592
 * @param version
593
 * @param pReq
594
 * @return int32_t
595
 */
UNCOV
596
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
×
UNCOV
597
  if (pReq) {
×
UNCOV
598
    SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
×
599
    // spin lock for race condition during insert data
UNCOV
600
    TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
×
601
  }
602

UNCOV
603
  return TSDB_CODE_SUCCESS;
×
604
}
605

UNCOV
606
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
×
UNCOV
607
  SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL;
×
UNCOV
608
  int32_t size = taosArrayGetSize(pSubmitTbData);
×
609

UNCOV
610
  for (int32_t i = 0; i < size; ++i) {
×
UNCOV
611
    SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i);
×
UNCOV
612
    TAOS_CHECK_RETURN(tdUidStorePut(pStore, pData->suid, NULL));
×
613
  }
614

UNCOV
615
  return 0;
×
616
}
617

618
/**
619
 * @brief retention of rsma1/rsma2
620
 *
621
 * @param pSma
622
 * @param now
623
 * @return int32_t
624
 */
625
int32_t smaRetention(SSma *pSma, int64_t now) {
×
626
  int32_t code = TSDB_CODE_SUCCESS;
×
627
  if (!VND_IS_RSMA(pSma->pVnode)) {
×
628
    return code;
×
629
  }
630

631
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
632
    if (pSma->pRSmaTsdb[i]) {
633
      // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
634
      // if (code) goto _end;
635
    }
636
  }
637

638
_end:
×
639
  TAOS_RETURN(code);
×
640
}
641

UNCOV
642
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
×
UNCOV
643
  int32_t code = 0;
×
UNCOV
644
  int32_t lino = 0;
×
645

UNCOV
646
  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
×
UNCOV
647
    int32_t len = 0;
×
UNCOV
648
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
×
UNCOV
649
    TSDB_CHECK_CODE(code, lino, _exit);
×
650

UNCOV
651
    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
×
UNCOV
652
    if (!pBuf) {
×
653
      code = terrno;
×
654
      TSDB_CHECK_CODE(code, lino, _exit);
×
655
    }
656

657
    SEncoder encoder;
UNCOV
658
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
×
UNCOV
659
    if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
×
660
      tEncoderClear(&encoder);
×
661
      rpcFreeCont(pBuf);
×
662
      TSDB_CHECK_CODE(code, lino, _exit);
×
663
    }
UNCOV
664
    tEncoderClear(&encoder);
×
665

UNCOV
666
    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
×
667

UNCOV
668
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
×
UNCOV
669
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
×
UNCOV
670
    TSDB_CHECK_CODE(code, lino, _exit);
×
671
  }
672

673
_exit:
×
UNCOV
674
  taosArrayDestroy(pDelReq->deleteReqs);
×
UNCOV
675
  if (code) {
×
676
    smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
×
677
             SMA_VID(pSma), lino, suid, level, tstrerror(code));
678
  }
679

UNCOV
680
  TAOS_RETURN(code);
×
681
}
682

UNCOV
683
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
×
684
                                         int32_t execType, int8_t *streamFlushed) {
UNCOV
685
  int32_t      code = 0;
×
UNCOV
686
  int32_t      lino = 0;
×
UNCOV
687
  SSDataBlock *output = NULL;
×
UNCOV
688
  SArray      *pResList = pItem->pResList;
×
UNCOV
689
  STSchema    *pTSchema = pInfo->pTSchema;
×
UNCOV
690
  int64_t      suid = pInfo->suid;
×
691

UNCOV
692
  while (1) {
×
693
    uint64_t ts;
UNCOV
694
    bool     hasMore = false;
×
UNCOV
695
    code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
×
UNCOV
696
    if (code == TSDB_CODE_QRY_IN_EXEC) {
×
697
      code = 0;
×
UNCOV
698
      break;
×
699
    }
UNCOV
700
    TSDB_CHECK_CODE(code, lino, _exit);
×
701

UNCOV
702
    if (taosArrayGetSize(pResList) == 0) {
×
UNCOV
703
      break;
×
704
    }
705

UNCOV
706
    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
×
UNCOV
707
      output = taosArrayGetP(pResList, i);
×
UNCOV
708
      if (output->info.type == STREAM_CHECKPOINT) {
×
UNCOV
709
        if (streamFlushed) *streamFlushed = 1;
×
UNCOV
710
        continue;
×
UNCOV
711
      } else if (output->info.type == STREAM_DELETE_RESULT) {
×
UNCOV
712
        SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
×
UNCOV
713
        deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
×
UNCOV
714
        if (!deleteReq.deleteReqs) {
×
715
          code = terrno;
×
716
          TSDB_CHECK_CODE(code, lino, _exit);
×
717
        }
UNCOV
718
        code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
×
UNCOV
719
        TSDB_CHECK_CODE(code, lino, _exit);
×
UNCOV
720
        code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
×
UNCOV
721
        TSDB_CHECK_CODE(code, lino, _exit);
×
UNCOV
722
        continue;
×
723
      }
724

UNCOV
725
      smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
×
726
               ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
727
               SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
728
               pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
729

UNCOV
730
      if (STREAM_GET_ALL == execType) {
×
731
        /**
732
         * 1. reset the output version when reboot
733
         * 2. delete msg version not updated from the result
734
         */
UNCOV
735
        if (output->info.version < pItem->submitReqVer) {
×
736
          // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
UNCOV
737
          output->info.version = pItem->submitReqVer;
×
UNCOV
738
        } else if (output->info.version == pItem->fetchResultVer) {
×
UNCOV
739
          smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
×
740
                  ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
741
                  ", rows:%" PRIi64,
742
                  SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
743
                  pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
UNCOV
744
          continue;
×
745
        }
746
      }
747

UNCOV
748
      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
×
UNCOV
749
      SSubmitReq2 *pReq = NULL;
×
750

UNCOV
751
      TAOS_CHECK_EXIT(
×
752
          buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));
753

UNCOV
754
      if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
×
755
        if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
756
          // TODO: reconfigure SSubmitReq2
757
        }
758
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
759
        taosMemoryFreeClear(pReq);
×
760
        TSDB_CHECK_CODE(code, lino, _exit);
×
761
      }
762

UNCOV
763
      if (STREAM_GET_ALL == execType) {
×
UNCOV
764
        atomic_store_64(&pItem->fetchResultVer, output->info.version);
×
765
      }
766

UNCOV
767
      smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
×
768
               ", execType:%d, ver:%" PRIi64,
769
               SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);
770

UNCOV
771
      if (pReq) {
×
UNCOV
772
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
UNCOV
773
        taosMemoryFree(pReq);
×
774
      }
775
    }
776
  }
UNCOV
777
_exit:
×
UNCOV
778
  if (code) {
×
779
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
×
780
             ", ver:%" PRIi64,
781
             SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
782
             output ? output->info.version : -1);
783
  } else {
UNCOV
784
    smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
×
785
  }
UNCOV
786
  qCleanExecTaskBlockBuf(taskInfo);
×
UNCOV
787
  return code;
×
788
}
789

790
/**
791
 * @brief Copy msg to rsmaQueueBuffer for batch process
792
 *
793
 * @param pSma
794
 * @param version
795
 * @param pMsg
796
 * @param len
797
 * @param inputType
798
 * @param pInfo
799
 * @param suid
800
 * @return int32_t
801
 */
UNCOV
802
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
×
803
                                      SRSmaInfo *pInfo, tb_uid_t suid) {
UNCOV
804
  int32_t code = 0;
×
UNCOV
805
  int32_t lino = 0;
×
UNCOV
806
  int32_t size = RSMA_EXEC_MSG_HLEN + len;  // header + payload
×
807
  void   *qItem;
808

UNCOV
809
  TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
×
810

UNCOV
811
  void *pItem = qItem;
×
812

UNCOV
813
  *(int8_t *)pItem = (int8_t)inputType;
×
UNCOV
814
  pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
×
UNCOV
815
  *(int32_t *)pItem = len;
×
UNCOV
816
  pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
×
UNCOV
817
  *(int64_t *)pItem = version;
×
UNCOV
818
  (void)memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
×
819

UNCOV
820
  TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
×
821

UNCOV
822
  pInfo->lastRecv = taosGetTimestampMs();
×
823

UNCOV
824
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
×
825

UNCOV
826
  int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
×
827

UNCOV
828
  if (atomic_load_8(&pInfo->assigned) == 0) {
×
UNCOV
829
    if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
×
830
      smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
×
831
               tstrerror(terrno));
832
    }
833
  }
834

835
  // smoothing consume
UNCOV
836
  int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
×
UNCOV
837
  if (n > 1) {
×
838
    if (n > 10) {
×
839
      n = 10;
×
840
    }
841
    taosMsleep(n << 3);
×
842
    if (n > 5) {
×
843
      smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
×
844
              taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
845
    }
846
  }
847

UNCOV
848
  return TSDB_CODE_SUCCESS;
×
849
}
850

851
#if 0
852
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
853
  SSubmitMsgIter msgIter = {0};
854
  SSubmitBlkIter blkIter = {0};
855
  STSRow        *row = NULL;
856
  TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));
857
  while (true) {
858
    SSubmitBlk *pBlock = NULL;
859
    TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
860
    if (pBlock == NULL) break;
861
    tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
862
    while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
863
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
864
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
865
    }
866
  }
867
  return 0;
868
}
869
#endif
870

871
/**
872
 * @brief sync mode
873
 *
874
 * @param pSma
875
 * @param pMsg
876
 * @param msgSize
877
 * @param inputType
878
 * @param pInfo
879
 * @param type
880
 * @param level
881
 * @return int32_t
882
 */
UNCOV
883
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
×
884
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
UNCOV
885
  int32_t        code = 0;
×
UNCOV
886
  int32_t        idx = level - 1;
×
UNCOV
887
  void          *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
×
UNCOV
888
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
×
889

UNCOV
890
  if (!qTaskInfo) {
×
891
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
×
892
             pInfo->suid);
893
    return TSDB_CODE_SUCCESS;
×
894
  }
UNCOV
895
  if (!pInfo->pTSchema) {
×
896
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
×
897
    TAOS_RETURN(TSDB_CODE_INVALID_PTR);
×
898
  }
899

UNCOV
900
  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
×
901
           ", inputType:%d",
902
           SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
903

UNCOV
904
  if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
×
905
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
×
906
    TAOS_RETURN(TSDB_CODE_FAILED);
×
907
  }
908

UNCOV
909
  atomic_store_64(&pItem->submitReqVer, version);
×
910

UNCOV
911
  TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
×
912

UNCOV
913
  TAOS_RETURN(code);
×
914
}
915

916
/**
917
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
918
 *
919
 * @param pSma
920
 * @param suid
921
 */
UNCOV
922
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
×
UNCOV
923
  int32_t    code = 0;
×
UNCOV
924
  int32_t    lino = 0;
×
UNCOV
925
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
UNCOV
926
  SRSmaStat *pStat = NULL;
×
UNCOV
927
  SRSmaInfo *pRSmaInfo = NULL;
×
928

UNCOV
929
  *ppRSmaInfo = NULL;
×
930

UNCOV
931
  if (!pEnv) {
×
932
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
×
933
  }
934

UNCOV
935
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
UNCOV
936
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
×
937
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
938
  }
939

UNCOV
940
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
×
UNCOV
941
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
×
UNCOV
942
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
×
UNCOV
943
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
944
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
945
      TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
946
    }
947

948
    tdRefRSmaInfo(pSma, pRSmaInfo);
UNCOV
949
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
UNCOV
950
    if (pRSmaInfo->suid != suid) {
×
951
      TAOS_RETURN(TSDB_CODE_APP_ERROR);
×
952
    }
UNCOV
953
    *ppRSmaInfo = pRSmaInfo;
×
UNCOV
954
    TAOS_RETURN(code);
×
955
  }
956
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
957

958
  TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
959
}
960

961
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
UNCOV
962
  if (pInfo) {
×
963
    tdUnRefRSmaInfo(pSma, pInfo);
964
  }
UNCOV
965
}
×
966

967
/**
968
 * @brief async mode
969
 *
970
 * @param pSma
971
 * @param version
972
 * @param pMsg
973
 * @param inputType
974
 * @param suid
975
 * @return int32_t
976
 */
UNCOV
977
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
×
978
                                  tb_uid_t suid) {
UNCOV
979
  int32_t    code = 0;
×
UNCOV
980
  SRSmaInfo *pRSmaInfo = NULL;
×
981

UNCOV
982
  code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
×
UNCOV
983
  if (code != 0) {
×
984
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
×
985
    TAOS_RETURN(TSDB_CODE_SUCCESS);  // return success
×
986
  }
987

UNCOV
988
  if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
×
UNCOV
989
    if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
×
990
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
991
      TAOS_RETURN(code);
×
992
    }
UNCOV
993
    if (smaMgmt.tmrHandle) {
×
UNCOV
994
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
×
UNCOV
995
      if (pItem->level > 0) {
×
UNCOV
996
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
×
997
      }
UNCOV
998
      pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
×
UNCOV
999
      if (pItem->level > 0) {
×
UNCOV
1000
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
×
1001
      }
1002
    }
1003
  } else {
1004
    code = TSDB_CODE_APP_ERROR;
×
1005
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1006
    smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
×
1007
             tstrerror(code), inputType);
1008
    TAOS_RETURN(code);
×
1009
  }
1010

UNCOV
1011
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
UNCOV
1012
  return TSDB_CODE_SUCCESS;
×
1013
}
1014

1015
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
44,980✔
1016
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
44,980!
1017

UNCOV
1018
  int32_t code = 0;
×
UNCOV
1019
  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
×
1020
    smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
×
1021
    goto _exit;
×
1022
  }
1023

UNCOV
1024
  STbUidStore uidStore = {0};
×
1025

UNCOV
1026
  if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
×
1027
    smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), tstrerror(code));
×
1028
    goto _exit;
×
1029
  }
1030

UNCOV
1031
  if (uidStore.suid != 0) {
×
UNCOV
1032
    if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
×
1033
      smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
×
1034
      goto _exit;
×
1035
    }
1036

UNCOV
1037
    void *pIter = NULL;
×
UNCOV
1038
    while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
×
1039
      tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
×
1040
      if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
×
1041
        smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
×
1042
        taosHashCancelIterate(uidStore.uidHash, pIter);
×
1043
        goto _exit;
×
1044
      }
1045
    }
1046
  }
UNCOV
1047
_exit:
×
UNCOV
1048
  tdUidStoreDestory(&uidStore);
×
UNCOV
1049
  TAOS_RETURN(code);
×
1050
}
1051

UNCOV
1052
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
×
UNCOV
1053
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
×
1054

UNCOV
1055
  int32_t code = 0;
×
UNCOV
1056
  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
×
1057
    smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
×
1058
    goto _exit;
×
1059
  }
1060

UNCOV
1061
  SDeleteRes *pDelRes = pReq;
×
UNCOV
1062
  if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) {
×
1063
    smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
×
1064
    goto _exit;
×
1065
  }
UNCOV
1066
_exit:
×
UNCOV
1067
  TAOS_RETURN(code);
×
1068
}
1069

1070
/**
1071
 * @brief retrieve rsma meta and init
1072
 *
1073
 * @param pSma
1074
 * @param nTables number of tables of rsma
1075
 * @return int32_t
1076
 */
UNCOV
1077
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
×
UNCOV
1078
  int32_t     code = 0;
×
UNCOV
1079
  int32_t     lino = 0;
×
UNCOV
1080
  SVnode     *pVnode = pSma->pVnode;
×
UNCOV
1081
  SArray     *suidList = NULL;
×
UNCOV
1082
  STbUidStore uidStore = {0};
×
UNCOV
1083
  SMetaReader mr = {0};
×
UNCOV
1084
  tb_uid_t    suid = 0;
×
1085

UNCOV
1086
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
1087
    code = terrno;
×
1088
    TSDB_CHECK_CODE(code, lino, _exit);
×
1089
  }
1090

UNCOV
1091
  TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
×
1092

UNCOV
1093
  int64_t arrSize = taosArrayGetSize(suidList);
×
1094

UNCOV
1095
  if (arrSize == 0) {
×
UNCOV
1096
    if (nTables) {
×
UNCOV
1097
      *nTables = 0;
×
1098
    }
UNCOV
1099
    taosArrayDestroy(suidList);
×
UNCOV
1100
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
×
UNCOV
1101
    return TSDB_CODE_SUCCESS;
×
1102
  }
1103

UNCOV
1104
  int64_t nRsmaTables = 0;
×
UNCOV
1105
  metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK);
×
UNCOV
1106
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
×
1107
    code = terrno;
×
1108
    TSDB_CHECK_CODE(code, lino, _exit);
×
1109
  }
1110

UNCOV
1111
  for (int64_t i = 0; i < arrSize; ++i) {
×
UNCOV
1112
    suid = *(tb_uid_t *)taosArrayGet(suidList, i);
×
UNCOV
1113
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
×
UNCOV
1114
    if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
×
1115
      code = terrno;
×
1116
      TSDB_CHECK_CODE(code, lino, _exit);
×
1117
    }
UNCOV
1118
    tDecoderClear(&mr.coder);
×
UNCOV
1119
    if (mr.me.type != TSDB_SUPER_TABLE) {
×
1120
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1121
      TSDB_CHECK_CODE(code, lino, _exit);
×
1122
    }
UNCOV
1123
    if (mr.me.uid != suid) {
×
1124
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1125
      TSDB_CHECK_CODE(code, lino, _exit);
×
1126
    }
UNCOV
1127
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
×
UNCOV
1128
      ++nRsmaTables;
×
UNCOV
1129
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
×
UNCOV
1130
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
1131
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
×
1132
                 " qmsgLen:%" PRIi32,
1133
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1134
      }
UNCOV
1135
      TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
×
1136
#if 0
1137
      // reload all ctbUids for suid
1138
      uidStore.suid = suid;
1139
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
1140
        code = terrno;
1141
        TSDB_CHECK_CODE(code, lino, _exit);
1142
      }
1143

1144
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
1145
        code = terrno;
1146
        TSDB_CHECK_CODE(code, lino, _exit);
1147
      }
1148

1149
      taosArrayClear(uidStore.tbUids);
1150
#endif
UNCOV
1151
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
×
1152
    }
1153
  }
1154

UNCOV
1155
  if (nTables) {
×
UNCOV
1156
    *nTables = nRsmaTables;
×
1157
  }
1158
_exit:
×
UNCOV
1159
  if (code) {
×
1160
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
×
1161
             __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
1162
  }
UNCOV
1163
  metaReaderClear(&mr);
×
UNCOV
1164
  taosArrayDestroy(suidList);
×
UNCOV
1165
  tdUidStoreDestory(&uidStore);
×
UNCOV
1166
  TAOS_RETURN(code);
×
1167
}
1168

1169
/**
1170
 * N.B. the data would be restored from the unified WAL replay procedure
1171
 */
UNCOV
1172
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
×
UNCOV
1173
  int32_t code = 0;
×
UNCOV
1174
  int32_t lino = 0;
×
UNCOV
1175
  int64_t nTables = 0;
×
1176

1177
  // step 1: init env
UNCOV
1178
  TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
×
1179

1180
  // step 2: iterate all stables to restore the rsma env
UNCOV
1181
  TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
×
1182

UNCOV
1183
_exit:
×
UNCOV
1184
  if (code) {
×
1185
    smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
×
1186
             qtaskFileVer, tstrerror(code));
1187
  } else {
UNCOV
1188
    smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed, nTables:%" PRIi64, SMA_VID(pSma),
×
1189
            type, qtaskFileVer, nTables);
1190
  }
1191

UNCOV
1192
  TAOS_RETURN(code);
×
1193
}
1194

UNCOV
1195
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
×
UNCOV
1196
  int32_t code = 0;
×
UNCOV
1197
  int32_t lino = 0;
×
UNCOV
1198
  int32_t nTaskInfo = 0;
×
UNCOV
1199
  SSma   *pSma = pRSmaStat->pSma;
×
UNCOV
1200
  SVnode *pVnode = pSma->pVnode;
×
1201

UNCOV
1202
  if (taosHashGetSize(pInfoHash) <= 0) {
×
1203
    return TSDB_CODE_SUCCESS;
×
1204
  }
1205

1206
  // stream state: trigger checkpoint
1207
  do {
UNCOV
1208
    void *infoHash = NULL;
×
UNCOV
1209
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
×
UNCOV
1210
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
×
UNCOV
1211
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
1212
        continue;
×
1213
      }
UNCOV
1214
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
1215
        if (pRSmaInfo->taskInfo[i]) {
×
UNCOV
1216
          code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
×
UNCOV
1217
          if (code) {
×
1218
            taosHashCancelIterate(pInfoHash, infoHash);
×
1219
            TSDB_CHECK_CODE(code, lino, _exit);
×
1220
          }
UNCOV
1221
          pRSmaInfo->items[i].streamFlushed = 0;
×
UNCOV
1222
          ++nTaskInfo;
×
1223
        }
1224
      }
1225
    }
1226
  } while (0);
1227

1228
  // stream state: wait checkpoint ready in async mode
1229
  do {
UNCOV
1230
    int32_t nStreamFlushed = 0;
×
UNCOV
1231
    int32_t nSleep = 0;
×
UNCOV
1232
    void   *infoHash = NULL;
×
1233
    while (true) {
UNCOV
1234
      while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
×
UNCOV
1235
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
×
UNCOV
1236
        if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
1237
          continue;
×
1238
        }
UNCOV
1239
        for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
1240
          if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
×
UNCOV
1241
            int8_t streamFlushed = 0;
×
UNCOV
1242
            code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
×
1243
                                             STREAM_CHECKPOINT, &streamFlushed);
UNCOV
1244
            if (code) {
×
1245
              taosHashCancelIterate(pInfoHash, infoHash);
×
1246
              TSDB_CHECK_CODE(code, lino, _exit);
×
1247
            }
1248

UNCOV
1249
            if (streamFlushed) {
×
UNCOV
1250
              pRSmaInfo->items[i].streamFlushed = 1;
×
UNCOV
1251
              if (++nStreamFlushed >= nTaskInfo) {
×
UNCOV
1252
                smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
×
1253
                        nSleep * 10, nStreamFlushed, nTaskInfo);
UNCOV
1254
                taosHashCancelIterate(pInfoHash, infoHash);
×
UNCOV
1255
                goto _checkpoint;
×
1256
              }
1257
            }
1258
          }
1259
        }
1260
      }
UNCOV
1261
      taosUsleep(10);
×
UNCOV
1262
      ++nSleep;
×
UNCOV
1263
      smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode),
×
1264
               nSleep * 10, nStreamFlushed, nTaskInfo);
1265
    }
1266
  } while (0);
1267

UNCOV
1268
_checkpoint:
×
1269
  // stream state: build checkpoint in backend
1270
  do {
UNCOV
1271
    SStreamMeta *pMeta = NULL;
×
UNCOV
1272
    int64_t      checkpointId = taosGetTimestampNs();
×
UNCOV
1273
    bool         checkpointBuilt = false;
×
UNCOV
1274
    void        *infoHash = NULL;
×
UNCOV
1275
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
×
UNCOV
1276
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
×
UNCOV
1277
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
1278
        continue;
×
1279
      }
1280

UNCOV
1281
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
UNCOV
1282
        SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
×
UNCOV
1283
        if (pItem && pItem->pStreamTask) {
×
UNCOV
1284
          SStreamTask *pTask = pItem->pStreamTask;
×
1285
          // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
UNCOV
1286
          TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));
×
1287

UNCOV
1288
          pTask->chkInfo.checkpointId = checkpointId;  // 1pTask->checkpointingId;
×
UNCOV
1289
          pTask->chkInfo.checkpointVer = pItem->submitReqVer;
×
UNCOV
1290
          pTask->info.delaySchedParam = pItem->fetchResultVer;
×
UNCOV
1291
          pTask->info.taskLevel = TASK_LEVEL_SMA;
×
1292

UNCOV
1293
          if (!checkpointBuilt) {
×
1294
            // the stream states share one checkpoint
UNCOV
1295
            code = streamTaskBuildCheckpoint(pTask);
×
UNCOV
1296
            if (code) {
×
1297
              taosHashCancelIterate(pInfoHash, infoHash);
×
1298
              TSDB_CHECK_CODE(code, lino, _exit);
×
1299
            }
UNCOV
1300
            pMeta = pTask->pMeta;
×
UNCOV
1301
            checkpointBuilt = true;
×
1302
          }
1303

UNCOV
1304
          streamMetaWLock(pMeta);
×
UNCOV
1305
          if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
×
1306
            streamMetaWUnLock(pMeta);
×
1307
            taosHashCancelIterate(pInfoHash, infoHash);
×
1308
            TSDB_CHECK_CODE(code, lino, _exit);
×
1309
          }
UNCOV
1310
          streamMetaWUnLock(pMeta);
×
UNCOV
1311
          smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
×
1312
                   ", table:%" PRIi64 ", level:%d",
1313
                   TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
1314
        }
1315
      }
1316
    }
UNCOV
1317
    if (pMeta) {
×
UNCOV
1318
      streamMetaWLock(pMeta);
×
UNCOV
1319
      if ((code = streamMetaCommit(pMeta)) != 0) {
×
1320
        streamMetaWUnLock(pMeta);
×
1321
        if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
×
1322
        TSDB_CHECK_CODE(code, lino, _exit);
×
1323
      }
UNCOV
1324
      streamMetaWUnLock(pMeta);
×
1325
    }
UNCOV
1326
    if (checkpointBuilt) {
×
UNCOV
1327
      smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
×
1328
    }
1329
  } while (0);
1330
_exit:
×
UNCOV
1331
  if (code) {
×
1332
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1333
  }
1334

UNCOV
1335
  TAOS_RETURN(code);
×
1336
}
1337

1338
/**
1339
 * @brief trigger to get rsma result in async mode
1340
 *
1341
 * @param param
1342
 * @param tmrId
1343
 */
UNCOV
1344
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
×
UNCOV
1345
  SRSmaRef      *pRSmaRef = NULL;
×
UNCOV
1346
  SSma          *pSma = NULL;
×
UNCOV
1347
  SRSmaStat     *pStat = NULL;
×
UNCOV
1348
  SRSmaInfo     *pRSmaInfo = NULL;
×
UNCOV
1349
  SRSmaInfoItem *pItem = NULL;
×
UNCOV
1350
  int32_t        code = 0;
×
1351

UNCOV
1352
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
×
1353
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
×
1354
             smaMgmt.refHash, smaMgmt.rsetId);
UNCOV
1355
    return;
×
1356
  }
1357

UNCOV
1358
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
×
1359
    smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1360
            pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1361
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1362
    return;
×
1363
  }
1364

UNCOV
1365
  pSma = pStat->pSma;
×
1366

UNCOV
1367
  if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
×
1368
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1369
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1370
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1371
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1372
    return;
×
1373
  }
1374

UNCOV
1375
  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
1376
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1377
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1378
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1379
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1380
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1381
    return;
×
1382
  }
1383

UNCOV
1384
  pItem = *(SRSmaInfoItem **)&param;
×
1385

1386
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
UNCOV
1387
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
×
UNCOV
1388
  switch (rsmaTriggerStat) {
×
UNCOV
1389
    case TASK_TRIGGER_STAT_PAUSED:
×
1390
    case TASK_TRIGGER_STAT_CANCELLED: {
UNCOV
1391
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
×
1392
               ", rsetId:%d refId:%" PRIi64,
1393
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
UNCOV
1394
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
×
UNCOV
1395
        bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
UNCOV
1396
        if (!ret) {
×
UNCOV
1397
          smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
×
1398
                  " since tmr reset failed, rsetId:%d refId:%" PRIi64,
1399
                  SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
1400
        }
1401
      }
UNCOV
1402
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
UNCOV
1403
      TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
UNCOV
1404
      return;
×
1405
    }
UNCOV
1406
    default:
×
UNCOV
1407
      break;
×
1408
  }
1409

1410
  int8_t fetchTriggerStat =
UNCOV
1411
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
×
UNCOV
1412
  switch (fetchTriggerStat) {
×
UNCOV
1413
    case TASK_TRIGGER_STAT_ACTIVE: {
×
UNCOV
1414
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
×
1415
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1416
      // async process
UNCOV
1417
      atomic_store_8(&pItem->fetchLevel, 1);
×
1418

UNCOV
1419
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
×
UNCOV
1420
        if (tsem_post(&(pStat->notEmpty)) != 0) {
×
1421
          smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
×
1422
                   SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1423
        }
1424
      }
UNCOV
1425
    } break;
×
UNCOV
1426
    case TASK_TRIGGER_STAT_INACTIVE: {
×
UNCOV
1427
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
×
1428
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
UNCOV
1429
    } break;
×
UNCOV
1430
    case TASK_TRIGGER_STAT_INIT: {
×
UNCOV
1431
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
×
1432
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
UNCOV
1433
    } break;
×
1434
    default: {
×
1435
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
×
1436
               " is unknown",
1437
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
1438
    } break;
×
1439
  }
1440

UNCOV
1441
_end:
×
UNCOV
1442
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
UNCOV
1443
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
UNCOV
1444
  TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1445
}
1446

UNCOV
1447
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
×
UNCOV
1448
  int32_t arrSize = taosArrayGetSize(pItems);
×
UNCOV
1449
  if (type == STREAM_INPUT__MERGED_SUBMIT) {
×
UNCOV
1450
    for (int32_t i = 0; i < arrSize; ++i) {
×
UNCOV
1451
      SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
×
UNCOV
1452
      taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_EXEC_MSG_HLEN));
×
1453
    }
UNCOV
1454
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
×
UNCOV
1455
    for (int32_t i = 0; i < arrSize; ++i) {
×
UNCOV
1456
      SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
×
UNCOV
1457
      blockDataDestroy(packData->pDataBlock);
×
1458
    }
1459
  } else {
1460
    smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
×
1461
  }
UNCOV
1462
  taosArrayClear(pItems);
×
UNCOV
1463
}
×
1464

1465
/**
1466
 * @brief fetch rsma result(consider the efficiency and functionality)
1467
 *
1468
 * @param pSma
1469
 * @param pInfo
1470
 * @return int32_t
1471
 */
UNCOV
1472
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
×
UNCOV
1473
  int32_t     code = 0;
×
UNCOV
1474
  int32_t     lino = 0;
×
UNCOV
1475
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
×
UNCOV
1476
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
×
UNCOV
1477
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
×
1478

UNCOV
1479
    if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
×
UNCOV
1480
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
×
UNCOV
1481
      if (!taskInfo) {
×
1482
        continue;
×
1483
      }
1484

UNCOV
1485
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
×
1486
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
×
1487
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1488
      } else {
UNCOV
1489
        int64_t curMs = taosGetTimestampMs();
×
UNCOV
1490
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
×
UNCOV
1491
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
×
1492
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
UNCOV
1493
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
×
UNCOV
1494
          continue;
×
1495
        } else {
UNCOV
1496
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
×
1497
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1498
        }
1499
      }
1500

UNCOV
1501
      pItem->nScanned = 0;
×
1502

UNCOV
1503
      TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
×
UNCOV
1504
      if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
×
1505
        atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
×
1506
        goto _exit;
×
1507
      }
1508

UNCOV
1509
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
×
1510
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1511
    } else {
UNCOV
1512
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
×
1513
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
1514
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
1515
    }
1516
  }
1517

UNCOV
1518
_exit:
×
UNCOV
1519
  TAOS_RETURN(code);
×
1520
}
1521

UNCOV
1522
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
×
UNCOV
1523
  void   *msg = NULL;
×
UNCOV
1524
  int8_t  resume = 0;
×
UNCOV
1525
  int32_t nSubmit = 0;
×
UNCOV
1526
  int32_t nDelete = 0;
×
UNCOV
1527
  int64_t version = 0;
×
UNCOV
1528
  int32_t code = 0;
×
UNCOV
1529
  int32_t lino = 0;
×
1530

1531
  SPackedData packData;
1532

UNCOV
1533
  taosArrayClear(pSubmitArr);
×
1534

1535
  // the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
1536
  while (1) {
UNCOV
1537
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
×
UNCOV
1538
    if (msg) {
×
UNCOV
1539
      int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
×
UNCOV
1540
      if (inputType == STREAM_INPUT__DATA_SUBMIT) {
×
UNCOV
1541
      _resume_submit:
×
UNCOV
1542
        packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
×
UNCOV
1543
        packData.ver = RSMA_EXEC_MSG_VER(msg);
×
UNCOV
1544
        packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
×
UNCOV
1545
        version = packData.ver;
×
UNCOV
1546
        if (!taosArrayPush(pSubmitArr, &packData)) {
×
1547
          taosFreeQitem(msg);
×
1548
          TAOS_CHECK_EXIT(terrno);
×
1549
        }
UNCOV
1550
        ++nSubmit;
×
UNCOV
1551
      } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
×
UNCOV
1552
      _resume_delete:
×
UNCOV
1553
        version = RSMA_EXEC_MSG_VER(msg);
×
UNCOV
1554
        if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
×
1555
                                          &packData.pDataBlock, 1, STREAM_DELETE_DATA))) {
1556
          taosFreeQitem(msg);
×
1557
          TAOS_CHECK_EXIT(code);
×
1558
        }
1559

UNCOV
1560
        if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
×
1561
          taosFreeQitem(msg);
×
1562
          TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
1563
        }
UNCOV
1564
        taosFreeQitem(msg);
×
UNCOV
1565
        if (packData.pDataBlock) {
×
1566
          // packData.pDataBlock is NULL if delete affects 0 row
UNCOV
1567
          ++nDelete;
×
1568
        }
1569
      } else {
1570
        smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, inputType);
×
1571
        break;
×
1572
      }
1573
    }
1574

UNCOV
1575
    if (nSubmit > 0 || nDelete > 0) {
×
UNCOV
1576
      int32_t size = TARRAY_SIZE(pSubmitArr);
×
UNCOV
1577
      int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
×
UNCOV
1578
      for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
×
UNCOV
1579
        TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i));
×
1580
      }
UNCOV
1581
      tdFreeRSmaSubmitItems(pSubmitArr, inputType);
×
UNCOV
1582
      nSubmit = 0;
×
UNCOV
1583
      nDelete = 0;
×
1584
    } else {
UNCOV
1585
      goto _rtn;
×
1586
    }
1587

UNCOV
1588
    if (resume == 2) {
×
1589
      resume = 0;
×
1590
      goto _resume_delete;
×
1591
    }
1592
  }
1593

UNCOV
1594
_rtn:
×
UNCOV
1595
  return TSDB_CODE_SUCCESS;
×
1596
_exit:
×
1597
  atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
×
1598
  smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
×
1599
           type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
1600
  tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
×
1601
  while (1) {
×
1602
    void *msg = NULL;
×
1603
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
×
1604
    if (msg) {
×
1605
      taosFreeQitem(msg);
×
1606
    } else {
1607
      break;
×
1608
    }
1609
  }
1610
  TAOS_RETURN(code);
×
1611
}
1612

1613
/**
1614
 * @brief
1615
 *
1616
 * @param pSma
1617
 * @param type
1618
 * @return int32_t
1619
 */
1620

UNCOV
1621
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
×
UNCOV
1622
  int32_t    code = 0;
×
UNCOV
1623
  int32_t    lino = 0;
×
UNCOV
1624
  SVnode    *pVnode = pSma->pVnode;
×
UNCOV
1625
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
UNCOV
1626
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
UNCOV
1627
  SHashObj  *infoHash = NULL;
×
UNCOV
1628
  SArray    *pSubmitArr = NULL;
×
UNCOV
1629
  bool       isFetchAll = false;
×
1630

UNCOV
1631
  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
×
1632
    code = TSDB_CODE_RSMA_INVALID_STAT;
×
1633
    TSDB_CHECK_CODE(code, lino, _exit);
×
1634
  }
1635

UNCOV
1636
  if (!(pSubmitArr =
×
UNCOV
1637
            taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
×
1638
    code = terrno;
×
1639
    TSDB_CHECK_CODE(code, lino, _exit);
×
1640
  }
1641

1642
  while (true) {
1643
    // step 1: rsma exec - consume data in buffer queue for all suids
UNCOV
1644
    if (type == RSMA_EXEC_OVERFLOW) {
×
UNCOV
1645
      void *pIter = NULL;
×
UNCOV
1646
      while ((pIter = taosHashIterate(infoHash, pIter))) {
×
UNCOV
1647
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
×
UNCOV
1648
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
×
UNCOV
1649
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
×
UNCOV
1650
            int32_t batchCnt = -1;
×
UNCOV
1651
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
×
UNCOV
1652
            bool    occupied = (batchMax <= 1);
×
UNCOV
1653
            if (batchMax > 1) {
×
1654
              batchMax = 100 / batchMax;
×
1655
              batchMax = TMAX(batchMax, 4);
×
1656
            }
UNCOV
1657
            while (occupied || (++batchCnt < batchMax)) {                 // greedy mode
×
UNCOV
1658
              TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall));  // queue has mutex lock
×
UNCOV
1659
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
×
UNCOV
1660
              if (qallItemSize > 0) {
×
UNCOV
1661
                if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
×
1662
                  taosHashCancelIterate(infoHash, pIter);
×
1663
                  TSDB_CHECK_CODE(code, lino, _exit);
×
1664
                }
UNCOV
1665
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
×
1666
              }
1667

UNCOV
1668
              if (RSMA_NEED_FETCH(pInfo)) {
×
UNCOV
1669
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
×
UNCOV
1670
                if (oldStat == 0 ||
×
1671
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
×
UNCOV
1672
                  int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
×
1673

UNCOV
1674
                  if (oldVal < 0) {
×
1675
                    code = TSDB_CODE_APP_ERROR;
×
1676
                    taosHashCancelIterate(infoHash, pIter);
×
1677
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1678
                  }
1679

UNCOV
1680
                  if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
×
1681
                    taosHashCancelIterate(infoHash, pIter);
×
1682
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1683
                  }
1684

UNCOV
1685
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
×
UNCOV
1686
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
×
1687
                  }
1688
                }
1689
              }
1690

UNCOV
1691
              if (qallItemSize > 0) {
×
UNCOV
1692
                TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
×
UNCOV
1693
                continue;
×
1694
              }
UNCOV
1695
              if (RSMA_NEED_FETCH(pInfo)) {
×
1696
                continue;
×
1697
              }
1698

UNCOV
1699
              break;
×
1700
            }
1701
          }
UNCOV
1702
          TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
×
1703
        }
1704
      }
1705
    } else {
1706
      smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
×
1707
      code = TSDB_CODE_APP_ERROR;
×
1708
      TSDB_CHECK_CODE(code, lino, _exit);
×
1709
    }
1710

UNCOV
1711
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
×
UNCOV
1712
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
×
1713
        break;
×
1714
      }
1715

UNCOV
1716
      if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
×
1717
        smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
×
1718
      }
1719

UNCOV
1720
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
×
UNCOV
1721
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
×
1722
                 atomic_load_64(&pRSmaStat->nBufItems));
UNCOV
1723
        break;
×
1724
      }
1725
    }
1726

1727
  }  // end of while(true)
1728

UNCOV
1729
_exit:
×
UNCOV
1730
  taosArrayDestroy(pSubmitArr);
×
UNCOV
1731
  if (code) {
×
1732
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1733
  }
UNCOV
1734
  TAOS_RETURN(code);
×
1735
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc