• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/sma/smaTimeRange.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tsdb.h"
19

20
#define SMA_STORAGE_MINUTES_MAX  86400
21
#define SMA_STORAGE_MINUTES_DAY  1440
22
#define SMA_STORAGE_SPLIT_FACTOR 14400  // least records in tsma file
23

24
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
25
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
26
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
27

UNCOV
28
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
×
UNCOV
29
  int32_t code = TSDB_CODE_SUCCESS;
×
30

UNCOV
31
  if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
×
32
    smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
×
33
  }
34

UNCOV
35
  TAOS_RETURN(code);
×
36
}
37

UNCOV
38
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
×
39
#ifdef USE_TSMA
UNCOV
40
  int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
×
41

UNCOV
42
  TAOS_RETURN(code);
×
43
#else
44
  return TSDB_CODE_SUCCESS;
45
#endif
46
}
47

UNCOV
48
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
UNCOV
49
  int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
×
50

UNCOV
51
  TAOS_RETURN(code);
×
52
}
53

54
/**
55
 * @brief Judge the tsma file split days
56
 *
57
 * @param pCfg
58
 * @param pCont
59
 * @param contLen
60
 * @param days unit is minute
61
 * @return int32_t
62
 */
UNCOV
63
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
UNCOV
64
  int32_t  code = 0;
×
UNCOV
65
  int32_t  lino = 0;
×
UNCOV
66
  SDecoder coder = {0};
×
UNCOV
67
  tDecoderInit(&coder, pCont, contLen);
×
68

UNCOV
69
  STSma tsma = {0};
×
UNCOV
70
  if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
×
71
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
72
    TSDB_CHECK_CODE(code, lino, _exit);
×
73
  }
74

UNCOV
75
  STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
×
UNCOV
76
  int64_t   sInterval = -1;
×
UNCOV
77
  TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval));
×
UNCOV
78
  if (0 == sInterval) {
×
UNCOV
79
    *days = pTsdbCfg->days;
×
UNCOV
80
    goto _exit;
×
81
  }
UNCOV
82
  int64_t records = pTsdbCfg->days * 60 / sInterval;
×
UNCOV
83
  if (records >= SMA_STORAGE_SPLIT_FACTOR) {
×
UNCOV
84
    *days = pTsdbCfg->days;
×
85
  } else {
UNCOV
86
    int64_t mInterval = -1;
×
UNCOV
87
    TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval));
×
UNCOV
88
    int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
×
89

UNCOV
90
    if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
×
91
      *days = SMA_STORAGE_MINUTES_MAX;
×
92
    } else {
UNCOV
93
      *days = (int32_t)daysPerFile;
×
94
    }
95

UNCOV
96
    if (*days < pTsdbCfg->days) {
×
97
      *days = pTsdbCfg->days;
×
98
    }
99
  }
UNCOV
100
_exit:
×
UNCOV
101
  if (code) {
×
102
    smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
×
103
  } else {
UNCOV
104
    smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
×
105
  }
UNCOV
106
  tDecoderClear(&coder);
×
UNCOV
107
  TAOS_RETURN(code);
×
108
}
109

110
/**
111
 * @brief create tsma meta and result stable
112
 *
113
 * @param pSma
114
 * @param version
115
 * @param pMsg
116
 * @return int32_t
117
 */
UNCOV
118
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
×
UNCOV
119
  int32_t        code = 0;
×
UNCOV
120
  int32_t        lino = 0;
×
UNCOV
121
  SSmaCfg       *pCfg = (SSmaCfg *)pMsg;
×
UNCOV
122
  SName          stbFullName = {0};
×
UNCOV
123
  SVCreateStbReq pReq = {0};
×
124

UNCOV
125
  if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
×
126
    // create tsma meta in dstVgId
UNCOV
127
    TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
×
128

129
    // create stable to save tsma result in dstVgId
UNCOV
130
    TAOS_CHECK_EXIT(tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
UNCOV
131
    pReq.name = (char *)tNameGetTableName(&stbFullName);
×
UNCOV
132
    pReq.suid = pCfg->dstTbUid;
×
UNCOV
133
    pReq.schemaRow = pCfg->schemaRow;
×
UNCOV
134
    pReq.schemaTag = pCfg->schemaTag;
×
135

UNCOV
136
    TAOS_CHECK_EXIT(metaCreateSuperTable(SMA_META(pSma), ver, &pReq));
×
137
  } else {
138
    TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
×
139
  }
140

141
_exit:
×
UNCOV
142
  if (code) {
×
143
    smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
144
             " dstTb:%s dstVg:%d since %s",
145
             SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
146
             pCfg->dstVgId, tstrerror(code));
147
  } else {
UNCOV
148
    smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
149
             " dstTb:%s dstVg:%d",
150
             SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
151
  }
152

UNCOV
153
  TAOS_RETURN(code);
×
154
}
155

UNCOV
156
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid,
×
157
                         const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
UNCOV
158
  int32_t      code = 0;
×
UNCOV
159
  int32_t      lino = 0;
×
UNCOV
160
  void        *pBuf = NULL;
×
UNCOV
161
  int32_t      len = 0;
×
UNCOV
162
  SSubmitReq2 *pReq = NULL;
×
UNCOV
163
  SArray      *tagArray = NULL;
×
UNCOV
164
  SHashObj    *pTableIndexMap = NULL;
×
165

UNCOV
166
  int32_t numOfBlocks = taosArrayGetSize(pBlocks);
×
167

UNCOV
168
  tagArray = taosArrayInit(1, sizeof(STagVal));
×
UNCOV
169
  pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
×
170

UNCOV
171
  if (!tagArray || !pReq) {
×
172
    TAOS_CHECK_EXIT(terrno);
×
173
  }
174

UNCOV
175
  pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
×
UNCOV
176
  if (pReq->aSubmitTbData == NULL) {
×
177
    TAOS_CHECK_EXIT(terrno);
×
178
  }
179

UNCOV
180
  pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
UNCOV
181
  if (pTableIndexMap == NULL) {
×
182
    TAOS_CHECK_EXIT(terrno);
×
183
  }
184

185
  // SSubmitTbData req
UNCOV
186
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
UNCOV
187
    SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
×
UNCOV
188
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
×
189
      pDeleteReq->suid = suid;
×
190
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
×
191
      TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true));
×
192
      continue;
×
193
    }
194

UNCOV
195
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
×
196

UNCOV
197
    int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
×
198

UNCOV
199
    TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq));
×
200

201
    {
UNCOV
202
      uint64_t groupId = pDataBlock->info.id.groupId;
×
203

UNCOV
204
      int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
×
UNCOV
205
      if (index == NULL) {  // no data yet, append it
×
UNCOV
206
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
UNCOV
207
        if (code != TSDB_CODE_SUCCESS) {
×
208
          continue;
×
209
        }
210

UNCOV
211
        if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
×
212
          code = terrno;
×
213
          continue;
×
214
        }
215

UNCOV
216
        int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
×
UNCOV
217
        TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)));
×
218
      } else {
219
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
220
        if (code != TSDB_CODE_SUCCESS) {
×
221
          continue;
×
222
        }
223

224
        SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
×
225
        code = doMergeExistedRows(pExisted, &tbData, "id");
×
226
        if (code != TSDB_CODE_SUCCESS) {
×
227
          continue;
×
228
        }
229
      }
230
    }
231
  }
232

233
  // encode
UNCOV
234
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
×
UNCOV
235
  if (TSDB_CODE_SUCCESS == code) {
×
236
    SEncoder encoder;
UNCOV
237
    len += sizeof(SSubmitReq2Msg);
×
UNCOV
238
    if (!(pBuf = rpcMallocCont(len))) {
×
239
      code = terrno;
×
240
      TSDB_CHECK_CODE(code, lino, _exit);
×
241
    }
242

UNCOV
243
    ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
×
UNCOV
244
    ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
×
UNCOV
245
    ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
×
UNCOV
246
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
×
UNCOV
247
    if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) {
×
248
      tEncoderClear(&encoder);
×
249
      TSDB_CHECK_CODE(code, lino, _exit);
×
250
    }
UNCOV
251
    tEncoderClear(&encoder);
×
252
  }
253

254
_exit:
×
UNCOV
255
  taosArrayDestroy(tagArray);
×
UNCOV
256
  taosHashCleanup(pTableIndexMap);
×
UNCOV
257
  if (pReq != NULL) {
×
UNCOV
258
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
UNCOV
259
    taosMemoryFree(pReq);
×
260
  }
261

UNCOV
262
  if (code) {
×
263
    rpcFreeCont(pBuf);
×
264
    taosArrayDestroy(pDeleteReq->deleteReqs);
×
265
    smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
×
266
  } else {
UNCOV
267
    if (ppData) *ppData = pBuf;
×
UNCOV
268
    if (pLen) *pLen = len;
×
269
  }
UNCOV
270
  TAOS_RETURN(code);
×
271
}
272

UNCOV
273
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
×
UNCOV
274
  int32_t code = 0;
×
UNCOV
275
  int32_t lino = 0;
×
276

UNCOV
277
  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
×
278
    int32_t len = 0;
×
279
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
×
280
    TSDB_CHECK_CODE(code, lino, _exit);
×
281

282
    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
×
283
    if (!pBuf) {
×
284
      code = terrno;
×
285
      TSDB_CHECK_CODE(code, lino, _exit);
×
286
    }
287

288
    SEncoder encoder;
289
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
×
290
    if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
×
291
      tEncoderClear(&encoder);
×
292
      TSDB_CHECK_CODE(code, lino, _exit);
×
293
    }
294
    tEncoderClear(&encoder);
×
295

296
    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
×
297

298
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
×
299
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
×
300
    TSDB_CHECK_CODE(code, lino, _exit);
×
301
  }
302

UNCOV
303
_exit:
×
UNCOV
304
  taosArrayDestroy(pDelReq->deleteReqs);
×
UNCOV
305
  if (code) {
×
306
    smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
×
307
             indexUid, tstrerror(code));
308
  }
309

UNCOV
310
  TAOS_RETURN(code);
×
311
}
312

313
/**
314
 * @brief Insert/Update Time-range-wise SMA data.
315
 *
316
 * @param pSma
317
 * @param msg
318
 * @return int32_t
319
 */
UNCOV
320
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
×
UNCOV
321
  int32_t       code = 0;
×
UNCOV
322
  int32_t       lino = 0;
×
UNCOV
323
  const SArray *pDataBlocks = (const SArray *)msg;
×
324

UNCOV
325
  if (taosArrayGetSize(pDataBlocks) <= 0) {
×
326
    code = TSDB_CODE_TSMA_INVALID_PARA;
×
327
    TSDB_CHECK_CODE(code, lino, _exit);
×
328
  }
329

UNCOV
330
  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
×
331
    code = TSDB_CODE_TSMA_INIT_FAILED;
×
332
    TSDB_CHECK_CODE(code, lino, _exit);
×
333
  }
334

UNCOV
335
  SSmaEnv   *pEnv = SMA_TSMA_ENV(pSma);
×
UNCOV
336
  SSmaStat  *pStat = NULL;
×
UNCOV
337
  STSmaStat *pTsmaStat = NULL;
×
338

UNCOV
339
  if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
×
340
    code = TSDB_CODE_TSMA_INVALID_ENV;
×
341
    TSDB_CHECK_CODE(code, lino, _exit);
×
342
  }
343

UNCOV
344
  pTsmaStat = SMA_STAT_TSMA(pStat);
×
345

UNCOV
346
  if (!pTsmaStat->pTSma) {
×
UNCOV
347
    STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
×
UNCOV
348
    if (!pTSma) {
×
349
      code = TSDB_CODE_TSMA_INVALID_PTR;
×
350
      TSDB_CHECK_CODE(code, lino, _exit);
×
351
    }
UNCOV
352
    pTsmaStat->pTSma = pTSma;
×
UNCOV
353
    code = metaGetTbTSchemaNotNull(SMA_META(pSma), pTSma->dstTbUid, -1, 1, &pTsmaStat->pTSchema);
×
UNCOV
354
    TSDB_CHECK_CODE(code, lino, _exit);
×
355
  }
356

UNCOV
357
  if (pTsmaStat->pTSma->indexUid != indexUid) {
×
358
    code = TSDB_CODE_APP_ERROR;
×
359
    TSDB_CHECK_CODE(code, lino, _exit);
×
360
  }
361

UNCOV
362
  SBatchDeleteReq deleteReq = {0};
×
UNCOV
363
  void           *pSubmitReq = NULL;
×
UNCOV
364
  int32_t         contLen = 0;
×
365

UNCOV
366
  code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, pTsmaStat->pTSma->dstTbUid,
×
UNCOV
367
                          pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
×
UNCOV
368
  TSDB_CHECK_CODE(code, lino, _exit);
×
369

UNCOV
370
  TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq));
×
371

372
#if 0
373
  if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
374
    code = TSDB_CODE_APP_ERROR;
375
    smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
376
             pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName);
377
    goto _err;
378
  }
379
#endif
380

UNCOV
381
  SRpcMsg submitReqMsg = {
×
382
      .msgType = TDMT_VND_SUBMIT,
383
      .pCont = pSubmitReq,
384
      .contLen = contLen,
385
  };
386

UNCOV
387
  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
×
UNCOV
388
  TSDB_CHECK_CODE(code, lino, _exit);
×
389

UNCOV
390
_exit:
×
UNCOV
391
  if (code) {
×
392
    smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
×
393
             tstrerror(code), indexUid);
394
  }
UNCOV
395
  TAOS_RETURN(code);
×
396
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc