• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3552

11 Dec 2024 06:08AM UTC coverage: 62.526% (+0.7%) from 61.798%
#3552

push

travis-ci

web-flow
Merge pull request #29092 from taosdata/fix/3.0/TD-33146

fix:[TD-33146] stmt_get_tag_fields return error code

124833 of 255773 branches covered (48.81%)

Branch coverage included in aggregate %.

209830 of 279467 relevant lines covered (75.08%)

19111707.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.42
/source/dnode/vnode/src/sma/smaRollup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
// Tuning constants for rsma execution and fetching; the unit of each value is given in its trailing comment.
#define RSMA_EXEC_SMOOTH_SIZE   (100)     // cnt
21
#define RSMA_EXEC_BATCH_SIZE    (1024)    // cnt
22
#define RSMA_FETCH_DELAY_MAX    (120000)  // ms
23
#define RSMA_FETCH_ACTIVE_MAX   (1000)    // ms
24
#define RSMA_FETCH_INTERVAL     (5000)    // ms
25
// Placeholder string stored as the qmsg of the stream task created for each rsma level.
#define RSMA_EXEC_TASK_FLAG     "rsma"
26
// Queued exec message layout (packed, no padding): byte 0 = type, bytes 1..4 = payload length,
// bytes 5..12 = version, payload from byte 13. The accessors below read these fields in place.
#define RSMA_EXEC_MSG_HLEN      (13)  // type(int8_t) + len(int32_t) + version(int64_t)
27
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
28
#define RSMA_EXEC_MSG_LEN(msg)  (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
29
#define RSMA_EXEC_MSG_VER(msg)  (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
30
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))
31

32
// Non-zero when the item at index 0 or index 1 of r has a non-zero fetchLevel.
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
33

34
// File-scope SMA management state with external linkage. It starts out not initialised
// (inited = 0) and with rsetId = -1; all other members are zero-initialised.
SSmaMgmt smaMgmt = {
35
    .inited = 0,
36
    .rsetId = -1,
37
};
38

39
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
40

41
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
42
static void    tdUidStoreDestory(STbUidStore *pStore);
43
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
44
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
45
                                       int8_t idx);
46
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
47
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
48
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
49
static void    tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
50
static void    tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
51
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
52
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
53
                                         int32_t execType, int8_t *streamFlushed);
54
static void    tdRSmaFetchTrigger(void *param, void *tmrId);
55
static void    tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
56
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
57
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
58
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
59

60
// Descriptor for one qTaskInfo item.
// NOTE(review): this struct is not referenced in the visible part of the file; presumably `len` is the
// byte length of the `qTaskInfo` payload and `suid` the owning super table — confirm against its users.
struct SRSmaQTaskInfoItem {
61
  int32_t len;
62
  int8_t  type;
63
  int64_t suid;
64
  void   *qTaskInfo;
65
};
66

67
/**
 * @brief Destroy the query task stored in *taskHandle and reset the slot to NULL.
 *
 * Free and kill may race with each other, so ownership of the task is claimed with a
 * compare-and-swap: only the caller that actually swaps the observed handle for NULL
 * destroys it.
 *
 * @param taskHandle address of the task handle slot; NULL or an empty slot is a no-op
 * @param vgId       vnode group id, used for logging only
 * @param level      rsma level, used for logging only
 */
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
  if (!taskHandle || !(*taskHandle)) return;

  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (!otaskHandle) return;

  // The value-returning CAS yields the previous slot content. The swap succeeded only if that
  // content is the handle we loaded; any other result (NULL or a different handle) means another
  // thread already took over, and destroying otaskHandle here would be a double free.
  if (atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL) == otaskHandle) {
    smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
    qDestroyTask(otaskHandle);
  }
}
76

77
/**
78
 * @brief general function to free rsmaInfo
79
 *
80
 * @param pSma
81
 * @param pInfo
82
 * @return void*
83
 */
84
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
17✔
85
  if (pInfo) {
17!
86
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
51✔
87
      SRSmaInfoItem *pItem = &pInfo->items[i];
34✔
88

89
      if (pItem->tmrId) {
34!
90
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
34!
91
                 pInfo->suid, i + 1);
92
        if (!taosTmrStopA(&pItem->tmrId)) {
34!
93
          smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
×
94
                   i + 1);
95
        }
96
      }
97

98
      if (pItem->pStreamState) {
34!
99
        streamStateClose(pItem->pStreamState, false);
34✔
100
      }
101

102
      if (pItem->pStreamTask) {
34!
103
        tFreeStreamTask(pItem->pStreamTask);
34✔
104
      }
105
      taosArrayDestroy(pItem->pResList);
34✔
106
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
34✔
107
    }
108

109
    taosMemoryFreeClear(pInfo->pTSchema);
17!
110

111
    if (pInfo->queue) {
17!
112
      taosCloseQueue(pInfo->queue);
17✔
113
      pInfo->queue = NULL;
17✔
114
    }
115
    if (pInfo->qall) {
17!
116
      taosFreeQall(pInfo->qall);
17✔
117
      pInfo->qall = NULL;
17✔
118
    }
119

120
    taosMemoryFree(pInfo);
17✔
121
  }
122

123
  return NULL;
17✔
124
}
125

126
/**
 * @brief Allocate a zero-initialised uid store.
 *
 * @param pStore receives the new store, or NULL when the allocation fails
 * @return int32_t TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
  STbUidStore *pNew = taosMemoryCalloc(1, sizeof(STbUidStore));

  *pStore = pNew;
  return pNew ? TSDB_CODE_SUCCESS : terrno;
}
134

135
/**
 * @brief Add or remove a list of child table uids on every query task of one rsma super table.
 *
 * @param pSma   sma handle
 * @param suid   super table uid; must not be NULL
 * @param tbUids child table uids; must not be NULL, an empty array is a no-op
 * @param isAdd  true to add the uids to the scanners, false to remove them
 * @return int32_t TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PTR for NULL arguments, or the error
 *         returned by the rsma info lookup / table list update
 */
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
  SRSmaInfo *pRSmaInfo = NULL;
  int32_t    code = 0;

  if (!suid || !tbUids) {
    code = TSDB_CODE_INVALID_PTR;
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
             tstrerror(code));
    TAOS_RETURN(code);
  }

  int32_t nTables = taosArrayGetSize(tbUids);

  if (0 == nTables) {
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
    return TSDB_CODE_SUCCESS;
  }

  code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);

  if (code != 0) {
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (!pRSmaInfo->taskInfo[i]) {
      continue;  // this level has no query task
    }

    // Levels are logged 1-based, matching the other log lines in this file.
    if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
      smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i + 1,
               tstrerror(code));
      TAOS_RETURN(code);
    }
    smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
             ", nTables:%d level %d",
             SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i + 1);
  }

  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  TAOS_RETURN(code);
}
177

178
/**
 * @brief Apply every suid/uids pair collected in a uid store to the rsma query tasks.
 *
 * The primary pair (pStore->suid / pStore->tbUids) is applied first, followed by each entry
 * of pStore->uidHash. Processing stops at the first failure.
 *
 * @param pSma   sma handle
 * @param pStore uid store; NULL or an empty tbUids array is a no-op
 * @param isAdd  true to add the uids, false to remove them
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
  if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));

  for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pEntry, NULL);
    SArray   *pTbUids = *(SArray **)pEntry;

    int32_t code = tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd);
    if (code != TSDB_CODE_SUCCESS) {
      // release the iterator before bailing out of the loop
      taosHashCancelIterate(pStore->uidHash, pEntry);
      TAOS_RETURN(code);
    }
  }

  return TSDB_CODE_SUCCESS;
}
198

199
/**
200
 * @brief fetch suid/uids when create child tables of rollup SMA
201
 *
202
 * @param pTsdb
203
 * @param ppStore
204
 * @param suid
205
 * @param uid
206
 * @return int32_t
207
 */
208
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
138,902✔
209
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
138,902✔
210
  int32_t  code = 0;
138,902✔
211

212
  // only applicable to rollup SMA ctables
213
  if (!pEnv) {
138,902✔
214
    return TSDB_CODE_SUCCESS;
138,900✔
215
  }
216

217
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
2✔
218
  SHashObj  *infoHash = NULL;
2✔
219
  if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
2!
220
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
221
  }
222

223
  // info cached when create rsma stable and return directly for non-rsma ctables
224
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
7!
225
    return TSDB_CODE_SUCCESS;
×
226
  }
227

228
  if (!(*ppStore)) {
7!
229
    TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
7!
230
  }
231

232
  if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
7!
233
    *ppStore = tdUidStoreFree(*ppStore);
×
234
    TAOS_RETURN(code);
×
235
  }
236

237
  return TSDB_CODE_SUCCESS;
7✔
238
}
239

240
/**
 * @brief Seed an rsma item's versions from an existing stream task, if one is found in the meta.
 *
 * When the task identified by pId can be acquired, submitReqVer is taken from its
 * checkpointVer and fetchResultVer from its delaySchedParam. The item is left untouched
 * otherwise. The lookup runs under the meta read lock.
 *
 * @param pMeta stream meta to search
 * @param pItem rsma item to update
 * @param pId   stream/task id to look up
 */
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
  STaskId      taskKey = {.streamId = pId->streamId, .taskId = pId->taskId};
  SStreamTask *pFound = NULL;

  streamMetaRLock(pMeta);

  if (streamMetaAcquireTaskUnsafe(pMeta, &taskKey, &pFound) == 0) {
    pItem->submitReqVer = pFound->chkInfo.checkpointVer;
    pItem->fetchResultVer = pFound->info.delaySchedParam;
    streamMetaReleaseTask(pMeta, pFound);
  }

  streamMetaRUnLock(pMeta);
}
255

256
/**
 * @brief Unregister an rsma stream task from the stream meta and commit the meta.
 *
 * Both steps run under the meta write lock. Failures are logged and otherwise ignored
 * (best effort); the function has no return value.
 *
 * @param pMeta    stream meta holding the task
 * @param streamId stream id of the task
 * @param taskId   task id of the task
 */
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
  streamMetaWLock(pMeta);

  int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
  if (code != 0) {
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
  }
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

  // persist to disk; a failed commit used to be dropped silently, now it is at least visible in the log
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d failed to commit stream meta, code:%d", pMeta->vgId, streamId, taskId,
             code);
  }

  streamMetaWUnLock(pMeta);
  smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
}
270

271
/**
 * @brief Set up one rsma level (item) of an rsma info: state directory, stream task, stream
 *        state, query task, result list, max delay, ref-hash entry and fetch timer.
 *
 * The level is skipped (success, nothing created) when it has no query message, i.e. when
 * param->qmsgLen[idx] is not positive or param->qmsg[idx] is NULL.
 *
 * On failure the partially initialised members stay attached to pRSmaInfo->items[idx];
 * the visible caller (tdRSmaProcessCreateImpl) releases them through tdFreeRSmaInfo.
 *
 * @param pSma      sma handle
 * @param param     rsma parameters of the super table
 * @param pStat     rsma stat providing the refId stored in the ref hash
 * @param pRSmaInfo rsma info whose item idx is initialised
 * @param idx       item index, 0 or 1 (level idx + 1)
 * @return int32_t TSDB_CODE_SUCCESS or an error code
 */
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
                                       int8_t idx) {
  int32_t code = 0;

  // The length has to be checked per level: testing the qmsgLen array itself is always true.
  if (!((param->qmsgLen[idx] > 0) && param->qmsg[idx])) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
  SRetention    *pRetention = SMA_RETENTION(pSma);
  STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
  SVnode        *pVnode = pSma->pVnode;
  char           taskInfDir[TSDB_FILENAME_LEN] = {0};
  void          *pStreamState = NULL;

  // set the backend of stream state
  tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);

  if (!taosCheckExistFile(taskInfDir)) {
    // taosMulMkDir is handed a private copy of the path
    char *s = taosStrdup(taskInfDir);
    if (!s) {
      TAOS_RETURN(terrno);
    }
    if (taosMulMkDir(s) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      taosMemoryFree(s);
      TAOS_RETURN(code);
    }
    taosMemoryFree(s);
  }

  SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
  if (!pStreamTask) {
    return terrno;
  }
  // attach first so that every later failure is cleaned up together with the item
  pItem->pStreamTask = pStreamTask;
  pStreamTask->id.taskId = 0;
  pStreamTask->id.streamId = pRSmaInfo->suid + idx;
  pStreamTask->chkInfo.startTs = taosGetTimestampMs();
  pStreamTask->pMeta = pVnode->pTq->pStreamMeta;

  size_t flagSize = strlen(RSMA_EXEC_TASK_FLAG) + 1;
  pStreamTask->exec.qmsg = taosMemoryMalloc(flagSize);
  if (!pStreamTask->exec.qmsg) {
    TAOS_RETURN(terrno);
  }
  (void)snprintf(pStreamTask->exec.qmsg, flagSize, "%s", RSMA_EXEC_TASK_FLAG);
  pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
  tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);

  TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));

  TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));

  pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
  if (!pStreamState) {
    TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
  }
  pItem->pStreamState = pStreamState;

  tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);

  SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
  initStorageAPI(&handle.api);

  code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
  if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
    TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
  }

  if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
    TAOS_RETURN(terrno);
  }

  if (pItem->fetchResultVer < pItem->submitReqVer) {
    // fetch the data when reboot
    pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
  }

  // a max delay below the allowed minimum falls back to the level's retention freq, in milliseconds
  if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
    int64_t msInterval = -1;
    TAOS_CHECK_RETURN(convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision,
                                                     TIME_UNIT_MILLISECOND, &msInterval));
    pItem->maxDelay = (int32_t)msInterval;
  } else {
    pItem->maxDelay = (int32_t)param->maxdelay[idx];
  }
  if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
    pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
  }

  pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;

  // keyed by the item pointer itself
  SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
  if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  // NOTE(review): a false return is logged as an error but does not fail the setup — confirm that
  // taosTmrReset's return value really signals failure rather than "no timer was running before".
  bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
  if (!ret) {
    smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
             idx + 1);
  }

  smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
          ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
          ", finally maxdelay:%" PRIi32,
          TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
          pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
377

378
/**
379
 * @brief for rsam create or restore
380
 *
381
 * @param pSma
382
 * @param param
383
 * @param suid
384
 * @param tbName
385
 * @return int32_t
386
 */
387
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
17✔
388
  int32_t code = 0;
17✔
389
  int32_t lino = 0;
17✔
390

391
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
17!
392
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
×
393
    return TSDB_CODE_SUCCESS;
×
394
  }
395

396
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
17✔
397
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
17✔
398
  SRSmaInfo *pRSmaInfo = NULL;
17✔
399

400
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
17✔
401
  if (pRSmaInfo) {
17!
402
    smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
×
403
    return TSDB_CODE_SUCCESS;
×
404
  }
405

406
  // from write queue: single thead
407
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
17✔
408
  if (!pRSmaInfo) {
17!
409
    return terrno;
×
410
  }
411

412
  STSchema *pTSchema;
413
  code = metaGetTbTSchemaNotNull(SMA_META(pSma), suid, -1, 1, &pTSchema);
17✔
414
  TAOS_CHECK_EXIT(code);
17!
415
  pRSmaInfo->pSma = pSma;
17✔
416
  pRSmaInfo->pTSchema = pTSchema;
17✔
417
  pRSmaInfo->suid = suid;
17✔
418
  T_REF_INIT_VAL(pRSmaInfo, 1);
17✔
419

420
  TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
17!
421

422
  TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
17!
423

424
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
17!
425
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
17!
426

427
  TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
17!
428

429
_exit:
17✔
430
  if (code != 0) {
17!
431
    TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
×
432
  } else {
433
    smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
17!
434
  }
435
  TAOS_RETURN(code);
17✔
436
}
437

438
/**
439
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
440
 *
441
 * @param pSma
442
 * @param pReq
443
 * @return int32_t
444
 */
445
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
30,840✔
446
  SVnode *pVnode = pSma->pVnode;
30,840✔
447
  if (!pReq->rollup) {
30,840!
448
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
31,381✔
449
             pReq->suid);
450
    return TSDB_CODE_SUCCESS;
31,410✔
451
  }
452

453
  if (!VND_IS_RSMA(pVnode)) {
×
454
    smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
×
455
            pReq->suid);
456
    return TSDB_CODE_SUCCESS;
×
457
  }
458

459
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
×
460
}
461

462
/**
463
 * @brief drop cache for stb
464
 *
465
 * @param pSma
466
 * @param pReq
467
 * @return int32_t
468
 */
469
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
3,432✔
470
  SVnode *pVnode = pSma->pVnode;
3,432✔
471
  if (!VND_IS_RSMA(pVnode)) {
3,432!
472
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
3,433✔
473
             pReq->suid);
474
    return TSDB_CODE_SUCCESS;
3,433✔
475
  }
476

477
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
×
478
  if (!pSmaEnv) {
×
479
    return TSDB_CODE_SUCCESS;
×
480
  }
481

482
  int32_t    code = 0;
×
483
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
×
484
  SRSmaInfo *pRSmaInfo = NULL;
×
485

486
  code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
×
487

488
  if (code != 0) {
×
489
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
×
490
            pReq->suid);
491
    return TSDB_CODE_SUCCESS;
×
492
  }
493

494
  // set del flag for data in mem
495
  atomic_store_8(&pRSmaStat->delFlag, 1);
×
496
  RSMA_INFO_SET_DEL(pRSmaInfo);
×
497
  tdUnRefRSmaInfo(pSma, pRSmaInfo);
×
498

499
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
500

501
  // no need to save to file as triggered by dropping stable
502
  smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
×
503
  return TSDB_CODE_SUCCESS;
×
504
}
505

506
/**
507
 * @brief store suid/[uids], prefer to use array and then hash
508
 *
509
 * @param pStore
510
 * @param suid
511
 * @param uid
512
 * @return int32_t
513
 */
514
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
319✔
515
  // prefer to store suid/uids in array
516
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
319!
517
    if (pStore->suid == 0) {
319!
518
      pStore->suid = suid;
319✔
519
    }
520
    if (uid) {
319✔
521
      if (!pStore->tbUids) {
7!
522
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
7!
523
          TAOS_RETURN(terrno);
×
524
        }
525
      }
526
      if (!taosArrayPush(pStore->tbUids, uid)) {
14!
527
        TAOS_RETURN(terrno);
×
528
      }
529
    }
530
  } else {
531
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
532
    if (!pStore->uidHash) {
×
533
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
534
      if (!pStore->uidHash) {
×
535
        TAOS_RETURN(terrno);
×
536
      }
537
    }
538
    if (uid) {
×
539
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
×
540
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
×
541
        if (!taosArrayPush(uidArray, uid)) {
×
542
          taosArrayDestroy(uidArray);
×
543
          TAOS_RETURN(terrno);
×
544
        }
545
      } else {
546
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
×
547
        if (!pUidArray) {
×
548
          TAOS_RETURN(terrno);
×
549
        }
550
        if (!taosArrayPush(pUidArray, uid)) {
×
551
          taosArrayDestroy(pUidArray);
×
552
          TAOS_RETURN(terrno);
×
553
        }
554
        TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
×
555
      }
556
    } else {
557
      TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
×
558
    }
559
  }
560
  return TSDB_CODE_SUCCESS;
319✔
561
}
562

563
/**
 * @brief Release the containers owned by a uid store; the store struct itself is not freed.
 *
 * @param pStore uid store, may be NULL
 */
static void tdUidStoreDestory(STbUidStore *pStore) {
  if (!pStore) {
    return;
  }

  if (pStore->uidHash) {
    // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
    if (pStore->tbUids) {
      for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
           pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
        taosArrayDestroy(*(SArray **)pEntry);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}
579

580
/**
 * @brief Destroy a uid store together with its containers.
 *
 * @param pStore uid store, may be NULL
 * @return void* always NULL, so callers can reset their pointer in one statement
 */
void *tdUidStoreFree(STbUidStore *pStore) {
  if (!pStore) {
    return NULL;
  }

  tdUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}
587

588
/**
589
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
590
 * would be freed when close Vnode, thus lock should be used if with race condition.
591
 * @param pTsdb
592
 * @param version
593
 * @param pReq
594
 * @return int32_t
595
 */
596
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
625✔
597
  if (pReq) {
625!
598
    SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
625✔
599
    // spin lock for race condition during insert data
600
    TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
625!
601
  }
602

603
  return TSDB_CODE_SUCCESS;
625✔
604
}
605

606
/**
 * @brief Record the suid of every table data entry of a submit request in the uid store.
 *
 * Only suids are stored (uid is passed as NULL). A NULL message is handled as an empty one.
 *
 * @param pMsg   submit request, may be NULL
 * @param pStore uid store receiving the suids
 * @return int32_t 0 on success or the first store error
 */
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
  SArray *pTbDataList = NULL;
  if (pMsg) {
    pTbDataList = pMsg->aSubmitTbData;
  }

  int32_t nTbData = taosArrayGetSize(pTbDataList);
  for (int32_t idx = 0; idx < nTbData; ++idx) {
    SSubmitTbData *pTbData = TARRAY_GET_ELEM(pTbDataList, idx);
    TAOS_CHECK_RETURN(tdUidStorePut(pStore, pTbData->suid, NULL));
  }

  return 0;
}
617

618
/**
619
 * @brief retention of rsma1/rsma2
620
 *
621
 * @param pSma
622
 * @param now
623
 * @return int32_t
624
 */
625
int32_t smaRetention(SSma *pSma, int64_t now) {
×
626
  int32_t code = TSDB_CODE_SUCCESS;
×
627
  if (!VND_IS_RSMA(pSma->pVnode)) {
×
628
    return code;
×
629
  }
630

631
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
632
    if (pSma->pRSmaTsdb[i]) {
633
      // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
634
      // if (code) goto _end;
635
    }
636
  }
637

638
_end:
×
639
  TAOS_RETURN(code);
×
640
}
641

642
/**
 * @brief Encode a batch delete request and put it on the vnode write queue.
 *
 * An empty request is not sent. pDelReq->deleteReqs is destroyed before returning in every
 * case, so the caller must not release it again.
 *
 * @param pSma    sma handle
 * @param suid    super table uid, used for logging only
 * @param level   rsma level, used for logging only
 * @param pDelReq batch delete request to send
 * @return int32_t TSDB_CODE_SUCCESS or an error code
 */
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
    int32_t bodyLen = 0;
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, bodyLen, code);
    TSDB_CHECK_CODE(code, lino, _exit);

    // message = SMsgHead followed by the encoded request
    void *pCont = rpcMallocCont(bodyLen + sizeof(SMsgHead));
    if (!pCont) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SEncoder enc;
    tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMsgHead)), bodyLen);
    code = tEncodeSBatchDeleteReq(&enc, pDelReq);
    tEncoderClear(&enc);
    if (code < 0) {
      rpcFreeCont(pCont);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    ((SMsgHead *)pCont)->vgId = TD_VID(pSma->pVnode);

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pCont, .contLen = bodyLen + sizeof(SMsgHead)};
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  taosArrayDestroy(pDelReq->deleteReqs);
  if (code) {
    smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
             SMA_VID(pSma), lino, suid, level, tstrerror(code));
  }

  TAOS_RETURN(code);
}
682

683
/**
 * @brief Run an rsma query task until it yields no more result blocks and write every result
 *        to the tsdb of the item's level.
 *
 * Per result block:
 *  - STREAM_CHECKPOINT sets *streamFlushed (when provided) and is otherwise skipped;
 *  - STREAM_DELETE_RESULT is converted into a batch delete request and queued;
 *  - any other block is turned into a submit request and inserted into the sink tsdb.
 *
 * For STREAM_GET_ALL, a block whose version is below submitReqVer is lifted to submitReqVer,
 * a block whose version equals fetchResultVer is skipped as a duplicate, and fetchResultVer
 * is updated after each processed block.
 *
 * @param pSma          sma handle
 * @param taskInfo      query task to execute
 * @param pItem         rsma item (level, versions, result list)
 * @param pInfo         rsma info (schema, suid)
 * @param execType      execution type, e.g. STREAM_GET_ALL
 * @param streamFlushed optional out flag, set to 1 when a checkpoint block is seen
 * @return int32_t TSDB_CODE_SUCCESS or an error code
 */
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
                                         int32_t execType, int8_t *streamFlushed) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock *output = NULL;
  SArray      *pResList = pItem->pResList;
  STSchema    *pTSchema = pInfo->pTSchema;
  int64_t      suid = pInfo->suid;

  while (1) {
    uint64_t ts = 0;
    bool     hasMore = false;
    code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
    if (code == TSDB_CODE_QRY_IN_EXEC) {
      // the task is already being executed: not an error, just stop here
      code = 0;
      break;
    }
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pResList) == 0) {
      break;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
      output = taosArrayGetP(pResList, i);
      if (output->info.type == STREAM_CHECKPOINT) {
        if (streamFlushed) *streamFlushed = 1;
        continue;
      } else if (output->info.type == STREAM_DELETE_RESULT) {
        SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
        deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
        if (!deleteReq.deleteReqs) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
        if (code != TSDB_CODE_SUCCESS) {
          // tdRSmaProcessDelReq is never reached on this path, so the array must be released here
          taosArrayDestroy(deleteReq.deleteReqs);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        // tdRSmaProcessDelReq destroys deleteReq.deleteReqs on success and on failure
        code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
               ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
               SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
               pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);

      if (STREAM_GET_ALL == execType) {
        /**
         * 1. reset the output version when reboot
         * 2. delete msg version not updated from the result
         */
        if (output->info.version < pItem->submitReqVer) {
          // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
          output->info.version = pItem->submitReqVer;
        } else if (output->info.version == pItem->fetchResultVer) {
          smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
                  ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
                  ", rows:%" PRIi64,
                  SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
                  pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
          continue;
        }
      }

      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
      SSubmitReq2 *pReq = NULL;

      TAOS_CHECK_EXIT(
          buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));

      if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
        // TODO: reconfigure SSubmitReq2 when code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFreeClear(pReq);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (STREAM_GET_ALL == execType) {
        atomic_store_64(&pItem->fetchResultVer, output->info.version);
      }

      smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
               ", execType:%d, ver:%" PRIi64,
               SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);

      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }
    }
  }
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
             ", ver:%" PRIi64,
             SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
             output ? output->info.version : -1);
  } else {
    smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
  }
  qCleanExecTaskBlockBuf(taskInfo);
  return code;
}
789

790
/**
791
 * @brief Copy msg to rsmaQueueBuffer for batch process
792
 *
793
 * @param pSma
794
 * @param version
795
 * @param pMsg
796
 * @param len
797
 * @param inputType
798
 * @param pInfo
799
 * @param suid
800
 * @return int32_t
801
 */
802
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
803
                                      SRSmaInfo *pInfo, tb_uid_t suid) {
804
  int32_t code = 0;
341✔
805
  int32_t lino = 0;
341✔
806
  int32_t size = RSMA_EXEC_MSG_HLEN + len;  // header + payload
341✔
807
  void   *qItem;
808

809
  TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
341!
810

811
  void *pItem = qItem;
341✔
812

813
  *(int8_t *)pItem = (int8_t)inputType;
341✔
814
  pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
341✔
815
  *(int32_t *)pItem = len;
341✔
816
  pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
341✔
817
  *(int64_t *)pItem = version;
341✔
818
  (void)memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
341✔
819

820
  TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
341!
821

822
  pInfo->lastRecv = taosGetTimestampMs();
341✔
823

824
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
341✔
825

826
  int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
341✔
827

828
  if (atomic_load_8(&pInfo->assigned) == 0) {
341✔
829
    if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
135!
830
      smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
×
831
               tstrerror(terrno));
832
    }
833
  }
834

835
  // smoothing consume
836
  int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
341✔
837
  if (n > 1) {
341!
838
    if (n > 10) {
×
839
      n = 10;
×
840
    }
841
    taosMsleep(n << 3);
×
842
    if (n > 5) {
×
843
      smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
×
844
              taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
845
    }
846
  }
847

848
  return TSDB_CODE_SUCCESS;
341✔
849
}
850

851
#if 0
// Disabled debug helper: walks every block of a submit request and logs each row.
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlkIter blkIter = {0};

  TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));

  for (;;) {
    SSubmitBlk *pBlock = NULL;
    TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
    if (pBlock == NULL) {
      break;  // no more blocks
    }

    tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
    for (STSRow *row = tGetSubmitBlkNext(&blkIter); row != NULL; row = tGetSubmitBlkNext(&blkIter)) {
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
    }
  }

  return 0;
}
#endif
870

871
/**
872
 * @brief sync mode
873
 *
874
 * @param pSma
875
 * @param pMsg
876
 * @param msgSize
877
 * @param inputType
878
 * @param pInfo
879
 * @param type
880
 * @param level
881
 * @return int32_t
882
 */
883
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
640✔
884
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
885
  int32_t        code = 0;
640✔
886
  int32_t        idx = level - 1;
640✔
887
  void          *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
640✔
888
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
640✔
889

890
  if (!qTaskInfo) {
640!
891
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
×
892
             pInfo->suid);
893
    return TSDB_CODE_SUCCESS;
×
894
  }
895
  if (!pInfo->pTSchema) {
640!
896
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
×
897
    TAOS_RETURN(TSDB_CODE_INVALID_PTR);
×
898
  }
899

900
  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
640!
901
           ", inputType:%d",
902
           SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
903

904
  if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
640!
905
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
×
906
    TAOS_RETURN(TSDB_CODE_FAILED);
×
907
  }
908

909
  atomic_store_64(&pItem->submitReqVer, version);
640✔
910

911
  TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
640!
912

913
  TAOS_RETURN(code);
640✔
914
}
915

916
/**
917
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
918
 *
919
 * @param pSma
920
 * @param suid
921
 */
922
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
508✔
923
  int32_t    code = 0;
508✔
924
  int32_t    lino = 0;
508✔
925
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
508✔
926
  SRSmaStat *pStat = NULL;
508✔
927
  SRSmaInfo *pRSmaInfo = NULL;
508✔
928

929
  *ppRSmaInfo = NULL;
508✔
930

931
  if (!pEnv) {
508!
932
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
×
933
  }
934

935
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
508✔
936
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
508!
937
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
938
  }
939

940
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
508✔
941
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
508✔
942
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
508!
943
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
508!
944
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
945
      TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
946
    }
947

948
    tdRefRSmaInfo(pSma, pRSmaInfo);
949
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
508✔
950
    if (pRSmaInfo->suid != suid) {
508!
951
      TAOS_RETURN(TSDB_CODE_APP_ERROR);
×
952
    }
953
    *ppRSmaInfo = pRSmaInfo;
508✔
954
    TAOS_RETURN(code);
508✔
955
  }
956
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
957

958
  TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
959
}
960

961
// Drop one reference on pInfo; a NULL pInfo is ignored.
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
  if (!pInfo) {
    return;
  }
  tdUnRefRSmaInfo(pSma, pInfo);
}
508✔
966

967
/**
968
 * @brief async mode
969
 *
970
 * @param pSma
971
 * @param version
972
 * @param pMsg
973
 * @param inputType
974
 * @param suid
975
 * @return int32_t
976
 */
977
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
978
                                  tb_uid_t suid) {
979
  int32_t    code = 0;
341✔
980
  SRSmaInfo *pRSmaInfo = NULL;
341✔
981

982
  code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
341✔
983
  if (code != 0) {
341!
984
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
×
985
    TAOS_RETURN(TSDB_CODE_SUCCESS);  // return success
×
986
  }
987

988
  if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
341!
989
    if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
341!
990
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
991
      TAOS_RETURN(code);
×
992
    }
993
    if (smaMgmt.tmrHandle) {
341!
994
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
341✔
995
      if (pItem->level > 0) {
341!
996
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
997
      }
998
      pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
341✔
999
      if (pItem->level > 0) {
341!
1000
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
1001
      }
1002
    }
1003
  } else {
1004
    code = TSDB_CODE_APP_ERROR;
×
1005
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1006
    smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
×
1007
             tstrerror(code), inputType);
1008
    TAOS_RETURN(code);
×
1009
  }
1010

1011
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
341!
1012
  return TSDB_CODE_SUCCESS;
341✔
1013
}
1014

1015
/**
 * @brief Queue a submit message for every rsma super table it touches.
 *
 * Returns TSDB_CODE_SUCCESS immediately when the vnode has no rsma env. If a previous execution recorded an error in
 * execStat, that error is returned and nothing is queued.
 *
 * @param pSma
 * @param version
 * @param pReq    submit request scanned by tdFetchSubmitReqSuids to collect the suids
 * @param pMsg    raw message queued for each suid
 * @param len     length of pMsg
 * @return int32_t
 */
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t     code = 0;
  // Declared before the first `goto _exit`: _exit always destroys the store, so it must never be reached with
  // uidStore uninitialized.
  STbUidStore uidStore = {0};

  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
    smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if (uidStore.suid != 0) {
    // the first suid is kept in the store itself ...
    if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
      smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
      goto _exit;
    }

    // ... and further suids are the keys of uidHash
    void *pIter = NULL;
    while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
      tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
        smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
        taosHashCancelIterate(uidStore.uidHash, pIter);
        goto _exit;
      }
    }
  }
_exit:
  tdUidStoreDestory(&uidStore);
  TAOS_RETURN(code);
}
1051

1052
/**
 * @brief Queue a delete message for the rsma super table named in the delete result.
 *
 * Returns TSDB_CODE_SUCCESS immediately when the vnode has no rsma env. If a previous execution recorded an error in
 * execStat, that error is returned and nothing is queued.
 *
 * @param pSma
 * @param version
 * @param pReq    read as SDeleteRes; only its suid is used here
 * @param pMsg    raw message queued as STREAM_INPUT__REF_DATA_BLOCK
 * @param len     length of pMsg
 * @return int32_t
 */
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  SDeleteRes *pDelRes = pReq;
  if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) {
    smaError("vgId:%d, failed to process rsma delete exec since: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }
_exit:
  TAOS_RETURN(code);
}
1069

1070
/**
1071
 * @brief retrieve rsma meta and init
1072
 *
1073
 * @param pSma
1074
 * @param nTables number of tables of rsma
1075
 * @return int32_t
1076
 */
1077
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
17✔
1078
  int32_t     code = 0;
17✔
1079
  int32_t     lino = 0;
17✔
1080
  SVnode     *pVnode = pSma->pVnode;
17✔
1081
  SArray     *suidList = NULL;
17✔
1082
  STbUidStore uidStore = {0};
17✔
1083
  SMetaReader mr = {0};
17✔
1084
  tb_uid_t    suid = 0;
17✔
1085

1086
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
17!
1087
    code = terrno;
×
1088
    TSDB_CHECK_CODE(code, lino, _exit);
×
1089
  }
1090

1091
  TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
17!
1092

1093
  int64_t arrSize = taosArrayGetSize(suidList);
17✔
1094

1095
  if (arrSize == 0) {
17✔
1096
    if (nTables) {
7!
1097
      *nTables = 0;
7✔
1098
    }
1099
    taosArrayDestroy(suidList);
7✔
1100
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
7!
1101
    return TSDB_CODE_SUCCESS;
7✔
1102
  }
1103

1104
  int64_t nRsmaTables = 0;
10✔
1105
  metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK);
10✔
1106
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
10!
1107
    code = terrno;
×
1108
    TSDB_CHECK_CODE(code, lino, _exit);
×
1109
  }
1110

1111
  for (int64_t i = 0; i < arrSize; ++i) {
20✔
1112
    suid = *(tb_uid_t *)taosArrayGet(suidList, i);
10✔
1113
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
10!
1114
    if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
10!
1115
      code = terrno;
×
1116
      TSDB_CHECK_CODE(code, lino, _exit);
×
1117
    }
1118
    tDecoderClear(&mr.coder);
10✔
1119
    if (mr.me.type != TSDB_SUPER_TABLE) {
10!
1120
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1121
      TSDB_CHECK_CODE(code, lino, _exit);
×
1122
    }
1123
    if (mr.me.uid != suid) {
10!
1124
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1125
      TSDB_CHECK_CODE(code, lino, _exit);
×
1126
    }
1127
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
10!
1128
      ++nRsmaTables;
10✔
1129
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
10✔
1130
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
30✔
1131
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
20!
1132
                 " qmsgLen:%" PRIi32,
1133
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1134
      }
1135
      TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
10!
1136
#if 0
1137
      // reload all ctbUids for suid
1138
      uidStore.suid = suid;
1139
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
1140
        code = terrno;
1141
        TSDB_CHECK_CODE(code, lino, _exit);
1142
      }
1143

1144
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
1145
        code = terrno;
1146
        TSDB_CHECK_CODE(code, lino, _exit);
1147
      }
1148

1149
      taosArrayClear(uidStore.tbUids);
1150
#endif
1151
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
10!
1152
    }
1153
  }
1154

1155
  if (nTables) {
10!
1156
    *nTables = nRsmaTables;
10✔
1157
  }
1158
_exit:
×
1159
  if (code) {
10!
1160
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
×
1161
             __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
1162
  }
1163
  metaReaderClear(&mr);
10✔
1164
  taosArrayDestroy(suidList);
10✔
1165
  tdUidStoreDestory(&uidStore);
10✔
1166
  TAOS_RETURN(code);
10✔
1167
}
1168

1169
/**
1170
 * N.B. the data would be restored from the unified WAL replay procedure
1171
 */
1172
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
17✔
1173
  int32_t code = 0;
17✔
1174
  int32_t lino = 0;
17✔
1175
  int64_t nTables = 0;
17✔
1176

1177
  // step 1: init env
1178
  TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
17!
1179

1180
  // step 2: iterate all stables to restore the rsma env
1181
  TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
17!
1182

1183
_exit:
17✔
1184
  if (code) {
17!
1185
    smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
×
1186
             qtaskFileVer, tstrerror(code));
1187
  } else {
1188
    smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed, nTables:%" PRIi64, SMA_VID(pSma),
17!
1189
            type, qtaskFileVer, nTables);
1190
  }
1191

1192
  TAOS_RETURN(code);
17✔
1193
}
1194

1195
/**
 * @brief Persist the stream state of all rsma tasks in pInfoHash in three phases:
 *   1. feed a STREAM_INPUT__CHECKPOINT input to every qTask of every non-deleted rsma info;
 *   2. execute those tasks repeatedly until each of them reports streamFlushed;
 *   3. record the checkpoint info on every stream task, build one shared checkpoint, save the tasks and commit the
 *      stream meta.
 *
 * @param pRSmaStat
 * @param pInfoHash hash of SRSmaInfo pointers; an empty hash is a no-op
 * @return int32_t
 */
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nTaskInfo = 0;  // number of tasks that received the checkpoint trigger in phase 1
  SSma   *pSma = pRSmaStat->pSma;
  SVnode *pVnode = pSma->pVnode;

  if (taosHashGetSize(pInfoHash) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stream state: trigger checkpoint
  {
    void *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        if (pRSmaInfo->taskInfo[i]) {
          code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
          if (code) {
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          pRSmaInfo->items[i].streamFlushed = 0;
          ++nTaskInfo;
        }
      }
    }
  }

  // stream state: wait checkpoint ready in async mode
  {
    int32_t nStreamFlushed = 0;
    int32_t nSleep = 0;
    void   *infoHash = NULL;
    // The loop is only left once a flush has been counted. When no task was triggered above there is nothing to
    // wait for, so skip it instead of polling forever.
    while (nTaskInfo > 0) {
      while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
        if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
          continue;
        }
        for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
          if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
            int8_t streamFlushed = 0;
            code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
                                             STREAM_CHECKPOINT, &streamFlushed);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }

            if (streamFlushed) {
              pRSmaInfo->items[i].streamFlushed = 1;
              if (++nStreamFlushed >= nTaskInfo) {
                smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
                        nSleep * 10, nStreamFlushed, nTaskInfo);
                taosHashCancelIterate(pInfoHash, infoHash);
                goto _checkpoint;
              }
            }
          }
        }
      }
      // the iterator is exhausted (NULL) here, so the next round restarts from the first entry
      taosUsleep(10);
      ++nSleep;
      smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode),
               nSleep * 10, nStreamFlushed, nTaskInfo);
    }
  }

_checkpoint:
  // stream state: build checkpoint in backend
  {
    SStreamMeta *pMeta = NULL;
    int64_t      checkpointId = taosGetTimestampNs();
    bool         checkpointBuilt = false;
    void        *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
        if (pItem && pItem->pStreamTask) {
          SStreamTask *pTask = pItem->pStreamTask;
          // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
          TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));

          pTask->chkInfo.checkpointId = checkpointId;  // 1pTask->checkpointingId;
          pTask->chkInfo.checkpointVer = pItem->submitReqVer;
          pTask->info.delaySchedParam = pItem->fetchResultVer;
          pTask->info.taskLevel = TASK_LEVEL_SMA;

          if (!checkpointBuilt) {
            // the stream states share one checkpoint
            code = streamTaskBuildCheckpoint(pTask);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }
            // pMeta is taken from the first task and reused for all later ones
            pMeta = pTask->pMeta;
            checkpointBuilt = true;
          }

          streamMetaWLock(pMeta);
          if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
            streamMetaWUnLock(pMeta);
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          streamMetaWUnLock(pMeta);
          smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
                   ", table:%" PRIi64 ", level:%d",
                   TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
        }
      }
    }
    if (pMeta) {
      streamMetaWLock(pMeta);
      if ((code = streamMetaCommit(pMeta)) != 0) {
        streamMetaWUnLock(pMeta);
        if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      streamMetaWUnLock(pMeta);
    }
    if (checkpointBuilt) {
      smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
    }
  }
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1337

1338
/**
1339
 * @brief trigger to get rsma result in async mode
1340
 *
1341
 * @param param
1342
 * @param tmrId
1343
 */
1344
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
160✔
1345
  SRSmaRef      *pRSmaRef = NULL;
160✔
1346
  SSma          *pSma = NULL;
160✔
1347
  SRSmaStat     *pStat = NULL;
160✔
1348
  SRSmaInfo     *pRSmaInfo = NULL;
160✔
1349
  SRSmaInfoItem *pItem = NULL;
160✔
1350
  int32_t        code = 0;
160✔
1351

1352
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
160!
1353
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
×
1354
             smaMgmt.refHash, smaMgmt.rsetId);
1355
    return;
×
1356
  }
1357

1358
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
160!
1359
    smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1360
            pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1361
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1362
    return;
×
1363
  }
1364

1365
  pSma = pStat->pSma;
160✔
1366

1367
  if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
160!
1368
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1369
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1370
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1371
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1372
    return;
×
1373
  }
1374

1375
  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
160!
1376
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1377
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1378
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1379
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1380
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1381
    return;
×
1382
  }
1383

1384
  pItem = *(SRSmaInfoItem **)&param;
160✔
1385

1386
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
1387
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
160✔
1388
  switch (rsmaTriggerStat) {
160!
1389
    case TASK_TRIGGER_STAT_PAUSED:
×
1390
    case TASK_TRIGGER_STAT_CANCELLED: {
1391
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
×
1392
               ", rsetId:%d refId:%" PRIi64,
1393
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
1394
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
×
1395
        bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
1396
        if (!ret) {
×
1397
          smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
×
1398
                  " since tmr reset failed, rsetId:%d refId:%" PRIi64,
1399
                  SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
1400
        }
1401
      }
1402
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1403
      TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1404
      return;
×
1405
    }
1406
    default:
160✔
1407
      break;
160✔
1408
  }
1409

1410
  int8_t fetchTriggerStat =
1411
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
160✔
1412
  switch (fetchTriggerStat) {
160!
1413
    case TASK_TRIGGER_STAT_ACTIVE: {
46✔
1414
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
46!
1415
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1416
      // async process
1417
      atomic_store_8(&pItem->fetchLevel, 1);
46✔
1418

1419
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
46!
1420
        if (tsem_post(&(pStat->notEmpty)) != 0) {
46!
1421
          smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
×
1422
                   SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1423
        }
1424
      }
1425
    } break;
46✔
1426
    case TASK_TRIGGER_STAT_INACTIVE: {
101✔
1427
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
101!
1428
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1429
    } break;
101✔
1430
    case TASK_TRIGGER_STAT_INIT: {
13✔
1431
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
13!
1432
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1433
    } break;
13✔
1434
    default: {
×
1435
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
×
1436
               " is unknown",
1437
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
1438
    } break;
×
1439
  }
1440

1441
_end:
160✔
1442
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
160✔
1443
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
160!
1444
  TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
160✔
1445
}
1446

1447
/**
 * @brief Release what every SPackedData in pItems refers to, then clear the array.
 *
 * @param pItems array of SPackedData
 * @param type   STREAM_INPUT__MERGED_SUBMIT: msgStr points RSMA_EXEC_MSG_HLEN bytes into a queue item, which is freed;
 *               STREAM_INPUT__REF_DATA_BLOCK: pDataBlock is destroyed; any other type only logs a warning
 */
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
  int32_t nItems = taosArrayGetSize(pItems);

  switch (type) {
    case STREAM_INPUT__MERGED_SUBMIT:
      for (int32_t idx = 0; idx < nItems; ++idx) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, idx);
        // step back over the header to get the start of the queue item
        taosFreeQitem(POINTER_SHIFT(pPacked->msgStr, -RSMA_EXEC_MSG_HLEN));
      }
      break;
    case STREAM_INPUT__REF_DATA_BLOCK:
      for (int32_t idx = 0; idx < nItems; ++idx) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, idx);
        blockDataDestroy(pPacked->pDataBlock);
      }
      break;
    default:
      smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
      break;
  }

  taosArrayClear(pItems);
}
320✔
1464

1465
/**
1466
 * @brief fetch rsma result(consider the efficiency and functionality)
1467
 *
1468
 * @param pSma
1469
 * @param pInfo
1470
 * @return int32_t
1471
 */
1472
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
46✔
1473
  int32_t     code = 0;
46✔
1474
  int32_t     lino = 0;
46✔
1475
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
46✔
1476
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
138✔
1477
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
92✔
1478

1479
    if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
92✔
1480
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
46✔
1481
      if (!taskInfo) {
46!
1482
        continue;
×
1483
      }
1484

1485
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
46!
1486
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
×
1487
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1488
      } else {
1489
        int64_t curMs = taosGetTimestampMs();
46✔
1490
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
46✔
1491
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
12!
1492
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1493
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
12✔
1494
          continue;
12✔
1495
        } else {
1496
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
34!
1497
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1498
        }
1499
      }
1500

1501
      pItem->nScanned = 0;
34✔
1502

1503
      TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
34!
1504
      if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
34!
1505
        atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
×
1506
        goto _exit;
×
1507
      }
1508

1509
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
34!
1510
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1511
    } else {
1512
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
46!
1513
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
1514
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
1515
    }
1516
  }
1517

1518
_exit:
46✔
1519
  TAOS_RETURN(code);
46✔
1520
}
1521

1522
/**
 * @brief Consume every message currently held in qall for one rsma info and run it through the rsma tasks.
 *
 * Messages are handled one at a time, in queue order:
 *  - STREAM_INPUT__DATA_SUBMIT: the message is wrapped in an SPackedData, pushed to pSubmitArr and executed as
 *    STREAM_INPUT__MERGED_SUBMIT.
 *  - STREAM_INPUT__REF_DATA_BLOCK: the message is converted by tqExtractDelDataBlock(); when a data block is
 *    produced it is pushed to pSubmitArr and executed as STREAM_INPUT__REF_DATA_BLOCK.
 *  - any other type: a warning is logged, the message is released and the loop moves on.
 * Execution means calling tdExecuteRSmaImpl() for every level in [1, TSDB_RETENTION_L2].
 *
 * On failure the error code is stored in the rsma stat (execStat), the pending array entries are handed to
 * tdFreeRSmaSubmitItems() and every message still left in qall is freed, so qall is empty when this returns
 * non-zero.
 *
 * @param pSma       sma handle, used for logging and to reach the rsma stat
 * @param pInfo      rsma info the messages belong to
 * @param qall       batch of queue items to consume
 * @param pSubmitArr scratch array of SPackedData; cleared on entry
 * @param type       exec type forwarded to tdExecuteRSmaImpl()
 * @return int32_t   TSDB_CODE_SUCCESS once qall is drained, otherwise the error code of the failed step
 */
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
  void       *msg = NULL;
  int32_t     nSubmit = 0;
  int32_t     nDelete = 0;
  int64_t     version = 0;
  int32_t     code = 0;
  int32_t     lino = 0;
  SPackedData packData = {0};

  taosArrayClear(pSubmitArr);

  // the submitReq/deleteReq msg may exist alternately in the msg queue, consume them sequentially
  while (1) {
    // reset first so that "no item" is detected even if taosGetQitem leaves the out-pointer untouched
    msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (!msg) {
      // qall is drained; nothing is pending because every message is executed right after it is fetched
      break;
    }

    int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
    if (inputType == STREAM_INPUT__DATA_SUBMIT) {
      packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
      packData.ver = RSMA_EXEC_MSG_VER(msg);
      packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
      version = packData.ver;
      if (!taosArrayPush(pSubmitArr, &packData)) {
        taosFreeQitem(msg);
        // always leave after releasing msg, even if terrno was not set by the failed push
        code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        lino = __LINE__;
        goto _exit;
      }
      // msg is not freed here: the array entry points into it and is handed to tdFreeRSmaSubmitItems() below
      // NOTE(review): presumably tdFreeRSmaSubmitItems() releases the queue item - confirm
      ++nSubmit;
    } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
      version = RSMA_EXEC_MSG_VER(msg);
      // start from NULL so a callee that does not write the out-pointer cannot leave a stale value behind
      packData.pDataBlock = NULL;
      code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1,
                                   STREAM_DELETE_DATA);
      if (code != 0) {
        taosFreeQitem(msg);
        lino = __LINE__;
        goto _exit;
      }

      if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
        // NOTE(review): the extracted data block is not in the array at this point, so the cleanup in _exit
        // cannot reach it - looks like it leaks on this path; confirm and release it with the proper destructor
        taosFreeQitem(msg);
        code = TSDB_CODE_OUT_OF_MEMORY;
        lino = __LINE__;
        goto _exit;
      }
      taosFreeQitem(msg);
      if (packData.pDataBlock) {
        // packData.pDataBlock is NULL if delete affects 0 row
        ++nDelete;
      }
    } else {
      smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, inputType);
      // the item was taken out of qall, so it has to be released here; keep draining the remaining items
      taosFreeQitem(msg);
      continue;
    }

    // a delete that affects 0 row yields nothing to execute; move on to the next message instead of returning
    if (nSubmit > 0 || nDelete > 0) {
      int32_t size = TARRAY_SIZE(pSubmitArr);
      int32_t execInput = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
      for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
        TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, execInput, pInfo, type, i));
      }
      tdFreeRSmaSubmitItems(pSubmitArr, execInput);
      nSubmit = 0;
      nDelete = 0;
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:
  // record the code that is actually returned, not whatever terrno happens to hold
  atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
  terrno = code;  // keeps terrstr() below in sync with the returned code
  smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
           type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
  tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
  // release everything that is still queued so the caller gets back an empty qall
  while (1) {
    msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (!msg) {
      break;
    }
    taosFreeQitem(msg);
  }
  TAOS_RETURN(code);
}
1612

1613
/**
1614
 * @brief
1615
 *
1616
 * @param pSma
1617
 * @param type
1618
 * @return int32_t
1619
 */
1620

1621
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
680✔
1622
  int32_t    code = 0;
680✔
1623
  int32_t    lino = 0;
680✔
1624
  SVnode    *pVnode = pSma->pVnode;
680✔
1625
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
680✔
1626
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
680✔
1627
  SHashObj  *infoHash = NULL;
680✔
1628
  SArray    *pSubmitArr = NULL;
680✔
1629
  bool       isFetchAll = false;
680✔
1630

1631
  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
680!
1632
    code = TSDB_CODE_RSMA_INVALID_STAT;
×
1633
    TSDB_CHECK_CODE(code, lino, _exit);
×
1634
  }
1635

1636
  if (!(pSubmitArr =
680!
1637
            taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
680✔
1638
    code = terrno;
×
1639
    TSDB_CHECK_CODE(code, lino, _exit);
×
1640
  }
1641

1642
  while (true) {
1643
    // step 1: rsma exec - consume data in buffer queue for all suids
1644
    if (type == RSMA_EXEC_OVERFLOW) {
37,922!
1645
      void *pIter = NULL;
37,922✔
1646
      while ((pIter = taosHashIterate(infoHash, pIter))) {
75,165✔
1647
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
37,243✔
1648
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
37,243✔
1649
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
181✔
1650
            int32_t batchCnt = -1;
179✔
1651
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
179✔
1652
            bool    occupied = (batchMax <= 1);
179✔
1653
            if (batchMax > 1) {
179!
1654
              batchMax = 100 / batchMax;
×
1655
              batchMax = TMAX(batchMax, 4);
×
1656
            }
1657
            while (occupied || (++batchCnt < batchMax)) {                 // greedy mode
405!
1658
              TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall));  // queue has mutex lock
405✔
1659
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
405✔
1660
              if (qallItemSize > 0) {
405✔
1661
                if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
226!
1662
                  taosHashCancelIterate(infoHash, pIter);
×
1663
                  TSDB_CHECK_CODE(code, lino, _exit);
×
1664
                }
1665
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
226!
1666
              }
1667

1668
              if (RSMA_NEED_FETCH(pInfo)) {
405✔
1669
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
46✔
1670
                if (oldStat == 0 ||
46!
1671
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
×
1672
                  int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
46✔
1673

1674
                  if (oldVal < 0) {
46!
1675
                    code = TSDB_CODE_APP_ERROR;
×
1676
                    taosHashCancelIterate(infoHash, pIter);
×
1677
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1678
                  }
1679

1680
                  if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
46!
1681
                    taosHashCancelIterate(infoHash, pIter);
×
1682
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1683
                  }
1684

1685
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
46!
1686
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
46✔
1687
                  }
1688
                }
1689
              }
1690

1691
              if (qallItemSize > 0) {
405✔
1692
                TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
226✔
1693
                continue;
226✔
1694
              }
1695
              if (RSMA_NEED_FETCH(pInfo)) {
179!
1696
                continue;
×
1697
              }
1698

1699
              break;
179✔
1700
            }
1701
          }
1702
          TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
181✔
1703
        }
1704
      }
1705
    } else {
1706
      smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
×
1707
      code = TSDB_CODE_APP_ERROR;
×
1708
      TSDB_CHECK_CODE(code, lino, _exit);
×
1709
    }
1710

1711
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
37,881✔
1712
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
860!
1713
        break;
×
1714
      }
1715

1716
      if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
860!
1717
        smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
×
1718
      }
1719

1720
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
851!
1721
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
667!
1722
                 atomic_load_64(&pRSmaStat->nBufItems));
1723
        break;
680✔
1724
      }
1725
    }
1726

1727
  }  // end of while(true)
1728

1729
_exit:
680✔
1730
  taosArrayDestroy(pSubmitArr);
680✔
1731
  if (code) {
680!
1732
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1733
  }
1734
  TAOS_RETURN(code);
680✔
1735
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc