• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.24
/source/dnode/vnode/src/sma/smaRollup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
/* Tuning knobs for rsma execution and result fetching. */
#define RSMA_EXEC_SMOOTH_SIZE (100)     // cnt: buffered items per smoothing step in tdExecuteRSmaImplAsync
#define RSMA_EXEC_BATCH_SIZE  (1024)    // cnt
#define RSMA_FETCH_DELAY_MAX  (120000)  // ms
#define RSMA_FETCH_ACTIVE_MAX (1000)    // ms
#define RSMA_FETCH_INTERVAL   (5000)    // ms: delay used when (re)arming the fetch timer

/* Value written into SStreamTask::exec.qmsg of the stream tasks built for rsma. */
#define RSMA_EXEC_TASK_FLAG "rsma"

/*
 * Layout of an rsma exec message as written by tdExecuteRSmaImplAsync:
 *   [ type: int8_t ][ len: int32_t ][ version: int64_t ][ body: len bytes ]
 * The header is packed (13 bytes), so the len/version fields are not naturally aligned.
 */
#define RSMA_EXEC_MSG_HLEN      (13)  // type(int8_t) + len(int32_t) + version(int64_t)
#define RSMA_EXEC_MSG_OFF_LEN   (sizeof(int8_t))
#define RSMA_EXEC_MSG_OFF_VER   (sizeof(int8_t) + sizeof(int32_t))
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
#define RSMA_EXEC_MSG_LEN(msg)  (*(int32_t *)POINTER_SHIFT((msg), RSMA_EXEC_MSG_OFF_LEN))
#define RSMA_EXEC_MSG_VER(msg)  (*(int64_t *)POINTER_SHIFT((msg), RSMA_EXEC_MSG_OFF_VER))
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))

/* Non-zero when the fetchLevel of either item (index 0 or 1) of rsma info `r` is non-zero. */
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
33

34
SSmaMgmt smaMgmt = {
35
    .inited = 0,
36
    .rsetId = -1,
37
};
38

39
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
40

41
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
42
static void    tdUidStoreDestory(STbUidStore *pStore);
43
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
44
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
45
                                       int8_t idx);
46
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
47
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
48
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
49
static void    tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
50
static void    tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
51
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
52
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
53
                                         int32_t execType, int8_t *streamFlushed);
54
static void    tdRSmaFetchTrigger(void *param, void *tmrId);
55
static void    tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
56
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
57
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
58
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
59

60
struct SRSmaQTaskInfoItem {
61
  int32_t len;
62
  int8_t  type;
63
  int64_t suid;
64
  void   *qTaskInfo;
65
};
66

67
/**
 * @brief Destroy the query task held in *taskHandle and clear the slot.
 *
 * Free/kill of the task may race, so the slot is cleared with a compare-and-exchange and the
 * task is destroyed only when the exchange reports a non-NULL previous value.
 *
 * @param taskHandle slot holding the task; NULL or an empty slot is a no-op
 * @param vgId       vnode id, for logging only
 * @param level      rollup level, for logging only
 */
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
  if (taskHandle == NULL || *taskHandle == NULL) {
    return;
  }

  qTaskInfo_t task = atomic_load_ptr(taskHandle);
  if (task == NULL) {
    return;
  }
  if (!atomic_val_compare_exchange_ptr(taskHandle, task, NULL)) {
    return;
  }

  smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, task, level);
  qDestroyTask(task);
}
76

77
/**
78
 * @brief general function to free rsmaInfo
79
 *
80
 * @param pSma
81
 * @param pInfo
82
 * @return void*
83
 */
84
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
17✔
85
  if (pInfo) {
17!
86
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
51✔
87
      SRSmaInfoItem *pItem = &pInfo->items[i];
34✔
88

89
      if (pItem->tmrId) {
34!
90
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
34!
91
                 pInfo->suid, i + 1);
92
        if (!taosTmrStopA(&pItem->tmrId)) {
34!
93
          smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
×
94
                   i + 1);
95
        }
96
      }
97

98
      if (pItem->pStreamState) {
34!
99
        streamStateClose(pItem->pStreamState, false);
34✔
100
      }
101

102
      if (pItem->pStreamTask) {
34!
103
        tFreeStreamTask(pItem->pStreamTask);
34✔
104
      }
105
      taosArrayDestroy(pItem->pResList);
34✔
106
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
34✔
107
    }
108

109
    taosMemoryFreeClear(pInfo->pTSchema);
17!
110

111
    if (pInfo->queue) {
17!
112
      taosCloseQueue(pInfo->queue);
17✔
113
      pInfo->queue = NULL;
17✔
114
    }
115
    if (pInfo->qall) {
17!
116
      taosFreeQall(pInfo->qall);
17✔
117
      pInfo->qall = NULL;
17✔
118
    }
119

120
    taosMemoryFree(pInfo);
17✔
121
  }
122

123
  return NULL;
17✔
124
}
125

126
/**
 * @brief Allocate a zero-filled STbUidStore into *pStore.
 * @return TSDB_CODE_SUCCESS, or terrno if the allocation failed (*pStore is then NULL)
 */
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
  STbUidStore *pNew = taosMemoryCalloc(1, sizeof(*pNew));
  *pStore = pNew;
  return (pNew != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
134

135
/**
 * @brief Add or remove child-table uids in the stream scanners of every rollup level of stable *suid.
 *
 * @param pSma   owning SMA
 * @param suid   pointer to the super table uid; must not be NULL
 * @param tbUids array of tb_uid_t; must not be NULL. An empty array is a no-op.
 * @param isAdd  true to add the uids, false to remove them (forwarded to the scanner)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PTR, or the error returned while acquiring the
 *         rsma info or updating a scanner
 */
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
  SRSmaInfo *pRSmaInfo = NULL;
  int32_t    code = 0;

  if (!suid || !tbUids) {
    code = TSDB_CODE_INVALID_PTR;
    smaError("vgId:%d, failed to update tbUidList for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
             tstrerror(code));
    TAOS_RETURN(code);
  }

  int32_t nTables = taosArrayGetSize(tbUids);
  if (0 == nTables) {
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
    return TSDB_CODE_SUCCESS;
  }

  code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);
  if (code != 0) {
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, tstrerror(code));
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (!pRSmaInfo->taskInfo[i]) {
      continue;
    }
    if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
      // levels are logged 1-based, consistent with the rest of this file
      smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i + 1,
               tstrerror(code));
      TAOS_RETURN(code);
    }
    smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
             " nTables:%d level %d",
             SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i + 1);
  }

  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  TAOS_RETURN(code);
}
177

178
/**
 * @brief Apply the uids collected in pStore to the rsma stream scanners.
 *
 * The primary suid/tbUids pair is applied first, then every additional suid kept in uidHash.
 * A NULL store or a store whose tbUids is empty is a no-op.
 *
 * @return TSDB_CODE_SUCCESS or the first error from tdUpdateTbUidListImpl()
 */
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
  if (pStore == NULL || taosArrayGetSize(pStore->tbUids) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));

  for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStore->uidHash, pIter)) {
    tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
    SArray   *pUids = *(SArray **)pIter;
    int32_t   code = tdUpdateTbUidListImpl(pSma, pSuid, pUids, isAdd);
    if (code != TSDB_CODE_SUCCESS) {
      taosHashCancelIterate(pStore->uidHash, pIter);
      TAOS_RETURN(code);
    }
  }

  return TSDB_CODE_SUCCESS;
}
198

199
/**
200
 * @brief fetch suid/uids when create child tables of rollup SMA
201
 *
202
 * @param pTsdb
203
 * @param ppStore
204
 * @param suid
205
 * @param uid
206
 * @return int32_t
207
 */
208
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
138,956✔
209
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
138,956✔
210
  int32_t  code = 0;
138,956✔
211

212
  // only applicable to rollup SMA ctables
213
  if (!pEnv) {
138,956✔
214
    return TSDB_CODE_SUCCESS;
138,954✔
215
  }
216

217
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
2✔
218
  SHashObj  *infoHash = NULL;
2✔
219
  if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
2!
220
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
221
  }
222

223
  // info cached when create rsma stable and return directly for non-rsma ctables
224
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
8!
225
    return TSDB_CODE_SUCCESS;
×
226
  }
227

228
  if (!(*ppStore)) {
8!
229
    TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
8!
230
  }
231

232
  if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
8!
233
    *ppStore = tdUidStoreFree(*ppStore);
×
234
    TAOS_RETURN(code);
×
235
  }
236

237
  return TSDB_CODE_SUCCESS;
8✔
238
}
239

240
/**
 * @brief Seed pItem's version bookkeeping from a previously registered stream task, if one exists.
 *
 * The task identified by pId is looked up under the stream meta read lock. When it is found:
 *  - its chkInfo.checkpointVer becomes pItem->submitReqVer;
 *  - its info.delaySchedParam becomes pItem->fetchResultVer.
 * Otherwise pItem is left untouched.
 */
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
  STaskId      taskId = {.streamId = pId->streamId, .taskId = pId->taskId};
  SStreamTask *pTask = NULL;

  streamMetaRLock(pMeta);
  if (streamMetaAcquireTaskUnsafe(pMeta, &taskId, &pTask) == 0) {
    pItem->submitReqVer = pTask->chkInfo.checkpointVer;
    // NOTE(review): delaySchedParam appears to be reused to persist fetchResultVer for rsma tasks
    pItem->fetchResultVer = pTask->info.delaySchedParam;
    streamMetaReleaseTask(pMeta, pTask);
  }
  streamMetaRUnLock(pMeta);
}
34✔
255

256
/**
 * @brief Unregister the rsma stream task (streamId, taskId) from pMeta and commit the meta.
 *
 * The function returns void. Failures of the unregister or of the commit are logged and
 * otherwise ignored.
 */
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
  int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
  if (code != 0) {
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
  }

  streamMetaWLock(pMeta);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  // persist the updated task list to disk
  int32_t commitCode = streamMetaCommit(pMeta);
  streamMetaWUnLock(pMeta);

  if (commitCode < 0) {
    // previously swallowed by an empty if-body; make the failure visible
    smaError("vgId:%d, rsma task:%" PRIi64 ",%d failed to commit stream meta since %s", pMeta->vgId, streamId, taskId,
             tstrerror(commitCode));
  }

  smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
}
34✔
269

270
/**
 * @brief Build the runtime state of one rollup level of pRSmaInfo (idx 0 -> level 1, idx 1 -> level 2).
 *
 * Does nothing when the level has no query message. Otherwise it:
 *  1. creates the stream-state directory of the level if it is missing;
 *  2. allocates an SStreamTask (owned by pItem) and seeds versions via tdRSmaTaskInit();
 *  3. opens the stream state, then unregisters the stream task from the stream meta;
 *  4. creates the stream query task from param->qmsg[idx];
 *  5. derives maxDelay, falling back to the next retention frequency and capping it at
 *     TSDB_MAX_ROLLUP_MAX_DELAY;
 *  6. registers pItem in smaMgmt.refHash and arms the fetch timer with pItem as its parameter.
 *
 * On failure, partially built resources stay attached to pRSmaInfo; the caller is expected to
 * release them with tdFreeRSmaInfo() (as tdRSmaProcessCreateImpl does).
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
                                       int8_t idx) {
  int32_t code = 0;

  // qmsgLen is per level: test this level's length, not the array itself (which is never 0)
  if ((param->qmsgLen[idx] <= 0) || !param->qmsg[idx]) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
  SRetention    *pRetention = SMA_RETENTION(pSma);
  STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
  SVnode        *pVnode = pSma->pVnode;
  char           taskInfDir[TSDB_FILENAME_LEN] = {0};
  void          *pStreamState = NULL;
  size_t         flagSize = strlen(RSMA_EXEC_TASK_FLAG) + 1;

  // set the backend of stream state
  tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);

  if (!taosCheckExistFile(taskInfDir)) {
    char *s = taosStrdup(taskInfDir);
    if (!s) {
      TAOS_RETURN(terrno);
    }
    if (taosMulMkDir(s) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      taosMemoryFree(s);
      TAOS_RETURN(code);
    }
    taosMemoryFree(s);
  }

  SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
  if (!pStreamTask) {
    return terrno;
  }
  // owned by pItem from here on, so tdFreeRSmaInfo() releases it on any later failure
  pItem->pStreamTask = pStreamTask;
  pStreamTask->id.taskId = 0;
  pStreamTask->id.streamId = pRSmaInfo->suid + idx;
  pStreamTask->chkInfo.startTs = taosGetTimestampMs();
  pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
  pStreamTask->exec.qmsg = taosMemoryMalloc(flagSize);
  if (!pStreamTask->exec.qmsg) {
    TAOS_RETURN(terrno);
  }
  (void)snprintf(pStreamTask->exec.qmsg, flagSize, "%s", RSMA_EXEC_TASK_FLAG);
  pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
  tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);

  TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));

  TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));

  pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
  if (!pStreamState) {
    TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
  }
  pItem->pStreamState = pStreamState;

  tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);

  SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
  initStorageAPI(&handle.api);

  code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
  if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
    TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
  }

  if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
    TAOS_RETURN(terrno);
  }

  if (pItem->fetchResultVer < pItem->submitReqVer) {
    // fetch the data when reboot
    pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
  }

  if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
    // fall back to the frequency of retention level idx + 1, converted to milliseconds
    int64_t msInterval = -1;
    TAOS_CHECK_RETURN(convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision,
                                                     TIME_UNIT_MILLISECOND, &msInterval));
    pItem->maxDelay = (int32_t)msInterval;
  } else {
    pItem->maxDelay = (int32_t)param->maxdelay[idx];
  }
  if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
    pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
  }

  pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;

  // keyed by the pItem pointer, which is also the fetch timer parameter below
  SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
  if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
  if (!ret) {
    smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
             idx + 1);
  }

  smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
          ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
          ", finally maxdelay:%" PRIi32,
          TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
          pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);

  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
376

377
/**
378
 * @brief for rsam create or restore
379
 *
380
 * @param pSma
381
 * @param param
382
 * @param suid
383
 * @param tbName
384
 * @return int32_t
385
 */
386
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
17✔
387
  int32_t code = 0;
17✔
388
  int32_t lino = 0;
17✔
389

390
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
17!
391
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
×
392
    return TSDB_CODE_SUCCESS;
×
393
  }
394

395
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
17✔
396
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
17✔
397
  SRSmaInfo *pRSmaInfo = NULL;
17✔
398

399
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
17✔
400
  if (pRSmaInfo) {
17!
401
    smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
×
402
    return TSDB_CODE_SUCCESS;
×
403
  }
404

405
  // from write queue: single thead
406
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
17✔
407
  if (!pRSmaInfo) {
17!
408
    return terrno;
×
409
  }
410

411
  STSchema *pTSchema;
412
  code = metaGetTbTSchemaNotNull(SMA_META(pSma), suid, -1, 1, &pTSchema);
17✔
413
  TAOS_CHECK_EXIT(code);
17!
414
  pRSmaInfo->pSma = pSma;
17✔
415
  pRSmaInfo->pTSchema = pTSchema;
17✔
416
  pRSmaInfo->suid = suid;
17✔
417
  T_REF_INIT_VAL(pRSmaInfo, 1);
17✔
418

419
  TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
17!
420

421
  TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
17!
422

423
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
17!
424
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
17!
425

426
  TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
17!
427

428
_exit:
17✔
429
  if (code != 0) {
17!
430
    TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
×
431
  } else {
432
    smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
17!
433
  }
434
  TAOS_RETURN(code);
17✔
435
}
436

437
/**
438
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
439
 *
440
 * @param pSma
441
 * @param pReq
442
 * @return int32_t
443
 */
444
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
29,224✔
445
  SVnode *pVnode = pSma->pVnode;
29,224✔
446
  if (!pReq->rollup) {
29,224!
447
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
29,773✔
448
             pReq->suid);
449
    return TSDB_CODE_SUCCESS;
29,806✔
450
  }
451

452
  if (!VND_IS_RSMA(pVnode)) {
×
453
    smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
×
454
            pReq->suid);
455
    return TSDB_CODE_SUCCESS;
×
456
  }
457

458
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
×
459
}
460

461
/**
462
 * @brief drop cache for stb
463
 *
464
 * @param pSma
465
 * @param pReq
466
 * @return int32_t
467
 */
468
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
2,321✔
469
  SVnode *pVnode = pSma->pVnode;
2,321✔
470
  if (!VND_IS_RSMA(pVnode)) {
2,321!
471
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
2,322✔
472
             pReq->suid);
473
    return TSDB_CODE_SUCCESS;
2,322✔
474
  }
475

476
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
×
477
  if (!pSmaEnv) {
×
478
    return TSDB_CODE_SUCCESS;
×
479
  }
480

481
  int32_t    code = 0;
×
482
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
×
483
  SRSmaInfo *pRSmaInfo = NULL;
×
484

485
  code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
×
486

487
  if (code != 0) {
×
488
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
×
489
            pReq->suid);
490
    return TSDB_CODE_SUCCESS;
×
491
  }
492

493
  // set del flag for data in mem
494
  atomic_store_8(&pRSmaStat->delFlag, 1);
×
495
  RSMA_INFO_SET_DEL(pRSmaInfo);
×
496
  tdUnRefRSmaInfo(pSma, pRSmaInfo);
×
497

498
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
499

500
  // no need to save to file as triggered by dropping stable
501
  smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
×
502
  return TSDB_CODE_SUCCESS;
×
503
}
504

505
/**
506
 * @brief store suid/[uids], prefer to use array and then hash
507
 *
508
 * @param pStore
509
 * @param suid
510
 * @param uid
511
 * @return int32_t
512
 */
513
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
320✔
514
  // prefer to store suid/uids in array
515
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
320!
516
    if (pStore->suid == 0) {
320!
517
      pStore->suid = suid;
320✔
518
    }
519
    if (uid) {
320✔
520
      if (!pStore->tbUids) {
8!
521
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
8!
522
          TAOS_RETURN(terrno);
×
523
        }
524
      }
525
      if (!taosArrayPush(pStore->tbUids, uid)) {
16!
526
        TAOS_RETURN(terrno);
×
527
      }
528
    }
529
  } else {
530
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
531
    if (!pStore->uidHash) {
×
532
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
533
      if (!pStore->uidHash) {
×
534
        TAOS_RETURN(terrno);
×
535
      }
536
    }
537
    if (uid) {
×
538
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
×
539
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
×
540
        if (!taosArrayPush(uidArray, uid)) {
×
541
          taosArrayDestroy(uidArray);
×
542
          TAOS_RETURN(terrno);
×
543
        }
544
      } else {
545
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
×
546
        if (!pUidArray) {
×
547
          TAOS_RETURN(terrno);
×
548
        }
549
        if (!taosArrayPush(pUidArray, uid)) {
×
550
          taosArrayDestroy(pUidArray);
×
551
          TAOS_RETURN(terrno);
×
552
        }
553
        TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
×
554
      }
555
    } else {
556
      TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
×
557
    }
558
  }
559
  return TSDB_CODE_SUCCESS;
320✔
560
}
561

562
/**
 * @brief Release the contents of a uid store (but not the store struct itself).
 *
 * uidHash values are destroyed as SArray* only when tbUids is set. When tbUids is NULL the hash
 * holds keys only, because the suids were recorded without uids.
 */
static void tdUidStoreDestory(STbUidStore *pStore) {
  if (pStore == NULL) {
    return;
  }

  if (pStore->uidHash != NULL) {
    if (pStore->tbUids != NULL) {
      for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter != NULL;
           pIter = taosHashIterate(pStore->uidHash, pIter)) {
        taosArrayDestroy(*(SArray **)pIter);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}
330✔
578

579
/**
 * @brief Destroy a uid store and free it.
 * @param pStore store to free; NULL is accepted
 * @return always NULL, so callers can write `p = tdUidStoreFree(p);`
 */
void *tdUidStoreFree(STbUidStore *pStore) {
  if (!pStore) {
    return NULL;
  }
  tdUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}
586

587
/**
588
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
589
 * would be freed when close Vnode, thus lock should be used if with race condition.
590
 * @param pTsdb
591
 * @param version
592
 * @param pReq
593
 * @return int32_t
594
 */
595
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
625✔
596
  if (pReq) {
625!
597
    SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
625✔
598
    // spin lock for race condition during insert data
599
    TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
625!
600
  }
601

602
  return TSDB_CODE_SUCCESS;
625✔
603
}
604

605
/**
 * @brief Record the suid of every table in a submit request into pStore (suids only, no uids).
 * @param pMsg   submit request; NULL is treated as empty
 * @param pStore uid store receiving the suids
 * @return 0 or the first error from tdUidStorePut()
 */
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
  SArray *pTbDataArr = (pMsg != NULL) ? pMsg->aSubmitTbData : NULL;
  int32_t nTbData = taosArrayGetSize(pTbDataArr);

  int32_t idx = 0;
  while (idx < nTbData) {
    SSubmitTbData *pTbData = TARRAY_GET_ELEM(pTbDataArr, idx);
    TAOS_CHECK_RETURN(tdUidStorePut(pStore, pTbData->suid, NULL));
    ++idx;
  }

  return 0;
}
616

617
/**
618
 * @brief retention of rsma1/rsma2
619
 *
620
 * @param pSma
621
 * @param now
622
 * @return int32_t
623
 */
624
int32_t smaRetention(SSma *pSma, int64_t now) {
×
625
  int32_t code = TSDB_CODE_SUCCESS;
×
626
  if (!VND_IS_RSMA(pSma->pVnode)) {
×
627
    return code;
×
628
  }
629

630
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
631
    if (pSma->pRSmaTsdb[i]) {
632
      // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
633
      // if (code) goto _end;
634
    }
635
  }
636

637
_end:
×
638
  TAOS_RETURN(code);
×
639
}
640

641
/**
 * @brief Encode pDelReq into a newly allocated rpc buffer that starts with an SMsgHead for this vnode.
 *
 * On success *ppBuf owns the buffer (from rpcMallocCont) and *pContLen is its total length.
 * On failure nothing is left allocated.
 */
static int32_t tdRSmaEncodeDelReq(SSma *pSma, SBatchDeleteReq *pDelReq, void **ppBuf, int32_t *pContLen) {
  int32_t code = 0;
  int32_t bodyLen = 0;

  tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, bodyLen, code);
  if (code != 0) {
    return code;
  }

  void *pBuf = rpcMallocCont(bodyLen + sizeof(SMsgHead));
  if (pBuf == NULL) {
    return terrno;
  }

  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), bodyLen);
  code = tEncodeSBatchDeleteReq(&encoder, pDelReq);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pBuf);
    return code;
  }

  ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
  *ppBuf = pBuf;
  *pContLen = bodyLen + sizeof(SMsgHead);
  return 0;
}

/**
 * @brief Send the delete requests of one rsma level to the vnode write queue as TDMT_VND_BATCH_DEL.
 *
 * Nothing is sent when pDelReq->deleteReqs is empty. pDelReq->deleteReqs is destroyed before
 * returning in every case.
 *
 * @param pSma    owning SMA
 * @param suid    stable uid, for logging
 * @param level   rsma level, for logging
 * @param pDelReq batch delete request to encode and send
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
    void   *pBuf = NULL;
    int32_t contLen = 0;

    code = tdRSmaEncodeDelReq(pSma, pDelReq, &pBuf, &contLen);
    TSDB_CHECK_CODE(code, lino, _exit);

    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = contLen};
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  taosArrayDestroy(pDelReq->deleteReqs);
  if (code) {
    smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
             SMA_VID(pSma), lino, suid, level, tstrerror(code));
  }

  TAOS_RETURN(code);
}
681

682
/**
 * @brief Drain the stream query task of one rsma level and persist its result blocks.
 *
 * qExecTaskOpt() is run repeatedly until it yields no blocks or reports TSDB_CODE_QRY_IN_EXEC.
 * Each result block is handled by type:
 *  - STREAM_CHECKPOINT: sets *streamFlushed (when provided); the block is otherwise skipped;
 *  - STREAM_DELETE_RESULT: converted into a batch delete request and queued via tdRSmaProcessDelReq();
 *  - anything else: converted into an SSubmitReq2 and inserted into the rsma tsdb of pItem->level.
 * With STREAM_GET_ALL:
 *  - a block version older than submitReqVer is raised to submitReqVer;
 *  - a block whose version equals fetchResultVer is skipped as a duplicate;
 *  - fetchResultVer is updated to the version of each block written.
 *
 * @param pSma          owning SMA
 * @param taskInfo      stream query task to execute
 * @param pItem         rsma level item (supplies pResList, level and version bookkeeping)
 * @param pInfo         rsma info (supplies suid and target schema)
 * @param execType      execution type; STREAM_GET_ALL enables the version handling above
 * @param streamFlushed optional out flag, set to 1 when a checkpoint block is seen
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
                                         int32_t execType, int8_t *streamFlushed) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock *output = NULL;
  SArray      *pResList = pItem->pResList;
  STSchema    *pTSchema = pInfo->pTSchema;
  int64_t      suid = pInfo->suid;

  while (1) {
    uint64_t ts;
    bool     hasMore = false;
    code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
    if (code == TSDB_CODE_QRY_IN_EXEC) {
      code = 0;
      break;
    }
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pResList) == 0) {
      break;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
      output = taosArrayGetP(pResList, i);
      if (output->info.type == STREAM_CHECKPOINT) {
        if (streamFlushed) *streamFlushed = 1;
        continue;
      } else if (output->info.type == STREAM_DELETE_RESULT) {
        SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
        deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
        if (!deleteReq.deleteReqs) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
        if (code != 0) {
          // tdRSmaProcessDelReq() has not taken ownership yet: release the array to avoid a leak
          taosArrayDestroy(deleteReq.deleteReqs);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        // tdRSmaProcessDelReq() destroys deleteReq.deleteReqs on every path
        code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
               ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
               SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
               pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);

      if (STREAM_GET_ALL == execType) {
        /**
         * 1. reset the output version when reboot
         * 2. delete msg version not updated from the result
         */
        if (output->info.version < pItem->submitReqVer) {
          // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
          output->info.version = pItem->submitReqVer;
        } else if (output->info.version == pItem->fetchResultVer) {
          smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
                  ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
                  ", rows:%" PRIi64,
                  SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
                  pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
          continue;
        }
      }

      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
      SSubmitReq2 *pReq = NULL;

      TAOS_CHECK_EXIT(
          buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));

      if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
        if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
          // TODO: reconfigure SSubmitReq2
        }
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFreeClear(pReq);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (STREAM_GET_ALL == execType) {
        atomic_store_64(&pItem->fetchResultVer, output->info.version);
      }

      smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
               ", execType:%d, ver:%" PRIi64,
               SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);

      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }
    }
  }
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
             ", ver:%" PRIi64,
             SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
             output ? output->info.version : -1);
  } else {
    smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
  }
  qCleanExecTaskBlockBuf(taskInfo);
  return code;
}
788

789
/**
790
 * @brief Copy msg to rsmaQueueBuffer for batch process
791
 *
792
 * @param pSma
793
 * @param version
794
 * @param pMsg
795
 * @param len
796
 * @param inputType
797
 * @param pInfo
798
 * @param suid
799
 * @return int32_t
800
 */
801
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
802
                                      SRSmaInfo *pInfo, tb_uid_t suid) {
803
  int32_t code = 0;
341✔
804
  int32_t lino = 0;
341✔
805
  int32_t size = RSMA_EXEC_MSG_HLEN + len;  // header + payload
341✔
806
  void   *qItem;
807

808
  TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
341!
809

810
  void *pItem = qItem;
341✔
811

812
  *(int8_t *)pItem = (int8_t)inputType;
341✔
813
  pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
341✔
814
  *(int32_t *)pItem = len;
341✔
815
  pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
341✔
816
  *(int64_t *)pItem = version;
341✔
817
  (void)memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
341✔
818

819
  TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
341!
820

821
  pInfo->lastRecv = taosGetTimestampMs();
341✔
822

823
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
341✔
824

825
  int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
341✔
826

827
  if (atomic_load_8(&pInfo->assigned) == 0) {
341✔
828
    if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
216!
829
      smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
×
830
               tstrerror(terrno));
831
    }
832
  }
833

834
  // smoothing consume
835
  int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
341✔
836
  if (n > 1) {
341!
837
    if (n > 10) {
×
838
      n = 10;
×
839
    }
840
    taosMsleep(n << 3);
×
841
    if (n > 5) {
×
842
      smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
×
843
              taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
844
    }
845
  }
846

847
  return TSDB_CODE_SUCCESS;
341✔
848
}
849

850
#if 0
// Disabled debug helper: log every row of every block in a (legacy) SSubmitReq.
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlkIter blkIter = {0};

  TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));
  for (;;) {
    SSubmitBlk *pBlock = NULL;
    TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
    if (!pBlock) {
      break;
    }
    tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
    for (STSRow *row = tGetSubmitBlkNext(&blkIter); row != NULL; row = tGetSubmitBlkNext(&blkIter)) {
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
    }
  }
  return 0;
}
#endif
869

870
/**
871
 * @brief sync mode
872
 *
873
 * @param pSma
874
 * @param pMsg
875
 * @param msgSize
876
 * @param inputType
877
 * @param pInfo
878
 * @param type
879
 * @param level
880
 * @return int32_t
881
 */
882
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
640✔
883
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
884
  int32_t        code = 0;
640✔
885
  int32_t        idx = level - 1;
640✔
886
  void          *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
640✔
887
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
640✔
888

889
  if (!qTaskInfo) {
640!
890
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
×
891
             pInfo->suid);
892
    return TSDB_CODE_SUCCESS;
×
893
  }
894
  if (!pInfo->pTSchema) {
640!
895
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
×
896
    TAOS_RETURN(TSDB_CODE_INVALID_PTR);
×
897
  }
898

899
  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
640!
900
           ", inputType:%d",
901
           SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
902

903
  if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
640!
904
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
×
905
    TAOS_RETURN(TSDB_CODE_FAILED);
×
906
  }
907

908
  atomic_store_64(&pItem->submitReqVer, version);
640✔
909

910
  TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
640!
911

912
  TAOS_RETURN(code);
640✔
913
}
914

915
/**
916
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
917
 *
918
 * @param pSma
919
 * @param suid
920
 */
921
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
513✔
922
  int32_t    code = 0;
513✔
923
  int32_t    lino = 0;
513✔
924
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
513✔
925
  SRSmaStat *pStat = NULL;
513✔
926
  SRSmaInfo *pRSmaInfo = NULL;
513✔
927

928
  *ppRSmaInfo = NULL;
513✔
929

930
  if (!pEnv) {
513!
931
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
×
932
  }
933

934
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
513✔
935
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
513!
936
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
937
  }
938

939
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
513✔
940
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
513✔
941
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
513!
942
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
513!
943
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
944
      TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
945
    }
946

947
    tdRefRSmaInfo(pSma, pRSmaInfo);
948
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
513✔
949
    if (pRSmaInfo->suid != suid) {
513!
950
      TAOS_RETURN(TSDB_CODE_APP_ERROR);
×
951
    }
952
    *ppRSmaInfo = pRSmaInfo;
513✔
953
    TAOS_RETURN(code);
513✔
954
  }
UNCOV
955
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
956

UNCOV
957
  TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
958
}
959

960
/**
 * @brief Drop a reference taken by tdAcquireRSmaInfoBySuid(); a NULL info is ignored so callers may release
 *        unconditionally.
 */
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  tdUnRefRSmaInfo(pSma, pInfo);
}
513✔
965

966
/**
967
 * @brief async mode
968
 *
969
 * @param pSma
970
 * @param version
971
 * @param pMsg
972
 * @param inputType
973
 * @param suid
974
 * @return int32_t
975
 */
976
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
341✔
977
                                  tb_uid_t suid) {
978
  int32_t    code = 0;
341✔
979
  SRSmaInfo *pRSmaInfo = NULL;
341✔
980

981
  code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
341✔
982
  if (code != 0) {
341!
UNCOV
983
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
×
UNCOV
984
    TAOS_RETURN(TSDB_CODE_SUCCESS);  // return success
×
985
  }
986

987
  if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
341!
988
    if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
341!
989
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
990
      TAOS_RETURN(code);
×
991
    }
992
    if (smaMgmt.tmrHandle) {
341!
993
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
341✔
994
      if (pItem->level > 0) {
341!
995
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
996
      }
997
      pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
341✔
998
      if (pItem->level > 0) {
341!
999
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
341✔
1000
      }
1001
    }
1002
  } else {
1003
    code = TSDB_CODE_APP_ERROR;
×
1004
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1005
    smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
×
1006
             tstrerror(code), inputType);
1007
    TAOS_RETURN(code);
×
1008
  }
1009

1010
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
341!
1011
  return TSDB_CODE_SUCCESS;
341✔
1012
}
1013

1014
/**
 * @brief Dispatch a submit request to the rsma of every super table it touches (async mode).
 *
 * Does nothing when the vnode has no rsma env. Fails fast with the stored exec code if a previous rsma execution
 * failed. The suids touched by pReq are collected via tdFetchSubmitReqSuids(), and the message is enqueued for
 * uidStore.suid plus every key of uidStore.uidHash.
 *
 * @param pSma
 * @param version submit version
 * @param pReq    decoded submit request, used to collect the suids
 * @param pMsg    raw message payload that is queued
 * @param len     payload length in bytes
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  // Must be initialized before the first `goto _exit`: _exit always destroys it, and jumping over an initialized
  // declaration would leave it indeterminate (freeing garbage pointers).
  STbUidStore uidStore = {0};

  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
    smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if (uidStore.suid != 0) {
    if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
      smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), tstrerror(code));
      goto _exit;
    }

    void *pIter = NULL;
    while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
      tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
        smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
        taosHashCancelIterate(uidStore.uidHash, pIter);
        goto _exit;
      }
    }
  }
_exit:
  tdUidStoreDestory(&uidStore);
  TAOS_RETURN(code);
}
1050

1051
/**
 * @brief Dispatch a delete request to the rsma of its super table (async mode).
 *
 * Does nothing when the vnode has no rsma env; fails fast with the stored exec code if a previous rsma execution
 * failed.
 *
 * @param pSma
 * @param version delete version
 * @param pReq    SDeleteRes whose suid selects the rsma
 * @param pMsg    raw message payload that is queued as STREAM_INPUT__REF_DATA_BLOCK
 * @param len     payload length in bytes
 * @return int32_t TSDB_CODE_SUCCESS or the error encountered
 */
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  SDeleteRes *pDelRes = pReq;
  if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid)) < 0) {
    // previously logged as "submit exec 1" (copy-paste), which misattributed delete failures
    smaError("vgId:%d, failed to process rsma delete exec since: %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }
_exit:
  TAOS_RETURN(code);
}
1068

1069
/**
1070
 * @brief retrieve rsma meta and init
1071
 *
1072
 * @param pSma
1073
 * @param nTables number of tables of rsma
1074
 * @return int32_t
1075
 */
1076
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
17✔
1077
  int32_t     code = 0;
17✔
1078
  int32_t     lino = 0;
17✔
1079
  SVnode     *pVnode = pSma->pVnode;
17✔
1080
  SArray     *suidList = NULL;
17✔
1081
  STbUidStore uidStore = {0};
17✔
1082
  SMetaReader mr = {0};
17✔
1083
  tb_uid_t    suid = 0;
17✔
1084

1085
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
17!
1086
    code = terrno;
×
1087
    TSDB_CHECK_CODE(code, lino, _exit);
×
1088
  }
1089

1090
  TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
17!
1091

1092
  int64_t arrSize = taosArrayGetSize(suidList);
17✔
1093

1094
  if (arrSize == 0) {
17✔
1095
    if (nTables) {
7!
1096
      *nTables = 0;
7✔
1097
    }
1098
    taosArrayDestroy(suidList);
7✔
1099
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
7!
1100
    return TSDB_CODE_SUCCESS;
7✔
1101
  }
1102

1103
  int64_t nRsmaTables = 0;
10✔
1104
  metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK);
10✔
1105
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
10!
1106
    code = terrno;
×
1107
    TSDB_CHECK_CODE(code, lino, _exit);
×
1108
  }
1109

1110
  for (int64_t i = 0; i < arrSize; ++i) {
20✔
1111
    suid = *(tb_uid_t *)taosArrayGet(suidList, i);
10✔
1112
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
10!
1113
    if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
10!
1114
      code = terrno;
×
1115
      TSDB_CHECK_CODE(code, lino, _exit);
×
1116
    }
1117
    tDecoderClear(&mr.coder);
10✔
1118
    if (mr.me.type != TSDB_SUPER_TABLE) {
10!
1119
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1120
      TSDB_CHECK_CODE(code, lino, _exit);
×
1121
    }
1122
    if (mr.me.uid != suid) {
10!
1123
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1124
      TSDB_CHECK_CODE(code, lino, _exit);
×
1125
    }
1126
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
10!
1127
      ++nRsmaTables;
10✔
1128
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
10✔
1129
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
30✔
1130
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
20!
1131
                 " qmsgLen:%" PRIi32,
1132
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1133
      }
1134
      TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
10!
1135
#if 0
1136
      // reload all ctbUids for suid
1137
      uidStore.suid = suid;
1138
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
1139
        code = terrno;
1140
        TSDB_CHECK_CODE(code, lino, _exit);
1141
      }
1142

1143
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
1144
        code = terrno;
1145
        TSDB_CHECK_CODE(code, lino, _exit);
1146
      }
1147

1148
      taosArrayClear(uidStore.tbUids);
1149
#endif
1150
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
10!
1151
    }
1152
  }
1153

1154
  if (nTables) {
10!
1155
    *nTables = nRsmaTables;
10✔
1156
  }
1157
_exit:
×
1158
  if (code) {
10!
1159
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
×
1160
             __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
1161
  }
1162
  metaReaderClear(&mr);
10✔
1163
  taosArrayDestroy(suidList);
10✔
1164
  tdUidStoreDestory(&uidStore);
10✔
1165
  TAOS_RETURN(code);
10✔
1166
}
1167

1168
/**
1169
 * N.B. the data would be restored from the unified WAL replay procedure
1170
 */
1171
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
17✔
1172
  int32_t code = 0;
17✔
1173
  int32_t lino = 0;
17✔
1174
  int64_t nTables = 0;
17✔
1175

1176
  // step 1: init env
1177
  TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
17!
1178

1179
  // step 2: iterate all stables to restore the rsma env
1180
  TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
17!
1181

1182
_exit:
17✔
1183
  if (code) {
17!
1184
    smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
×
1185
             qtaskFileVer, tstrerror(code));
1186
  } else {
1187
    smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed, nTables:%" PRIi64, SMA_VID(pSma),
17!
1188
            type, qtaskFileVer, nTables);
1189
  }
1190

1191
  TAOS_RETURN(code);
17✔
1192
}
1193

1194
/**
 * @brief Persist the stream state of every live rsma task as one shared checkpoint.
 *
 * Phases:
 *  1. feed a STREAM_INPUT__CHECKPOINT input into every existing level task of every non-deleted rsma info;
 *  2. drive those tasks with STREAM_CHECKPOINT until each reports streamFlushed (skipped when no task was fed);
 *  3. build the checkpoint once through the first stream task, then save every stream task into the stream meta
 *     and commit it.
 *
 * @param pRSmaStat rsma stat providing the sma handle and the checkpoint input blocks
 * @param pInfoHash hash of SRSmaInfo* to persist
 * @return int32_t TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nTaskInfo = 0;
  SSma   *pSma = pRSmaStat->pSma;
  SVnode *pVnode = pSma->pVnode;

  if (taosHashGetSize(pInfoHash) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stream state: trigger checkpoint
  do {
    void *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        if (pRSmaInfo->taskInfo[i]) {
          code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
          if (code) {
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          pRSmaInfo->items[i].streamFlushed = 0;
          ++nTaskInfo;
        }
      }
    }
  } while (0);

  // stream state: wait checkpoint ready in async mode
  do {
    int32_t nStreamFlushed = 0;
    int32_t nSleep = 0;
    void   *infoHash = NULL;

    if (nTaskInfo <= 0) {
      // No task received a checkpoint input (e.g. every rsma info is marked deleted), so no task can ever report
      // streamFlushed and the loop below would spin forever.
      smaDebug("vgId:%d, rsma commit, no checkpoint triggered, skip waiting", TD_VID(pVnode));
      break;
    }

    // NOTE(review): still unbounded if a triggered task never reports streamFlushed
    while (true) {
      while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
        if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
          continue;
        }
        for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
          if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
            int8_t streamFlushed = 0;
            code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
                                             STREAM_CHECKPOINT, &streamFlushed);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }

            if (streamFlushed) {
              pRSmaInfo->items[i].streamFlushed = 1;
              if (++nStreamFlushed >= nTaskInfo) {
                smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
                        nSleep * 10, nStreamFlushed, nTaskInfo);
                taosHashCancelIterate(pInfoHash, infoHash);
                goto _checkpoint;
              }
            }
          }
        }
      }
      taosUsleep(10);
      ++nSleep;
      smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode),
               nSleep * 10, nStreamFlushed, nTaskInfo);
    }
  } while (0);

_checkpoint:
  // stream state: build checkpoint in backend
  do {
    SStreamMeta *pMeta = NULL;
    int64_t      checkpointId = taosGetTimestampNs();
    bool         checkpointBuilt = false;
    void        *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
        if (pItem && pItem->pStreamTask) {
          SStreamTask *pTask = pItem->pStreamTask;
          // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
          TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));

          pTask->chkInfo.checkpointId = checkpointId;  // 1pTask->checkpointingId;
          pTask->chkInfo.checkpointVer = pItem->submitReqVer;
          pTask->info.delaySchedParam = pItem->fetchResultVer;
          pTask->info.taskLevel = TASK_LEVEL_SMA;

          if (!checkpointBuilt) {
            // the stream states share one checkpoint
            code = streamTaskBuildCheckpoint(pTask);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }
            pMeta = pTask->pMeta;
            checkpointBuilt = true;
          }

          streamMetaWLock(pMeta);
          if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
            streamMetaWUnLock(pMeta);
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          streamMetaWUnLock(pMeta);
          smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
                   ", table:%" PRIi64 ", level:%d",
                   TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
        }
      }
    }
    if (pMeta) {
      streamMetaWLock(pMeta);
      if ((code = streamMetaCommit(pMeta)) != 0) {
        streamMetaWUnLock(pMeta);
        if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      streamMetaWUnLock(pMeta);
    }
    if (checkpointBuilt) {
      smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
    }
  } while (0);
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }

  TAOS_RETURN(code);
}
1336

1337
/**
1338
 * @brief trigger to get rsma result in async mode
1339
 *
1340
 * @param param
1341
 * @param tmrId
1342
 */
1343
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
164✔
1344
  SRSmaRef      *pRSmaRef = NULL;
164✔
1345
  SSma          *pSma = NULL;
164✔
1346
  SRSmaStat     *pStat = NULL;
164✔
1347
  SRSmaInfo     *pRSmaInfo = NULL;
164✔
1348
  SRSmaInfoItem *pItem = NULL;
164✔
1349
  int32_t        code = 0;
164✔
1350

1351
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
164!
1352
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
×
1353
             smaMgmt.refHash, smaMgmt.rsetId);
1354
    return;
×
1355
  }
1356

1357
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
164!
1358
    smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1359
            pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1360
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1361
    return;
×
1362
  }
1363

1364
  pSma = pStat->pSma;
164✔
1365

1366
  if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
164!
1367
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1368
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1369
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1370
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1371
    return;
×
1372
  }
1373

1374
  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
164!
1375
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1376
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1377
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1378
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1379
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1380
    return;
×
1381
  }
1382

1383
  pItem = *(SRSmaInfoItem **)&param;
164✔
1384

1385
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
1386
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
164✔
1387
  switch (rsmaTriggerStat) {
164!
1388
    case TASK_TRIGGER_STAT_PAUSED:
×
1389
    case TASK_TRIGGER_STAT_CANCELLED: {
1390
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
×
1391
               ", rsetId:%d refId:%" PRIi64,
1392
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
1393
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
×
1394
        bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
1395
        if (!ret) {
×
1396
          smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
×
1397
                  " since tmr reset failed, rsetId:%d refId:%" PRIi64,
1398
                  SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
1399
        }
1400
      }
1401
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1402
      TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1403
      return;
×
1404
    }
1405
    default:
164✔
1406
      break;
164✔
1407
  }
1408

1409
  int8_t fetchTriggerStat =
1410
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
164✔
1411
  switch (fetchTriggerStat) {
164!
1412
    case TASK_TRIGGER_STAT_ACTIVE: {
50✔
1413
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
50!
1414
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1415
      // async process
1416
      atomic_store_8(&pItem->fetchLevel, 1);
50✔
1417

1418
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
50!
1419
        if (tsem_post(&(pStat->notEmpty)) != 0) {
50!
1420
          smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
×
1421
                   SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1422
        }
1423
      }
1424
    } break;
50✔
1425
    case TASK_TRIGGER_STAT_INACTIVE: {
101✔
1426
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
101!
1427
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1428
    } break;
101✔
1429
    case TASK_TRIGGER_STAT_INIT: {
13✔
1430
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
13!
1431
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1432
    } break;
13✔
1433
    default: {
×
1434
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
×
1435
               " is unknown",
1436
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
1437
    } break;
×
1438
  }
1439

1440
_end:
164✔
1441
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
164✔
1442
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
164!
1443
  TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
164✔
1444
}
1445

1446
/**
 * @brief Release the payloads referenced by a batch array and empty it.
 *
 * For STREAM_INPUT__MERGED_SUBMIT, each entry's msgStr points just past the queue item header, so the whole queue
 * item is freed. For STREAM_INPUT__REF_DATA_BLOCK, each entry's data block is destroyed. Any other type is only
 * logged. The array itself is always cleared, not destroyed.
 *
 * @param pItems array of SPackedData
 * @param type   stream input type of the batch
 */
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
  int32_t nItems = taosArrayGetSize(pItems);

  switch (type) {
    case STREAM_INPUT__MERGED_SUBMIT:
      for (int32_t k = 0; k < nItems; ++k) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, k);
        taosFreeQitem(POINTER_SHIFT(pPacked->msgStr, -RSMA_EXEC_MSG_HLEN));
      }
      break;
    case STREAM_INPUT__REF_DATA_BLOCK:
      for (int32_t k = 0; k < nItems; ++k) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, k);
        blockDataDestroy(pPacked->pDataBlock);
      }
      break;
    default:
      smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
      break;
  }

  taosArrayClear(pItems);
}
320✔
1463

1464
/**
1465
 * @brief fetch rsma result(consider the efficiency and functionality)
1466
 *
1467
 * @param pSma
1468
 * @param pInfo
1469
 * @return int32_t
1470
 */
1471
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
50✔
1472
  int32_t     code = 0;
50✔
1473
  int32_t     lino = 0;
50✔
1474
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
50✔
1475
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
150✔
1476
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
100✔
1477

1478
    if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
100✔
1479
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
50✔
1480
      if (!taskInfo) {
50!
1481
        continue;
×
1482
      }
1483

1484
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
50!
1485
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
×
1486
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1487
      } else {
1488
        int64_t curMs = taosGetTimestampMs();
50✔
1489
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
50✔
1490
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
16!
1491
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1492
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
16✔
1493
          continue;
16✔
1494
        } else {
1495
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
34!
1496
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1497
        }
1498
      }
1499

1500
      pItem->nScanned = 0;
34✔
1501

1502
      TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
34!
1503
      if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
34!
1504
        atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
×
1505
        goto _exit;
×
1506
      }
1507

1508
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
34!
1509
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1510
    } else {
1511
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
50!
1512
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
1513
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
1514
    }
1515
  }
1516

1517
_exit:
50✔
1518
  TAOS_RETURN(code);
50✔
1519
}
1520

1521
/**
 * @brief Consume every message currently held in @p qall and execute it on the level-1 and level-2 rsma tasks.
 *
 * Submit messages (STREAM_INPUT__DATA_SUBMIT) are passed by reference: the packed data points into the queue item,
 * which is released afterwards by tdFreeRSmaSubmitItems(). Delete messages (STREAM_INPUT__REF_DATA_BLOCK) are
 * converted by tqExtractDelDataBlock() and their queue item is freed right away. Submit and delete messages may
 * alternate in the queue, so each message is executed on its own, in queue order.
 *
 * Every item of @p qall is consumed (executed or freed) whatever the return value; callers may therefore account the
 * whole qall as consumed.
 *
 * @param pSma       the sma handle
 * @param pInfo      the rsma info the messages belong to
 * @param qall       items previously read from the rsma info queue
 * @param pSubmitArr scratch array of SPackedData; cleared on entry
 * @param type       rsma exec type, forwarded to tdExecuteRSmaImpl()
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     nSubmit = 0;
  int32_t     nDelete = 0;
  int64_t     version = 0;
  SPackedData packData = {0};

  taosArrayClear(pSubmitArr);

  while (1) {
    void *msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (msg == NULL) {
      break;  // qall exhausted
    }

    int8_t msgType = RSMA_EXEC_MSG_TYPE(msg);
    if (msgType == STREAM_INPUT__DATA_SUBMIT) {
      packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
      packData.ver = RSMA_EXEC_MSG_VER(msg);
      packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
      version = packData.ver;
      if (!taosArrayPush(pSubmitArr, &packData)) {
        taosFreeQitem(msg);
        code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      ++nSubmit;  // msg is now owned by pSubmitArr and released by tdFreeRSmaSubmitItems()
    } else if (msgType == STREAM_INPUT__REF_DATA_BLOCK) {
      version = RSMA_EXEC_MSG_VER(msg);
      packData.pDataBlock = NULL;  // do not carry a stale block pointer over from a previous message
      code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1,
                                   STREAM_DELETE_DATA);
      if (code != 0) {
        taosFreeQitem(msg);
        TSDB_CHECK_CODE(code, lino, _exit);  // jumps on any non-zero code, so msg can never be freed twice
      }

      if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
        // NOTE(review): packData.pDataBlock is not released on this path; confirm whether it should be destroyed here
        taosFreeQitem(msg);
        code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      taosFreeQitem(msg);
      if (packData.pDataBlock) {
        // packData.pDataBlock is NULL if delete affects 0 row
        ++nDelete;
      }
    } else {
      smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, msgType);
      taosFreeQitem(msg);  // skip it, but keep draining the remaining messages of qall
      continue;
    }

    if (nSubmit == 0 && nDelete == 0) {
      continue;  // e.g. a delete affecting 0 rows: nothing to execute, move on to the next message
    }

    int32_t size = TARRAY_SIZE(pSubmitArr);
    int32_t execInputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
    for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
      TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, execInputType, pInfo, type, i));
    }
    tdFreeRSmaSubmitItems(pSubmitArr, execInputType);
    nSubmit = 0;
    nDelete = 0;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
  smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed at line %d since %s", SMA_VID(pSma),
           pInfo->suid, type, (int32_t)taosArrayGetSize(pSubmitArr), lino, tstrerror(code));
  tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
  // drop whatever is left in qall so that every item is consumed, as documented above
  while (1) {
    void *msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (msg == NULL) {
      break;
    }
    taosFreeQitem(msg);
  }
  TAOS_RETURN(code);
}
1611

1612
/**
1613
 * @brief
1614
 *
1615
 * @param pSma
1616
 * @param type
1617
 * @return int32_t
1618
 */
1619

1620
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
679✔
1621
  int32_t    code = 0;
679✔
1622
  int32_t    lino = 0;
679✔
1623
  SVnode    *pVnode = pSma->pVnode;
679✔
1624
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
679✔
1625
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
679✔
1626
  SHashObj  *infoHash = NULL;
679✔
1627
  SArray    *pSubmitArr = NULL;
679✔
1628
  bool       isFetchAll = false;
679✔
1629

1630
  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
679!
1631
    code = TSDB_CODE_RSMA_INVALID_STAT;
×
1632
    TSDB_CHECK_CODE(code, lino, _exit);
×
1633
  }
1634

1635
  if (!(pSubmitArr =
680!
1636
            taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
679✔
1637
    code = terrno;
×
1638
    TSDB_CHECK_CODE(code, lino, _exit);
×
1639
  }
1640

1641
  while (true) {
1642
    // step 1: rsma exec - consume data in buffer queue for all suids
1643
    if (type == RSMA_EXEC_OVERFLOW) {
946!
1644
      void *pIter = NULL;
946✔
1645
      while ((pIter = taosHashIterate(infoHash, pIter))) {
1,212✔
1646
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
266✔
1647
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
266!
1648
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
266!
1649
            int32_t batchCnt = -1;
266✔
1650
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
266✔
1651
            bool    occupied = (batchMax <= 1);
266✔
1652
            if (batchMax > 1) {
266!
1653
              batchMax = 100 / batchMax;
×
1654
              batchMax = TMAX(batchMax, 4);
×
1655
            }
1656
            while (occupied || (++batchCnt < batchMax)) {                 // greedy mode
501!
1657
              TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall));  // queue has mutex lock
501✔
1658
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
501✔
1659
              if (qallItemSize > 0) {
501✔
1660
                if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
235!
1661
                  taosHashCancelIterate(infoHash, pIter);
×
1662
                  TSDB_CHECK_CODE(code, lino, _exit);
×
1663
                }
1664
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
235!
1665
              }
1666

1667
              if (RSMA_NEED_FETCH(pInfo)) {
501✔
1668
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
50✔
1669
                if (oldStat == 0 ||
50!
1670
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
×
1671
                  int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
50✔
1672

1673
                  if (oldVal < 0) {
50!
1674
                    code = TSDB_CODE_APP_ERROR;
×
1675
                    taosHashCancelIterate(infoHash, pIter);
×
1676
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1677
                  }
1678

1679
                  if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
50!
1680
                    taosHashCancelIterate(infoHash, pIter);
×
1681
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1682
                  }
1683

1684
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
50!
1685
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
50✔
1686
                  }
1687
                }
1688
              }
1689

1690
              if (qallItemSize > 0) {
501✔
1691
                TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
235✔
1692
                continue;
235✔
1693
              }
1694
              if (RSMA_NEED_FETCH(pInfo)) {
266!
1695
                continue;
×
1696
              }
1697

1698
              break;
266✔
1699
            }
1700
          }
1701
          TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
266✔
1702
        }
1703
      }
1704
    } else {
1705
      smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
×
1706
      code = TSDB_CODE_APP_ERROR;
×
1707
      TSDB_CHECK_CODE(code, lino, _exit);
×
1708
    }
1709

1710
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
946!
1711
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
946!
1712
        break;
×
1713
      }
1714

1715
      if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
946!
1716
        smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
×
1717
      }
1718

1719
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
937!
1720
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
670!
1721
                 atomic_load_64(&pRSmaStat->nBufItems));
1722
        break;
679✔
1723
      }
1724
    }
1725

1726
  }  // end of while(true)
1727

1728
_exit:
679✔
1729
  taosArrayDestroy(pSubmitArr);
679✔
1730
  if (code) {
680!
1731
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1732
  }
1733
  TAOS_RETURN(code);
680✔
1734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc