• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/sma/smaTimeRange.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tsdb.h"
19

20
#define SMA_STORAGE_MINUTES_MAX  86400
21
#define SMA_STORAGE_MINUTES_DAY  1440
22
#define SMA_STORAGE_SPLIT_FACTOR 14400  // least records in tsma file
23

24
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
25
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
26
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
27

UNCOV
28
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
×
UNCOV
29
  int32_t code = TSDB_CODE_SUCCESS;
×
30

UNCOV
31
  if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
×
32
    smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
×
33
  }
34

UNCOV
35
  TAOS_RETURN(code);
×
36
}
37

UNCOV
38
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
×
UNCOV
39
  int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
×
40

UNCOV
41
  TAOS_RETURN(code);
×
42
}
43

UNCOV
44
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
UNCOV
45
  int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
×
46

UNCOV
47
  TAOS_RETURN(code);
×
48
}
49

50
/**
51
 * @brief Judge the tsma file split days
52
 *
53
 * @param pCfg
54
 * @param pCont
55
 * @param contLen
56
 * @param days unit is minute
57
 * @return int32_t
58
 */
UNCOV
59
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
UNCOV
60
  int32_t  code = 0;
×
UNCOV
61
  int32_t  lino = 0;
×
UNCOV
62
  SDecoder coder = {0};
×
UNCOV
63
  tDecoderInit(&coder, pCont, contLen);
×
64

UNCOV
65
  STSma tsma = {0};
×
UNCOV
66
  if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
×
67
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
68
    TSDB_CHECK_CODE(code, lino, _exit);
×
69
  }
70

UNCOV
71
  STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
×
UNCOV
72
  int64_t   sInterval = -1;
×
UNCOV
73
  TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval));
×
UNCOV
74
  if (0 == sInterval) {
×
UNCOV
75
    *days = pTsdbCfg->days;
×
UNCOV
76
    goto _exit;
×
77
  }
UNCOV
78
  int64_t records = pTsdbCfg->days * 60 / sInterval;
×
UNCOV
79
  if (records >= SMA_STORAGE_SPLIT_FACTOR) {
×
UNCOV
80
    *days = pTsdbCfg->days;
×
81
  } else {
UNCOV
82
    int64_t mInterval = -1;
×
UNCOV
83
    TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval));
×
UNCOV
84
    int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
×
85

UNCOV
86
    if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
×
87
      *days = SMA_STORAGE_MINUTES_MAX;
×
88
    } else {
UNCOV
89
      *days = (int32_t)daysPerFile;
×
90
    }
91

UNCOV
92
    if (*days < pTsdbCfg->days) {
×
93
      *days = pTsdbCfg->days;
×
94
    }
95
  }
UNCOV
96
_exit:
×
UNCOV
97
  if (code) {
×
98
    smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
×
99
  } else {
UNCOV
100
    smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
×
101
  }
UNCOV
102
  tDecoderClear(&coder);
×
UNCOV
103
  TAOS_RETURN(code);
×
104
}
105

106
/**
107
 * @brief create tsma meta and result stable
108
 *
109
 * @param pSma
110
 * @param version
111
 * @param pMsg
112
 * @return int32_t
113
 */
UNCOV
114
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
×
UNCOV
115
  int32_t        code = 0;
×
UNCOV
116
  int32_t        lino = 0;
×
UNCOV
117
  SSmaCfg       *pCfg = (SSmaCfg *)pMsg;
×
UNCOV
118
  SName          stbFullName = {0};
×
UNCOV
119
  SVCreateStbReq pReq = {0};
×
120

UNCOV
121
  if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
×
122
    // create tsma meta in dstVgId
UNCOV
123
    TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
×
124

125
    // create stable to save tsma result in dstVgId
UNCOV
126
    TAOS_CHECK_EXIT(tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
UNCOV
127
    pReq.name = (char *)tNameGetTableName(&stbFullName);
×
UNCOV
128
    pReq.suid = pCfg->dstTbUid;
×
UNCOV
129
    pReq.schemaRow = pCfg->schemaRow;
×
UNCOV
130
    pReq.schemaTag = pCfg->schemaTag;
×
131

UNCOV
132
    TAOS_CHECK_EXIT(metaCreateSuperTable(SMA_META(pSma), ver, &pReq));
×
133
  } else {
134
    TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
×
135
  }
136

137
_exit:
×
UNCOV
138
  if (code) {
×
139
    smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
140
             " dstTb:%s dstVg:%d since %s",
141
             SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
142
             pCfg->dstVgId, tstrerror(code));
143
  } else {
UNCOV
144
    smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
145
             " dstTb:%s dstVg:%d",
146
             SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
147
  }
148

UNCOV
149
  TAOS_RETURN(code);
×
150
}
151

UNCOV
152
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid,
×
153
                         const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
UNCOV
154
  int32_t      code = 0;
×
UNCOV
155
  int32_t      lino = 0;
×
UNCOV
156
  void        *pBuf = NULL;
×
UNCOV
157
  int32_t      len = 0;
×
UNCOV
158
  SSubmitReq2 *pReq = NULL;
×
UNCOV
159
  SArray      *tagArray = NULL;
×
UNCOV
160
  SHashObj    *pTableIndexMap = NULL;
×
161

UNCOV
162
  int32_t numOfBlocks = taosArrayGetSize(pBlocks);
×
163

UNCOV
164
  tagArray = taosArrayInit(1, sizeof(STagVal));
×
UNCOV
165
  pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
×
166

UNCOV
167
  if (!tagArray || !pReq) {
×
168
    TAOS_CHECK_EXIT(terrno);
×
169
  }
170

UNCOV
171
  pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
×
UNCOV
172
  if (pReq->aSubmitTbData == NULL) {
×
173
    TAOS_CHECK_EXIT(terrno);
×
174
  }
175

UNCOV
176
  pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
UNCOV
177
  if (pTableIndexMap == NULL) {
×
178
    TAOS_CHECK_EXIT(terrno);
×
179
  }
180

181
  // SSubmitTbData req
UNCOV
182
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
UNCOV
183
    SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
×
UNCOV
184
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
×
UNCOV
185
      pDeleteReq->suid = suid;
×
UNCOV
186
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
×
UNCOV
187
      TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true));
×
UNCOV
188
      continue;
×
189
    }
190

UNCOV
191
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
×
192

UNCOV
193
    int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
×
194

UNCOV
195
    TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq));
×
196

197
    {
UNCOV
198
      uint64_t groupId = pDataBlock->info.id.groupId;
×
199

UNCOV
200
      int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
×
UNCOV
201
      if (index == NULL) {  // no data yet, append it
×
UNCOV
202
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
UNCOV
203
        if (code != TSDB_CODE_SUCCESS) {
×
204
          continue;
×
205
        }
206

UNCOV
207
        if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
×
208
          code = terrno;
×
209
          continue;
×
210
        }
211

UNCOV
212
        int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
×
UNCOV
213
        TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)));
×
214
      } else {
215
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
216
        if (code != TSDB_CODE_SUCCESS) {
×
217
          continue;
×
218
        }
219

220
        SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
×
221
        code = doMergeExistedRows(pExisted, &tbData, "id");
×
222
        if (code != TSDB_CODE_SUCCESS) {
×
223
          continue;
×
224
        }
225
      }
226
    }
227
  }
228

229
  // encode
UNCOV
230
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
×
UNCOV
231
  if (TSDB_CODE_SUCCESS == code) {
×
232
    SEncoder encoder;
UNCOV
233
    len += sizeof(SSubmitReq2Msg);
×
UNCOV
234
    if (!(pBuf = rpcMallocCont(len))) {
×
235
      code = terrno;
×
236
      TSDB_CHECK_CODE(code, lino, _exit);
×
237
    }
238

UNCOV
239
    ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
×
UNCOV
240
    ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
×
UNCOV
241
    ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
×
UNCOV
242
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
×
UNCOV
243
    if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) {
×
244
      tEncoderClear(&encoder);
×
245
      TSDB_CHECK_CODE(code, lino, _exit);
×
246
    }
UNCOV
247
    tEncoderClear(&encoder);
×
248
  }
249

250
_exit:
×
UNCOV
251
  taosArrayDestroy(tagArray);
×
UNCOV
252
  taosHashCleanup(pTableIndexMap);
×
UNCOV
253
  if (pReq != NULL) {
×
UNCOV
254
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
UNCOV
255
    taosMemoryFree(pReq);
×
256
  }
257

UNCOV
258
  if (code) {
×
259
    rpcFreeCont(pBuf);
×
260
    taosArrayDestroy(pDeleteReq->deleteReqs);
×
261
    smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
×
262
  } else {
UNCOV
263
    if (ppData) *ppData = pBuf;
×
UNCOV
264
    if (pLen) *pLen = len;
×
265
  }
UNCOV
266
  TAOS_RETURN(code);
×
267
}
268

UNCOV
269
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
×
UNCOV
270
  int32_t code = 0;
×
UNCOV
271
  int32_t lino = 0;
×
272

UNCOV
273
  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
×
UNCOV
274
    int32_t len = 0;
×
UNCOV
275
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
×
UNCOV
276
    TSDB_CHECK_CODE(code, lino, _exit);
×
277

UNCOV
278
    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
×
UNCOV
279
    if (!pBuf) {
×
280
      code = terrno;
×
281
      TSDB_CHECK_CODE(code, lino, _exit);
×
282
    }
283

284
    SEncoder encoder;
UNCOV
285
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
×
UNCOV
286
    if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
×
287
      tEncoderClear(&encoder);
×
288
      TSDB_CHECK_CODE(code, lino, _exit);
×
289
    }
UNCOV
290
    tEncoderClear(&encoder);
×
291

UNCOV
292
    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
×
293

UNCOV
294
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
×
UNCOV
295
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
×
UNCOV
296
    TSDB_CHECK_CODE(code, lino, _exit);
×
297
  }
298

UNCOV
299
_exit:
×
UNCOV
300
  taosArrayDestroy(pDelReq->deleteReqs);
×
UNCOV
301
  if (code) {
×
302
    smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
×
303
             indexUid, tstrerror(code));
304
  }
305

UNCOV
306
  TAOS_RETURN(code);
×
307
}
308

309
/**
310
 * @brief Insert/Update Time-range-wise SMA data.
311
 *
312
 * @param pSma
313
 * @param msg
314
 * @return int32_t
315
 */
UNCOV
316
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
×
UNCOV
317
  int32_t       code = 0;
×
UNCOV
318
  int32_t       lino = 0;
×
UNCOV
319
  const SArray *pDataBlocks = (const SArray *)msg;
×
320

UNCOV
321
  if (taosArrayGetSize(pDataBlocks) <= 0) {
×
322
    code = TSDB_CODE_TSMA_INVALID_PARA;
×
323
    TSDB_CHECK_CODE(code, lino, _exit);
×
324
  }
325

UNCOV
326
  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
×
327
    code = TSDB_CODE_TSMA_INIT_FAILED;
×
328
    TSDB_CHECK_CODE(code, lino, _exit);
×
329
  }
330

UNCOV
331
  SSmaEnv   *pEnv = SMA_TSMA_ENV(pSma);
×
UNCOV
332
  SSmaStat  *pStat = NULL;
×
UNCOV
333
  STSmaStat *pTsmaStat = NULL;
×
334

UNCOV
335
  if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
×
336
    code = TSDB_CODE_TSMA_INVALID_ENV;
×
337
    TSDB_CHECK_CODE(code, lino, _exit);
×
338
  }
339

UNCOV
340
  pTsmaStat = SMA_STAT_TSMA(pStat);
×
341

UNCOV
342
  if (!pTsmaStat->pTSma) {
×
UNCOV
343
    STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
×
UNCOV
344
    if (!pTSma) {
×
345
      code = TSDB_CODE_TSMA_INVALID_PTR;
×
346
      TSDB_CHECK_CODE(code, lino, _exit);
×
347
    }
UNCOV
348
    pTsmaStat->pTSma = pTSma;
×
UNCOV
349
    code = metaGetTbTSchemaNotNull(SMA_META(pSma), pTSma->dstTbUid, -1, 1, &pTsmaStat->pTSchema);
×
UNCOV
350
    TSDB_CHECK_CODE(code, lino, _exit);
×
351
  }
352

UNCOV
353
  if (pTsmaStat->pTSma->indexUid != indexUid) {
×
354
    code = TSDB_CODE_APP_ERROR;
×
355
    TSDB_CHECK_CODE(code, lino, _exit);
×
356
  }
357

UNCOV
358
  SBatchDeleteReq deleteReq = {0};
×
UNCOV
359
  void           *pSubmitReq = NULL;
×
UNCOV
360
  int32_t         contLen = 0;
×
361

UNCOV
362
  code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, pTsmaStat->pTSma->dstTbUid,
×
UNCOV
363
                          pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
×
UNCOV
364
  TSDB_CHECK_CODE(code, lino, _exit);
×
365

UNCOV
366
  TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq));
×
367

368
#if 0
369
  if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
370
    code = TSDB_CODE_APP_ERROR;
371
    smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
372
             pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName);
373
    goto _err;
374
  }
375
#endif
376

UNCOV
377
  SRpcMsg submitReqMsg = {
×
378
      .msgType = TDMT_VND_SUBMIT,
379
      .pCont = pSubmitReq,
380
      .contLen = contLen,
381
  };
382

UNCOV
383
  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
×
UNCOV
384
  TSDB_CHECK_CODE(code, lino, _exit);
×
385

UNCOV
386
_exit:
×
UNCOV
387
  if (code) {
×
388
    smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
×
389
             tstrerror(code), indexUid);
390
  }
UNCOV
391
  TAOS_RETURN(code);
×
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc