• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.44
/source/dnode/vnode/src/sma/smaRollup.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18

19
// Execution / fetch tuning constants; the unit of each value is given in its trailing comment.
#define RSMA_EXEC_SMOOTH_SIZE (100)     // cnt
#define RSMA_EXEC_BATCH_SIZE  (1024)    // cnt
#define RSMA_FETCH_DELAY_MAX  (120000)  // ms
#define RSMA_FETCH_ACTIVE_MAX (1000)    // ms
#define RSMA_FETCH_INTERVAL   (5000)    // ms
#define RSMA_EXEC_TASK_FLAG   "rsma"

// Layout of a queued exec message: a packed header followed directly by the body.
//   type(int8_t) + len(int32_t) + version(int64_t)  => 13 bytes
// NOTE(review): the LEN and VER accessors dereference at byte offsets 1 and 5 of the message, i.e. through
// pointers that are not naturally aligned - confirm this is acceptable on every supported target.
#define RSMA_EXEC_MSG_HLEN      (13)
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
#define RSMA_EXEC_MSG_LEN(msg)  (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
#define RSMA_EXEC_MSG_VER(msg)  (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))

// True when either of the two rsma info items of `r` has a non-zero fetchLevel.
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
32

33
SSmaMgmt smaMgmt = {
34
    .inited = 0,
35
    .rsetId = -1,
36
};
37

38
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
39

40
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
41
static void    tdUidStoreDestory(STbUidStore *pStore);
42
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
43
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
44
                                       int8_t idx);
45
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
46
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
47
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo);
48
static void    tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
49
static void    tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
50
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
51
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
52
                                         int32_t execType, int8_t *streamFlushed);
53
static void    tdRSmaFetchTrigger(void *param, void *tmrId);
54
static void    tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
55
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
56
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
57
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
58

59
// Record describing one qTaskInfo entry: its length, type, owning suid and the task info payload.
// NOTE(review): not referenced by the code visible in this part of the file - confirm it is still needed.
struct SRSmaQTaskInfoItem {
  int32_t len;
  int8_t  type;
  int64_t suid;
  void   *qTaskInfo;
};
65

66
/**
 * @brief Detach the query task held in *taskHandle and destroy it.
 *
 * Free/kill may race, so the slot is cleared with a compare-and-swap and the task is destroyed only by the
 * caller whose swap actually replaced the snapshot it read. The previous code tested the swap result for
 * non-NULL only, which would also destroy the snapshot when the slot had meanwhile been changed to a
 * different non-NULL handle (i.e. when the swap did not happen).
 *
 * @param taskHandle address of the slot holding the task; a NULL address or an empty slot is a no-op
 * @param vgId       vnode group id, used for logging only
 * @param level      rsma level, used for logging only
 */
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
  if (!taskHandle) return;

  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  if (!otaskHandle) return;

  // the val-CAS yields the value found in the slot; it equals our snapshot only when the swap succeeded
  if (atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL) == otaskHandle) {
    smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
    qDestroyTask(otaskHandle);
  }
}
75

76
/**
77
 * @brief general function to free rsmaInfo
78
 *
79
 * @param pSma
80
 * @param pInfo
81
 * @return void*
82
 */
83
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
×
84
  if (pInfo) {
×
85
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
86
      SRSmaInfoItem *pItem = &pInfo->items[i];
×
87

88
      if (pItem->tmrId) {
×
89
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
×
90
                 pInfo->suid, i + 1);
91
        if (!taosTmrStopA(&pItem->tmrId)) {
×
92
          smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
×
93
                   i + 1);
94
        }
95
      }
96

97
/*
98
      if (pItem->pStreamState) {
99
        streamStateClose(pItem->pStreamState, false);
100
      }
101

102
      if (pItem->pStreamTask) {
103
        tFreeStreamTask(pItem->pStreamTask);
104
      }
105
*/
106
      taosArrayDestroy(pItem->pResList);
×
107
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
×
108
    }
109

110
    taosMemoryFreeClear(pInfo->pTSchema);
×
111

112
    if (pInfo->queue) {
×
113
      taosCloseQueue(pInfo->queue);
×
114
      pInfo->queue = NULL;
×
115
    }
116
    if (pInfo->qall) {
×
117
      taosFreeQall(pInfo->qall);
×
118
      pInfo->qall = NULL;
×
119
    }
120

121
    taosMemoryFree(pInfo);
×
122
  }
123

124
  return NULL;
×
125
}
126

127
/**
 * @brief Allocate a zero-initialized STbUidStore and hand it back through pStore.
 *
 * @param pStore output; receives the new store, or NULL when the allocation fails
 * @return TSDB_CODE_SUCCESS on success, otherwise terrno
 */
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
  STbUidStore *pNew = taosMemoryCalloc(1, sizeof(STbUidStore));

  *pStore = pNew;
  return pNew ? TSDB_CODE_SUCCESS : terrno;
}
135

136
/**
 * @brief Add or remove a list of child table uids on every query task of one rollup super table.
 *
 * The rsma info is acquired by suid, each existing task gets the uid list via
 * qUpdateTableListForStreamScanner(), and the info is released again on every exit path.
 *
 * @param pSma   sma object
 * @param suid   super table uid; must not be NULL
 * @param tbUids child table uids; must not be NULL, an empty array is a successful no-op
 * @param isAdd  true to add the uids, false to remove them
 * @return 0 on success, TSDB_CODE_INVALID_PTR for NULL arguments, otherwise the failing callee's code
 */
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
  SRSmaInfo *pRSmaInfo = NULL;
  int32_t    code = 0;

  if (!suid || !tbUids) {
    code = TSDB_CODE_INVALID_PTR;
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), suid ? *suid : -1,
             tstrerror(code));
    TAOS_RETURN(code);
  }

  int32_t nTables = taosArrayGetSize(tbUids);

  if (0 == nTables) {
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
    return TSDB_CODE_SUCCESS;
  }

  code = tdAcquireRSmaInfoBySuid(pSma, *suid, &pRSmaInfo);

  if (code != 0) {
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (!pRSmaInfo->taskInfo[i]) {
      continue;
    }

    // levels are logged 1-based, matching the other log messages of this file
    if ((code = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
      smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i + 1,
               tstrerror(code));
      TAOS_RETURN(code);
    }
    // the uid and nTables fields used to be concatenated without a separator ("uid:123nTables:4")
    smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p. suid:%" PRIi64 " uid:%" PRIi64
             ", nTables:%d level %d",
             SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)TARRAY_GET_ELEM(tbUids, 0), nTables, i + 1);
  }

  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  TAOS_RETURN(code);
}
178

179
/**
 * @brief Apply every suid/uids pair collected in a STbUidStore to the rsma query tasks.
 *
 * The primary pair (pStore->suid / pStore->tbUids) is applied first, then each entry of pStore->uidHash.
 * Processing stops at the first failure.
 *
 * @param pSma   sma object
 * @param pStore collected uids; NULL or an empty tbUids array is a successful no-op
 * @param isAdd  true to add the uids, false to remove them
 * @return TSDB_CODE_SUCCESS or the first error returned by tdUpdateTbUidListImpl()
 */
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
  if (pStore == NULL || taosArrayGetSize(pStore->tbUids) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_RETURN(tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd));

  for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pEntry, NULL);
    SArray   *pTbUids = *(SArray **)pEntry;

    int32_t code = tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd);
    if (code != TSDB_CODE_SUCCESS) {
      // leaving the iteration early requires an explicit cancel
      taosHashCancelIterate(pStore->uidHash, pEntry);
      TAOS_RETURN(code);
    }
  }

  return TSDB_CODE_SUCCESS;
}
199

200
/**
201
 * @brief fetch suid/uids when create child tables of rollup SMA
202
 *
203
 * @param pTsdb
204
 * @param ppStore
205
 * @param suid
206
 * @param uid
207
 * @return int32_t
208
 */
209
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
87✔
210
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
87✔
211
  int32_t  code = 0;
87✔
212

213
  // only applicable to rollup SMA ctables
214
  if (!pEnv) {
87!
215
    return TSDB_CODE_SUCCESS;
87✔
216
  }
217

218
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
219
  SHashObj  *infoHash = NULL;
×
220
  if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) {
×
221
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
222
  }
223

224
  // info cached when create rsma stable and return directly for non-rsma ctables
225
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
×
226
    return TSDB_CODE_SUCCESS;
×
227
  }
228

229
  if (!(*ppStore)) {
×
230
    TAOS_CHECK_RETURN(tdUidStoreInit(ppStore));
×
231
  }
232

233
  if ((code = tdUidStorePut(*ppStore, suid, &uid)) < 0) {
×
234
    *ppStore = tdUidStoreFree(*ppStore);
×
235
    TAOS_RETURN(code);
×
236
  }
237

238
  return TSDB_CODE_SUCCESS;
×
239
}
240

241
/**
 * @brief Set up the rsma info item of one level (stream state, query task, fetch timer).
 *
 * NOTE: the whole implementation is compiled out with `#if 0`; as built, this function ignores its
 * arguments and returns TSDB_CODE_SUCCESS. The disabled body is kept below unchanged.
 *
 * @param pSma      sma object
 * @param param     rsma parameters of the super table
 * @param pStat     rsma stat of the vnode
 * @param pRSmaInfo rsma info being initialized
 * @param idx       0-based item index
 * @return int32_t TSDB_CODE_SUCCESS (always, while the body is disabled)
 */
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
                                       int8_t idx) {
  int32_t code = 0;
#if 0
  if ((param->qmsgLen > 0) && param->qmsg[idx]) {
    SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
    SRetention    *pRetention = SMA_RETENTION(pSma);
    STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
    SVnode        *pVnode = pSma->pVnode;
    char           taskInfDir[TSDB_FILENAME_LEN] = {0};
    void          *pStreamState = NULL;

    // set the backend of stream state
    tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);

    if (!taosCheckExistFile(taskInfDir)) {
      char *s = taosStrdup(taskInfDir);
      if (!s) {
        TAOS_RETURN(terrno);
      }
      if (taosMulMkDir(s) != 0) {
        code = TAOS_SYSTEM_ERROR(ERRNO);
        taosMemoryFree(s);
        TAOS_RETURN(code);
      }
      taosMemoryFree(s);
    }

    SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(*pStreamTask));
    if (!pStreamTask) {
      return terrno;
    }
    pItem->pStreamTask = pStreamTask;
    pStreamTask->id.taskId = 0;
    pStreamTask->id.streamId = pRSmaInfo->suid + idx;
    pStreamTask->chkInfo.startTs = taosGetTimestampMs();
    //pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
    pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1);
    if (!pStreamTask->exec.qmsg) {
      TAOS_RETURN(terrno);
    }
    TAOS_UNUSED(snprintf(pStreamTask->exec.qmsg, strlen(RSMA_EXEC_TASK_FLAG) + 1, "%s", RSMA_EXEC_TASK_FLAG));
    pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
    tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);

    TAOS_CHECK_RETURN(streamCreateStateMachine(pStreamTask));

    TAOS_CHECK_RETURN(streamTaskCreateActiveChkptInfo(&pStreamTask->chkInfo.pActiveInfo));

    pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId);
    if (!pStreamState) {
      TAOS_RETURN(TSDB_CODE_RSMA_STREAM_STATE_OPEN);
    }
    pItem->pStreamState = pStreamState;

    tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.initTqReader = 1;
    handle.skipRollup = 1;
    handle.pStateBackend = pStreamState;
    initStorageAPI(&handle.api);

    code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
    if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
      TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
    }

    if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
      TAOS_RETURN(terrno);
    }

    if (pItem->fetchResultVer < pItem->submitReqVer) {
      // fetch the data when reboot
      pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
    }

    if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
      int64_t msInterval = -1;
      TAOS_CHECK_RETURN(convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision,
                                                       TIME_UNIT_MILLISECOND, &msInterval));
      pItem->maxDelay = (int32_t)msInterval;
    } else {
      pItem->maxDelay = (int32_t)param->maxdelay[idx];
    }
    if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
      pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
    }

    pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;

    SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
    if (taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)) != 0) {
      TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
    }

    bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
    if (!ret) {
      smaError("vgId:%d, failed to reset fetch timer for table %" PRIi64 " level %d", TD_VID(pVnode), pRSmaInfo->suid,
               idx + 1);
    }

    smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
            ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
            ", finally maxdelay:%" PRIi32,
            TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
            pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
  }
#endif
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
353

354
/**
355
 * @brief for rsam create or restore
356
 *
357
 * @param pSma
358
 * @param param
359
 * @param suid
360
 * @param tbName
361
 * @return int32_t
362
 */
363
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
×
364
  int32_t code = 0;
×
365
  int32_t lino = 0;
×
366

367
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
×
368
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
×
369
    return TSDB_CODE_SUCCESS;
×
370
  }
371

372
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
373
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
374
  SRSmaInfo *pRSmaInfo = NULL;
×
375

376
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
×
377
  if (pRSmaInfo) {
×
378
    smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
×
379
    return TSDB_CODE_SUCCESS;
×
380
  }
381

382
  // from write queue: single thead
383
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
×
384
  if (!pRSmaInfo) {
×
385
    return terrno;
×
386
  }
387

388
  STSchema *pTSchema;
389
  code = metaGetTbTSchemaNotNull(SMA_META(pSma), suid, -1, 1, &pTSchema);
×
390
  TAOS_CHECK_EXIT(code);
×
391
  pRSmaInfo->pSma = pSma;
×
392
  pRSmaInfo->pTSchema = pTSchema;
×
393
  pRSmaInfo->suid = suid;
×
394
  T_REF_INIT_VAL(pRSmaInfo, 1);
×
395

396
  TAOS_CHECK_EXIT(taosOpenQueue(&pRSmaInfo->queue));
×
397

398
  TAOS_CHECK_EXIT(taosAllocateQall(&pRSmaInfo->qall));
×
399

400
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0));
×
401
  TAOS_CHECK_EXIT(tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1));
×
402

403
  TAOS_CHECK_EXIT(taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)));
×
404

405
_exit:
×
406
  if (code != 0) {
×
407
    TAOS_UNUSED(tdFreeRSmaInfo(pSma, pRSmaInfo));
×
408
  } else {
409
    smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
×
410
  }
411
  TAOS_RETURN(code);
×
412
}
413

414
/**
415
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
416
 *
417
 * @param pSma
418
 * @param pReq
419
 * @return int32_t
420
 */
421
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
56✔
422
  SVnode *pVnode = pSma->pVnode;
56✔
423
  if (!pReq->rollup) {
56!
424
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
56!
425
             pReq->suid);
426
    return TSDB_CODE_SUCCESS;
56✔
427
  }
428

429
  if (!VND_IS_RSMA(pVnode)) {
×
430
    smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
×
431
            pReq->suid);
432
    return TSDB_CODE_SUCCESS;
×
433
  }
434

435
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
×
436
}
437

438
/**
439
 * @brief drop cache for stb
440
 *
441
 * @param pSma
442
 * @param pReq
443
 * @return int32_t
444
 */
445
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
4✔
446
  SVnode *pVnode = pSma->pVnode;
4✔
447
  if (!VND_IS_RSMA(pVnode)) {
4!
448
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
4!
449
             pReq->suid);
450
    return TSDB_CODE_SUCCESS;
4✔
451
  }
452

453
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
×
454
  if (!pSmaEnv) {
×
455
    return TSDB_CODE_SUCCESS;
×
456
  }
457

458
  int32_t    code = 0;
×
459
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
×
460
  SRSmaInfo *pRSmaInfo = NULL;
×
461

462
  code = tdAcquireRSmaInfoBySuid(pSma, pReq->suid, &pRSmaInfo);
×
463

464
  if (code != 0) {
×
465
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
×
466
            pReq->suid);
467
    return TSDB_CODE_SUCCESS;
×
468
  }
469

470
  // set del flag for data in mem
471
  atomic_store_8(&pRSmaStat->delFlag, 1);
×
472
  RSMA_INFO_SET_DEL(pRSmaInfo);
×
473
  tdUnRefRSmaInfo(pSma, pRSmaInfo);
×
474

475
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
476

477
  // no need to save to file as triggered by dropping stable
478
  smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
×
479
  return TSDB_CODE_SUCCESS;
×
480
}
481

482
/**
483
 * @brief store suid/[uids], prefer to use array and then hash
484
 *
485
 * @param pStore
486
 * @param suid
487
 * @param uid
488
 * @return int32_t
489
 */
490
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
×
491
  // prefer to store suid/uids in array
492
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
×
493
    if (pStore->suid == 0) {
×
494
      pStore->suid = suid;
×
495
    }
496
    if (uid) {
×
497
      if (!pStore->tbUids) {
×
498
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
499
          TAOS_RETURN(terrno);
×
500
        }
501
      }
502
      if (!taosArrayPush(pStore->tbUids, uid)) {
×
503
        TAOS_RETURN(terrno);
×
504
      }
505
    }
506
  } else {
507
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
508
    if (!pStore->uidHash) {
×
509
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
510
      if (!pStore->uidHash) {
×
511
        TAOS_RETURN(terrno);
×
512
      }
513
    }
514
    if (uid) {
×
515
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
×
516
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
×
517
        if (!taosArrayPush(uidArray, uid)) {
×
518
          taosArrayDestroy(uidArray);
×
519
          TAOS_RETURN(terrno);
×
520
        }
521
      } else {
522
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
×
523
        if (!pUidArray) {
×
524
          TAOS_RETURN(terrno);
×
525
        }
526
        if (!taosArrayPush(pUidArray, uid)) {
×
527
          taosArrayDestroy(pUidArray);
×
528
          TAOS_RETURN(terrno);
×
529
        }
530
        TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)));
×
531
      }
532
    } else {
533
      TAOS_CHECK_RETURN(taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0));
×
534
    }
535
  }
536
  return TSDB_CODE_SUCCESS;
×
537
}
538

539
/**
 * @brief Release the containers owned by a STbUidStore (the store object itself is not freed).
 *
 * @param pStore store to clean up; NULL is a no-op
 */
static void tdUidStoreDestory(STbUidStore *pStore) {
  if (!pStore) {
    return;
  }

  if (pStore->uidHash) {
    // When pStore->tbUids is not NULL the hash holds key/value pairs (values are SArray*);
    // otherwise the hash only has keys and there is nothing to destroy per entry.
    if (pStore->tbUids) {
      for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry;
           pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
        taosArrayDestroy(*(SArray **)pEntry);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}
×
555

556
/**
 * @brief Destroy a STbUidStore together with its contents.
 *
 * @param pStore store to free; NULL is a no-op
 * @return void* always NULL, so callers can reset their pointer in one statement
 */
void *tdUidStoreFree(STbUidStore *pStore) {
  if (!pStore) {
    return NULL;
  }

  tdUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}
563

564
/**
565
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
566
 * would be freed when close Vnode, thus lock should be used if with race condition.
567
 * @param pTsdb
568
 * @param version
569
 * @param pReq
570
 * @return int32_t
571
 */
572
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
×
573
  if (pReq) {
×
574
    SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
×
575
    // spin lock for race condition during insert data
576
    TAOS_CHECK_RETURN(tsdbInsertData(pTsdb, version, pSubmitReq, NULL));
×
577
  }
578

579
  return TSDB_CODE_SUCCESS;
×
580
}
581

582
/**
 * @brief Record the suid of every table block of a submit request in pStore (suid only, no uid).
 *
 * @param pMsg   submit request; NULL yields an empty iteration
 * @param pStore store receiving the suids
 * @return int32_t 0 on success or the first error of tdUidStorePut()
 */
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
  SArray *pSubmitTbData = NULL;
  if (pMsg) {
    pSubmitTbData = pMsg->aSubmitTbData;
  }

  int32_t nBlocks = taosArrayGetSize(pSubmitTbData);
  for (int32_t iBlock = 0; iBlock < nBlocks; ++iBlock) {
    SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, iBlock);
    TAOS_CHECK_RETURN(tdUidStorePut(pStore, pData->suid, NULL));
  }

  return 0;
}
593

594
/**
595
 * @brief retention of rsma1/rsma2
596
 *
597
 * @param pSma
598
 * @param now
599
 * @return int32_t
600
 */
601
int32_t smaRetention(SSma *pSma, int64_t now) {
×
602
  int32_t code = TSDB_CODE_SUCCESS;
×
603
  if (!VND_IS_RSMA(pSma->pVnode)) {
×
604
    return code;
×
605
  }
606

607
  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
×
608
    if (pSma->pRSmaTsdb[i]) {
609
      // code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
610
      // if (code) goto _end;
611
    }
612
  }
613

614
_end:
×
615
  TAOS_RETURN(code);
×
616
}
617

618
/**
 * @brief Encode a batch delete request and put it on the vnode write queue.
 *
 * When pDelReq holds at least one delete request it is serialized behind an SMsgHead into an rpc buffer
 * and queued as TDMT_VND_BATCH_DEL. pDelReq->deleteReqs is destroyed on every path, success or failure.
 *
 * @param pSma    sma object
 * @param suid    super table uid, used for logging only
 * @param level   rsma level, used for logging only
 * @param pDelReq batch delete request; its deleteReqs array is consumed
 * @return int32_t 0 on success, otherwise the code of the failing step
 */
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
    // pass 1: compute the encoded size
    int32_t bodyLen = 0;
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, bodyLen, code);
    TSDB_CHECK_CODE(code, lino, _exit);

    void *pCont = rpcMallocCont(bodyLen + sizeof(SMsgHead));
    if (!pCont) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // pass 2: encode right behind the message head
    SEncoder enc;
    tEncoderInit(&enc, POINTER_SHIFT(pCont, sizeof(SMsgHead)), bodyLen);
    code = tEncodeSBatchDeleteReq(&enc, pDelReq);
    tEncoderClear(&enc);
    if (code < 0) {
      rpcFreeCont(pCont);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    ((SMsgHead *)pCont)->vgId = TD_VID(pSma->pVnode);

    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pCont, .contLen = bodyLen + sizeof(SMsgHead)};
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  taosArrayDestroy(pDelReq->deleteReqs);
  if (code) {
    smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
             SMA_VID(pSma), lino, suid, level, tstrerror(code));
  }

  TAOS_RETURN(code);
}
658

659
/**
 * @brief Run a rsma query task until it yields no more blocks and write every result block to the tsdb of
 *        the item's level.
 *
 * Per result block:
 *  - STREAM_CHECKPOINT   : sets *streamFlushed (when provided) and is otherwise skipped;
 *  - STREAM_DELETE_RESULT: converted to a batch delete request and queued via tdRSmaProcessDelReq();
 *  - anything else       : converted to a submit request and inserted with tdProcessSubmitReq().
 * With execType STREAM_GET_ALL the block version is raised to submitReqVer when it is older, blocks whose
 * version equals fetchResultVer are skipped as duplicates, and fetchResultVer is updated after a submit.
 *
 * The task's block buffer is cleaned before returning, on success and on failure.
 *
 * @param pSma          sma object
 * @param taskInfo      query task to execute
 * @param pItem         rsma info item (level, result list, version bookkeeping)
 * @param pInfo         rsma info (schema, suid)
 * @param execType      execution type; STREAM_GET_ALL enables the version handling described above
 * @param streamFlushed optional output, set to 1 when a checkpoint block is seen
 * @return int32_t 0 on success (TSDB_CODE_QRY_IN_EXEC is treated as success), otherwise the failing code
 */
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
                                         int32_t execType, int8_t *streamFlushed) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock *output = NULL;
  SArray      *pResList = pItem->pResList;
  STSchema    *pTSchema = pInfo->pTSchema;
  int64_t      suid = pInfo->suid;

  while (1) {
    uint64_t ts;
    bool     hasMore = false;
    code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL, false);
    if (code == TSDB_CODE_QRY_IN_EXEC) {
      code = 0;
      break;
    }
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayGetSize(pResList) == 0) {
      break;
    }

    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
      output = taosArrayGetP(pResList, i);
      if (output->info.type == STREAM_CHECKPOINT) {
        if (streamFlushed) *streamFlushed = 1;
        continue;
      } else if (output->info.type == STREAM_DELETE_RESULT) {
        SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
        deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
        if (!deleteReq.deleteReqs) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "", true);
        if (code != 0) {
          // tdRSmaProcessDelReq() is what normally destroys the array; it is not reached on this path
          taosArrayDestroy(deleteReq.deleteReqs);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
        // consumes deleteReq.deleteReqs on success and on failure
        code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
               ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
               SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
               pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);

      if (STREAM_GET_ALL == execType) {
        /**
         * 1. reset the output version when reboot
         * 2. delete msg version not updated from the result
         */
        if (output->info.version < pItem->submitReqVer) {
          // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
          output->info.version = pItem->submitReqVer;
        } else if (output->info.version == pItem->fetchResultVer) {
          smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
                  ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
                  ", rows:%" PRIi64,
                  SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
                  pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
          continue;
        }
      }

      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
      SSubmitReq2 *pReq = NULL;

      TAOS_CHECK_EXIT(
          buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid));

      if (pReq && (code = tdProcessSubmitReq(sinkTsdb, output->info.version, pReq)) < 0) {
        if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
          // TODO: reconfigure SSubmitReq2
        }
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFreeClear(pReq);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (STREAM_GET_ALL == execType) {
        atomic_store_64(&pItem->fetchResultVer, output->info.version);
      }

      smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
               ", execType:%d, ver:%" PRIi64,
               SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);

      if (pReq) {
        tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pReq);
      }
    }
  }
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
             ", ver:%" PRIi64,
             SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
             output ? output->info.version : -1);
  } else {
    smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
  }
  qCleanExecTaskBlockBuf(taskInfo);
  return code;
}
765

766
/**
767
 * @brief Copy msg to rsmaQueueBuffer for batch process
768
 *
769
 * @param pSma
770
 * @param version
771
 * @param pMsg
772
 * @param len
773
 * @param inputType
774
 * @param pInfo
775
 * @param suid
776
 * @return int32_t
777
 */
778
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
×
779
                                      SRSmaInfo *pInfo, tb_uid_t suid) {
780
  int32_t code = 0;
×
781
  int32_t lino = 0;
×
782
  int32_t size = RSMA_EXEC_MSG_HLEN + len;  // header + payload
×
783
  void   *qItem;
784

785
  TAOS_CHECK_RETURN(taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem));
×
786

787
  void *pItem = qItem;
×
788

789
  *(int8_t *)pItem = (int8_t)inputType;
×
790
  pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
×
791
  *(int32_t *)pItem = len;
×
792
  pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
×
793
  *(int64_t *)pItem = version;
×
794
  (void)memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
×
795

796
  TAOS_CHECK_RETURN(taosWriteQitem(pInfo->queue, qItem));
×
797

798
  pInfo->lastRecv = taosGetTimestampMs();
×
799

800
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
×
801

802
  int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
×
803

804
  if (atomic_load_8(&pInfo->assigned) == 0) {
×
805
    if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
×
806
      smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
×
807
               tstrerror(terrno));
808
    }
809
  }
810

811
  // smoothing consume
812
  int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
×
813
  if (n > 1) {
×
814
    if (n > 10) {
×
815
      n = 10;
×
816
    }
817
    taosMsleep(n << 3);
×
818
    if (n > 5) {
×
819
      smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
×
820
              taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
821
    }
822
  }
823

824
  return TSDB_CODE_SUCCESS;
×
825
}
826

827
#if 0
// Disabled debug helper: logs one debug line per row of every block in a submit request.
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlkIter blkIter = {0};
  STSRow        *row = NULL;

  TAOS_CHECK_RETURN(tInitSubmitMsgIter(pReq, &msgIter));
  for (;;) {
    SSubmitBlk *pBlock = NULL;
    TAOS_CHECK_RETURN(tGetSubmitMsgNext(&msgIter, &pBlock));
    if (pBlock == NULL) {
      break;
    }
    tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
    while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
    }
  }
  return 0;
}
#endif
846

847
/**
848
 * @brief sync mode
849
 *
850
 * @param pSma
851
 * @param pMsg
852
 * @param msgSize
853
 * @param inputType
854
 * @param pInfo
855
 * @param type
856
 * @param level
857
 * @return int32_t
858
 */
859
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
×
860
                                 SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) {
861
  int32_t        code = 0;
×
862
  int32_t        idx = level - 1;
×
863
  void          *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
×
864
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
×
865

866
  if (!qTaskInfo) {
×
867
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
×
868
             pInfo->suid);
869
    return TSDB_CODE_SUCCESS;
×
870
  }
871
  if (!pInfo->pTSchema) {
×
872
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
×
873
    TAOS_RETURN(TSDB_CODE_INVALID_PTR);
×
874
  }
875

876
  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64
×
877
           ", inputType:%d",
878
           SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
879

880
  if ((code = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
×
881
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(code));
×
882
    TAOS_RETURN(TSDB_CODE_FAILED);
×
883
  }
884

885
  atomic_store_64(&pItem->submitReqVer, version);
×
886

887
  TAOS_CHECK_RETURN(tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL));
×
888

889
  TAOS_RETURN(code);
×
890
}
891

892
/**
893
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
894
 *
895
 * @param pSma
896
 * @param suid
897
 */
898
static int32_t tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid, SRSmaInfo **ppRSmaInfo) {
×
899
  int32_t    code = 0;
×
900
  int32_t    lino = 0;
×
901
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
902
  SRSmaStat *pStat = NULL;
×
903
  SRSmaInfo *pRSmaInfo = NULL;
×
904

905
  *ppRSmaInfo = NULL;
×
906

907
  if (!pEnv) {
×
908
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_ENV);
×
909
  }
910

911
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
912
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
×
913
    TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
914
  }
915

916
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
×
917
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
×
918
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
×
919
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
920
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
921
      TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
922
    }
923

924
    tdRefRSmaInfo(pSma, pRSmaInfo);
925
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
926
    if (pRSmaInfo->suid != suid) {
×
927
      TAOS_RETURN(TSDB_CODE_APP_ERROR);
×
928
    }
929
    *ppRSmaInfo = pRSmaInfo;
×
930
    TAOS_RETURN(code);
×
931
  }
932
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
×
933

934
  TAOS_RETURN(TSDB_CODE_RSMA_INVALID_STAT);
×
935
}
936

937
// Drops one reference on pInfo via tdUnRefRSmaInfo; a NULL pInfo is ignored.
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  tdUnRefRSmaInfo(pSma, pInfo);
}
×
942

943
/**
944
 * @brief async mode
945
 *
946
 * @param pSma
947
 * @param version
948
 * @param pMsg
949
 * @param inputType
950
 * @param suid
951
 * @return int32_t
952
 */
953
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
×
954
                                  tb_uid_t suid) {
955
  int32_t    code = 0;
×
956
  SRSmaInfo *pRSmaInfo = NULL;
×
957

958
  code = tdAcquireRSmaInfoBySuid(pSma, suid, &pRSmaInfo);
×
959
  if (code != 0) {
×
960
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
×
961
    TAOS_RETURN(TSDB_CODE_SUCCESS);  // return success
×
962
  }
963

964
  if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
×
965
    if ((code = tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid)) < 0) {
×
966
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
967
      TAOS_RETURN(code);
×
968
    }
969
    if (smaMgmt.tmrHandle) {
×
970
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
×
971
      if (pItem->level > 0) {
×
972
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
×
973
      }
974
      pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
×
975
      if (pItem->level > 0) {
×
976
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
×
977
      }
978
    }
979
  } else {
980
    code = TSDB_CODE_APP_ERROR;
×
981
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
982
    smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid,
×
983
             tstrerror(code), inputType);
984
    TAOS_RETURN(code);
×
985
  }
986

987
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
988
  return TSDB_CODE_SUCCESS;
×
989
}
990

991
/**
 * @brief Buffer a submit request for every rsma super table it touches.
 *
 * The suids are collected from pReq with tdFetchSubmitReqSuids; the message is then queued once for
 * uidStore.suid and once for every key found in uidStore.uidHash.
 *
 * @param pSma
 * @param version forwarded to tdExecuteRSmaAsync
 * @param pReq request the suids are extracted from
 * @param pMsg message body that gets buffered
 * @param len length of pMsg
 * @return int32_t TSDB_CODE_SUCCESS when there is no rsma env or everything was queued; the stored execStat
 *         when it is non-zero; otherwise the first negative code encountered
 */
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = 0;
  // Must be initialized before the first `goto _exit`: the exit path always runs tdUidStoreDestory on it,
  // and a goto that skips the initializer would leave the object indeterminate.
  STbUidStore uidStore = {0};

  if ((code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
    smaError("vgId:%d, failed to process rsma submit since invalid exec code:%s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if ((code = tdFetchSubmitReqSuids(pReq, &uidStore)) < 0) {
    smaError("vgId:%d, failed to process rsma submit fetch suid since %s", SMA_VID(pSma), tstrerror(code));
    goto _exit;
  }

  if (uidStore.suid != 0) {
    if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid)) < 0) {
      smaError("vgId:%d, failed to process rsma submit exec 1 since %s", SMA_VID(pSma), tstrerror(code));
      goto _exit;
    }

    void *pIter = NULL;
    while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
      tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      if ((code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid)) < 0) {
        smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), tstrerror(code));
        // leaving the iteration early: cancel it before bailing out
        taosHashCancelIterate(uidStore.uidHash, pIter);
        goto _exit;
      }
    }
  }
_exit:
  tdUidStoreDestory(&uidStore);
  TAOS_RETURN(code);
}
1027

1028
/**
 * @brief Buffer a delete message for the rsma registered on the super table named in the delete result.
 *
 * @param pSma
 * @param version forwarded to tdExecuteRSmaAsync
 * @param pReq read as SDeleteRes; only its suid is used here
 * @param pMsg message body that gets buffered
 * @param len length of pMsg
 * @return int32_t TSDB_CODE_SUCCESS when there is no rsma env; the stored execStat when it is non-zero;
 *         otherwise the code returned by tdExecuteRSmaAsync
 */
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
  if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;

  int32_t code = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat);
  if (code) {
    smaError("vgId:%d, failed to process rsma delete since invalid exec code:%s", SMA_VID(pSma), tstrerror(code));
    TAOS_RETURN(code);
  }

  SDeleteRes *pDelRes = pReq;
  code = tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid);
  if (code < 0) {
    smaError("vgId:%d, failed to process rsma delete exec since %s", SMA_VID(pSma), tstrerror(code));
  }

  TAOS_RETURN(code);
}
1045

1046
/**
1047
 * @brief retrieve rsma meta and init
1048
 *
1049
 * @param pSma
1050
 * @param nTables number of tables of rsma
1051
 * @return int32_t
1052
 */
1053
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
×
1054
  int32_t     code = 0;
×
1055
  int32_t     lino = 0;
×
1056
  SVnode     *pVnode = pSma->pVnode;
×
1057
  SArray     *suidList = NULL;
×
1058
  STbUidStore uidStore = {0};
×
1059
  SMetaReader mr = {0};
×
1060
  tb_uid_t    suid = 0;
×
1061

1062
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
×
1063
    code = terrno;
×
1064
    TSDB_CHECK_CODE(code, lino, _exit);
×
1065
  }
1066

1067
  TAOS_CHECK_EXIT(vnodeGetStbIdList(pSma->pVnode, 0, suidList));
×
1068

1069
  int64_t arrSize = taosArrayGetSize(suidList);
×
1070

1071
  if (arrSize == 0) {
×
1072
    if (nTables) {
×
1073
      *nTables = 0;
×
1074
    }
1075
    taosArrayDestroy(suidList);
×
1076
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
×
1077
    return TSDB_CODE_SUCCESS;
×
1078
  }
1079

1080
  int64_t nRsmaTables = 0;
×
1081
  metaReaderDoInit(&mr, SMA_META(pSma), META_READER_LOCK);
×
1082
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
×
1083
    code = terrno;
×
1084
    TSDB_CHECK_CODE(code, lino, _exit);
×
1085
  }
1086

1087
  for (int64_t i = 0; i < arrSize; ++i) {
×
1088
    suid = *(tb_uid_t *)taosArrayGet(suidList, i);
×
1089
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
×
1090
    if (metaReaderGetTableEntryByUidCache(&mr, suid) < 0) {
×
1091
      code = terrno;
×
1092
      TSDB_CHECK_CODE(code, lino, _exit);
×
1093
    }
1094
    tDecoderClear(&mr.coder);
×
1095
    if (mr.me.type != TSDB_SUPER_TABLE) {
×
1096
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1097
      TSDB_CHECK_CODE(code, lino, _exit);
×
1098
    }
1099
    if (mr.me.uid != suid) {
×
1100
      code = TSDB_CODE_RSMA_INVALID_SCHEMA;
×
1101
      TSDB_CHECK_CODE(code, lino, _exit);
×
1102
    }
1103
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
×
1104
      ++nRsmaTables;
×
1105
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
×
1106
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
×
1107
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
×
1108
                 " qmsgLen:%" PRIi32,
1109
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1110
      }
1111
      TAOS_CHECK_EXIT(tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name));
×
1112
#if 0
1113
      // reload all ctbUids for suid
1114
      uidStore.suid = suid;
1115
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
1116
        code = terrno;
1117
        TSDB_CHECK_CODE(code, lino, _exit);
1118
      }
1119

1120
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
1121
        code = terrno;
1122
        TSDB_CHECK_CODE(code, lino, _exit);
1123
      }
1124

1125
      taosArrayClear(uidStore.tbUids);
1126
#endif
1127
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
×
1128
    }
1129
  }
1130

1131
  if (nTables) {
×
1132
    *nTables = nRsmaTables;
×
1133
  }
1134
_exit:
×
1135
  if (code) {
×
1136
    smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode),
×
1137
             __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid);
1138
  }
1139
  metaReaderClear(&mr);
×
1140
  taosArrayDestroy(suidList);
×
1141
  tdUidStoreDestory(&uidStore);
×
1142
  TAOS_RETURN(code);
×
1143
}
1144

1145
/**
1146
 * N.B. the data would be restored from the unified WAL replay procedure
1147
 */
1148
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) {
×
1149
  int32_t code = 0;
×
1150
  int32_t lino = 0;
×
1151
  int64_t nTables = 0;
×
1152

1153
  // step 1: init env
1154
  TAOS_CHECK_EXIT(tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP));
×
1155

1156
  // step 2: iterate all stables to restore the rsma env
1157
  TAOS_CHECK_EXIT(tdRSmaRestoreQTaskInfoInit(pSma, &nTables));
×
1158

1159
_exit:
×
1160
  if (code) {
×
1161
    smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
×
1162
             qtaskFileVer, tstrerror(code));
1163
  } else {
1164
    smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed, nTables:%" PRIi64, SMA_VID(pSma),
×
1165
            type, qtaskFileVer, nTables);
1166
  }
1167

1168
  TAOS_RETURN(code);
×
1169
}
1170

1171
/**
 * @brief Persist the rsma stream state (checkpoint).
 *
 * The whole implementation is compiled out with `#if 0`, so as built neither parameter is touched and the
 * function returns with code 0. The disabled body is kept as is.
 *
 * @param pRSmaStat
 * @param pInfoHash
 * @return int32_t always 0 while the body is disabled
 */
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
  int32_t code = 0;

#if 0
  int32_t lino = 0;
  int32_t nTaskInfo = 0;
  SSma   *pSma = pRSmaStat->pSma;
  SVnode *pVnode = pSma->pVnode;

  if (taosHashGetSize(pInfoHash) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stream state: trigger checkpoint
  do {
    void *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }
      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        if (pRSmaInfo->taskInfo[i]) {
          code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
          if (code) {
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          pRSmaInfo->items[i].streamFlushed = 0;
          ++nTaskInfo;
        }
      }
    }
  } while (0);

  // stream state: wait checkpoint ready in async mode
  do {
    int32_t nStreamFlushed = 0;
    int32_t nSleep = 0;
    void   *infoHash = NULL;
    while (true) {
      while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
        if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
          continue;
        }
        for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
          if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
            int8_t streamFlushed = 0;
            code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
                                             STREAM_CHECKPOINT, &streamFlushed);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }

            if (streamFlushed) {
              pRSmaInfo->items[i].streamFlushed = 1;
              if (++nStreamFlushed >= nTaskInfo) {
                smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total:%d/%d", TD_VID(pVnode),
                        nSleep * 10, nStreamFlushed, nTaskInfo);
                taosHashCancelIterate(pInfoHash, infoHash);
                goto _checkpoint;
              }
            }
          }
        }
      }
      taosUsleep(10);
      ++nSleep;
      smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total:%d/%d", TD_VID(pVnode),
               nSleep * 10, nStreamFlushed, nTaskInfo);
    }
  } while (0);

_checkpoint:
  // stream state: build checkpoint in backend
  do {
    SStreamMeta *pMeta = NULL;
    int64_t      checkpointId = taosGetTimestampNs();
    bool         checkpointBuilt = false;
    void        *infoHash = NULL;
    while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
        SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
        if (pItem && pItem->pStreamTask) {
          SStreamTask *pTask = pItem->pStreamTask;
          // atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
          TAOS_UNUSED(streamTaskSetActiveCheckpointInfo(pTask, checkpointId));

          pTask->chkInfo.checkpointId = checkpointId;  // 1pTask->checkpointingId;
          pTask->chkInfo.checkpointVer = pItem->submitReqVer;
          pTask->info.delaySchedParam = pItem->fetchResultVer;
          pTask->info.taskLevel = TASK_LEVEL_SMA;

          if (!checkpointBuilt) {
            // the stream states share one checkpoint
            code = streamTaskBuildCheckpoint(pTask);
            if (code) {
              taosHashCancelIterate(pInfoHash, infoHash);
              TSDB_CHECK_CODE(code, lino, _exit);
            }
            pMeta = pTask->pMeta;
            checkpointBuilt = true;
          }

          streamMetaWLock(pMeta);
          if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
            streamMetaWUnLock(pMeta);
            taosHashCancelIterate(pInfoHash, infoHash);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
          streamMetaWUnLock(pMeta);
          smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
                   ", table:%" PRIi64 ", level:%d",
                   TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
        }
      }
    }
    if (pMeta) {
      streamMetaWLock(pMeta);
      if ((code = streamMetaCommit(pMeta)) != 0) {
        streamMetaWUnLock(pMeta);
        if (code == -1) code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      streamMetaWUnLock(pMeta);
    }
    if (checkpointBuilt) {
      smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
    }
  } while (0);
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
#endif

  TAOS_RETURN(code);
}
1314

1315
/**
1316
 * @brief trigger to get rsma result in async mode
1317
 *
1318
 * @param param
1319
 * @param tmrId
1320
 */
1321
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
×
1322
  SRSmaRef      *pRSmaRef = NULL;
×
1323
  SSma          *pSma = NULL;
×
1324
  SRSmaStat     *pStat = NULL;
×
1325
  SRSmaInfo     *pRSmaInfo = NULL;
×
1326
  SRSmaInfoItem *pItem = NULL;
×
1327
  int32_t        code = 0;
×
1328

1329
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
×
1330
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%d", param,
×
1331
             smaMgmt.refHash, smaMgmt.rsetId);
1332
    return;
×
1333
  }
1334

1335
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
×
1336
    smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1337
            pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1338
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1339
    return;
×
1340
  }
1341

1342
  pSma = pStat->pSma;
×
1343

1344
  if ((code = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid, &pRSmaInfo)) != 0) {
×
1345
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1346
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1347
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1348
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1349
    return;
×
1350
  }
1351

1352
  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
×
1353
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%d refId:%" PRIi64 ")", smaMgmt.rsetId,
×
1354
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
1355
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1356
    TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1357
    TAOS_UNUSED(taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES));
×
1358
    return;
×
1359
  }
1360

1361
  pItem = *(SRSmaInfoItem **)&param;
×
1362

1363
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
1364
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
×
1365
  switch (rsmaTriggerStat) {
×
1366
    case TASK_TRIGGER_STAT_PAUSED:
×
1367
    case TASK_TRIGGER_STAT_CANCELLED: {
1368
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
×
1369
               ", rsetId:%d refId:%" PRIi64,
1370
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
1371
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
×
1372
        bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
1373
        if (!ret) {
×
1374
          smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
×
1375
                  " since tmr reset failed, rsetId:%d refId:%" PRIi64,
1376
                  SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
1377
        }
1378
      }
1379
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1380
      TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1381
      return;
×
1382
    }
1383
    default:
×
1384
      break;
×
1385
  }
1386

1387
  int8_t fetchTriggerStat =
1388
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
×
1389
  switch (fetchTriggerStat) {
×
1390
    case TASK_TRIGGER_STAT_ACTIVE: {
×
1391
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
×
1392
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1393
      // async process
1394
      atomic_store_8(&pItem->fetchLevel, 1);
×
1395

1396
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
×
1397
        if (tsem_post(&(pStat->notEmpty)) != 0) {
×
1398
          smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
×
1399
                   SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1400
        }
1401
      }
1402
    } break;
×
1403
    case TASK_TRIGGER_STAT_INACTIVE: {
×
1404
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ",
×
1405
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1406
    } break;
×
1407
    case TASK_TRIGGER_STAT_INIT: {
×
1408
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is init",
×
1409
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
1410
    } break;
×
1411
    default: {
×
1412
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8
×
1413
               " is unknown",
1414
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat);
1415
    } break;
×
1416
  }
1417

1418
_end:
×
1419
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
×
1420
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
×
1421
  TAOS_UNUSED(tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId));
×
1422
}
1423

1424
/**
 * @brief Release what the SPackedData entries of pItems refer to, then clear the array.
 *
 * @param pItems array of SPackedData
 * @param type STREAM_INPUT__MERGED_SUBMIT: each msgStr is shifted back by RSMA_EXEC_MSG_HLEN and the resulting
 *             queue item is freed; STREAM_INPUT__REF_DATA_BLOCK: each pDataBlock is destroyed; any other value
 *             is only logged. The array is cleared in every case.
 */
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
  const int32_t nItems = taosArrayGetSize(pItems);

  switch (type) {
    case STREAM_INPUT__MERGED_SUBMIT:
      for (int32_t idx = 0; idx < nItems; ++idx) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, idx);
        taosFreeQitem(POINTER_SHIFT(pPacked->msgStr, -RSMA_EXEC_MSG_HLEN));
      }
      break;
    case STREAM_INPUT__REF_DATA_BLOCK:
      for (int32_t idx = 0; idx < nItems; ++idx) {
        SPackedData *pPacked = TARRAY_GET_ELEM(pItems, idx);
        blockDataDestroy(pPacked->pDataBlock);
      }
      break;
    default:
      smaWarn("%s:%d unknown type:%d", __func__, __LINE__, type);
      break;
  }

  taosArrayClear(pItems);
}
×
1441

1442
/**
1443
 * @brief fetch rsma result(consider the efficiency and functionality)
1444
 *
1445
 * @param pSma
1446
 * @param pInfo
1447
 * @return int32_t
1448
 */
1449
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
×
1450
  int32_t     code = 0;
×
1451
  int32_t     lino = 0;
×
1452
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
×
1453
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
×
1454
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
×
1455

1456
    if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
×
1457
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
×
1458
      if (!taskInfo) {
×
1459
        continue;
×
1460
      }
1461

1462
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
×
1463
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
×
1464
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1465
      } else {
1466
        int64_t curMs = taosGetTimestampMs();
×
1467
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
×
1468
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
×
1469
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1470
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
×
1471
          continue;
×
1472
        } else {
1473
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
×
1474
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
1475
        }
1476
      }
1477

1478
      pItem->nScanned = 0;
×
1479

1480
      TAOS_CHECK_EXIT(qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK));
×
1481
      if ((code = tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL)) < 0) {
×
1482
        atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, code);
×
1483
        goto _exit;
×
1484
      }
1485

1486
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch finished",
×
1487
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
1488
    } else {
1489
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32
×
1490
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
1491
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
1492
    }
1493
  }
1494

1495
_exit:
×
1496
  TAOS_RETURN(code);
×
1497
}
1498

1499
/**
 * @brief Batch-execute the buffered submit/delete messages of one rsma info.
 *
 * The whole implementation is compiled out with `#if 0`, so as built no parameter is touched and the function
 * returns with code 0. The locals that only the disabled body uses live inside the `#if 0` region with it.
 *
 * @return int32_t always 0 while the body is disabled
 */
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
  int32_t code = 0;

#if 0
  void       *msg = NULL;
  int8_t      resume = 0;
  int32_t     nSubmit = 0;
  int32_t     nDelete = 0;
  int64_t     version = 0;
  int32_t     lino = 0;
  SPackedData packData;

  taosArrayClear(pSubmitArr);

  // the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
  while (1) {
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (msg) {
      int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
      if (inputType == STREAM_INPUT__DATA_SUBMIT) {
      _resume_submit:
        packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
        packData.ver = RSMA_EXEC_MSG_VER(msg);
        packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
        version = packData.ver;
        if (!taosArrayPush(pSubmitArr, &packData)) {
          taosFreeQitem(msg);
          TAOS_CHECK_EXIT(terrno);
        }
        ++nSubmit;
      } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
      _resume_delete:
        version = RSMA_EXEC_MSG_VER(msg);
        if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
                                          &packData.pDataBlock, 1, STREAM_DELETE_DATA))) {
          taosFreeQitem(msg);
          TAOS_CHECK_EXIT(code);
        }

        if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
          taosFreeQitem(msg);
          TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
        }
        taosFreeQitem(msg);
        if (packData.pDataBlock) {
          // packData.pDataBlock is NULL if delete affects 0 row
          ++nDelete;
        }
      } else {
        smaWarn("%s:%d unknown msg type:%d", __func__, __LINE__, inputType);
        break;
      }
    }

    if (nSubmit > 0 || nDelete > 0) {
      int32_t size = TARRAY_SIZE(pSubmitArr);
      int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
      for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
        TAOS_CHECK_EXIT(tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i));
      }
      tdFreeRSmaSubmitItems(pSubmitArr, inputType);
      nSubmit = 0;
      nDelete = 0;
    } else {
      goto _rtn;
    }

    if (resume == 2) {
      resume = 0;
      goto _resume_delete;
    }
  }

_rtn:
  return TSDB_CODE_SUCCESS;
_exit:
  atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
  smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
           type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
  tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
  while (1) {
    void *msg = NULL;
    TAOS_UNUSED(taosGetQitem(qall, (void **)&msg));
    if (msg) {
      taosFreeQitem(msg);
    } else {
      break;
    }
  }
#endif

  TAOS_RETURN(code);
}
1590

1591
/**
1592
 * @brief
1593
 *
1594
 * @param pSma
1595
 * @param type
1596
 * @return int32_t
1597
 */
1598

1599
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
×
1600
  int32_t    code = 0;
×
1601
  int32_t    lino = 0;
×
1602
  SVnode    *pVnode = pSma->pVnode;
×
1603
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
×
1604
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
×
1605
  SHashObj  *infoHash = NULL;
×
1606
  SArray    *pSubmitArr = NULL;
×
1607
  bool       isFetchAll = false;
×
1608

1609
  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
×
1610
    code = TSDB_CODE_RSMA_INVALID_STAT;
×
1611
    TSDB_CHECK_CODE(code, lino, _exit);
×
1612
  }
1613

1614
  if (!(pSubmitArr =
×
1615
            taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
×
1616
    code = terrno;
×
1617
    TSDB_CHECK_CODE(code, lino, _exit);
×
1618
  }
1619

1620
  while (true) {
1621
    // step 1: rsma exec - consume data in buffer queue for all suids
1622
    if (type == RSMA_EXEC_OVERFLOW) {
×
1623
      void *pIter = NULL;
×
1624
      while ((pIter = taosHashIterate(infoHash, pIter))) {
×
1625
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
×
1626
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
×
1627
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) {
×
1628
            int32_t batchCnt = -1;
×
1629
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
×
1630
            bool    occupied = (batchMax <= 1);
×
1631
            if (batchMax > 1) {
×
1632
              batchMax = 100 / batchMax;
×
1633
              batchMax = TMAX(batchMax, 4);
×
1634
            }
1635
            while (occupied || (++batchCnt < batchMax)) {                 // greedy mode
×
1636
              TAOS_UNUSED(taosReadAllQitems(pInfo->queue, pInfo->qall));  // queue has mutex lock
×
1637
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
×
1638
              if (qallItemSize > 0) {
×
1639
                if ((code = tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type)) != 0) {
×
1640
                  taosHashCancelIterate(infoHash, pIter);
×
1641
                  TSDB_CHECK_CODE(code, lino, _exit);
×
1642
                }
1643
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type);
×
1644
              }
1645

1646
              if (RSMA_NEED_FETCH(pInfo)) {
×
1647
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
×
1648
                if (oldStat == 0 ||
×
1649
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
×
1650
                  int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
×
1651

1652
                  if (oldVal < 0) {
×
1653
                    code = TSDB_CODE_APP_ERROR;
×
1654
                    taosHashCancelIterate(infoHash, pIter);
×
1655
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1656
                  }
1657

1658
                  if ((code = tdRSmaFetchAllResult(pSma, pInfo)) != 0) {
×
1659
                    taosHashCancelIterate(infoHash, pIter);
×
1660
                    TSDB_CHECK_CODE(code, lino, _exit);
×
1661
                  }
1662

1663
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
×
1664
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
×
1665
                  }
1666
                }
1667
              }
1668

1669
              if (qallItemSize > 0) {
×
1670
                TAOS_UNUSED(atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize));
×
1671
                continue;
×
1672
              }
1673
              if (RSMA_NEED_FETCH(pInfo)) {
×
1674
                continue;
×
1675
              }
1676

1677
              break;
×
1678
            }
1679
          }
1680
          TAOS_UNUSED(atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
×
1681
        }
1682
      }
1683
    } else {
1684
      smaWarn("%s:%d unknown rsma exec type:%d", __func__, __LINE__, (int32_t)type);
×
1685
      code = TSDB_CODE_APP_ERROR;
×
1686
      TSDB_CHECK_CODE(code, lino, _exit);
×
1687
    }
1688

1689
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
×
1690
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
×
1691
        break;
×
1692
      }
1693

1694
      if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
×
1695
        smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
×
1696
      }
1697

1698
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
×
1699
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
×
1700
                 atomic_load_64(&pRSmaStat->nBufItems));
1701
        break;
×
1702
      }
1703
    }
1704

1705
  }  // end of while(true)
1706

1707
_exit:
×
1708
  taosArrayDestroy(pSubmitArr);
×
1709
  if (code) {
×
1710
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
1711
  }
1712
  TAOS_RETURN(code);
×
1713
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc