• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/sma/smaTimeRange.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tsdb.h"
19

20
#define SMA_STORAGE_MINUTES_MAX  86400
21
#define SMA_STORAGE_MINUTES_DAY  1440
22
#define SMA_STORAGE_SPLIT_FACTOR 14400  // least records in tsma file
23

24
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
25
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
26
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
27

28
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
×
29
  int32_t code = TSDB_CODE_SUCCESS;
×
30

31
  if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
×
32
    smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
×
33
  }
34

35
  TAOS_RETURN(code);
×
36
}
37

38
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
×
39
#ifdef USE_TSMA
40
  int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
×
41

42
  TAOS_RETURN(code);
×
43
#else
44
  return TSDB_CODE_SUCCESS;
45
#endif
46
}
47

48
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
49
  int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
×
50

51
  TAOS_RETURN(code);
×
52
}
53

54
/**
55
 * @brief Judge the tsma file split days
56
 *
57
 * @param pCfg
58
 * @param pCont
59
 * @param contLen
60
 * @param days unit is minute
61
 * @return int32_t
62
 */
63
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
×
64
  int32_t  code = 0;
×
65
  int32_t  lino = 0;
×
66
  SDecoder coder = {0};
×
67
  tDecoderInit(&coder, pCont, contLen);
×
68

69
  STSma tsma = {0};
×
70
  if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
×
71
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
72
    TSDB_CHECK_CODE(code, lino, _exit);
×
73
  }
74

75
  STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
×
76
  int64_t   sInterval = -1;
×
77
  TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval));
×
78
  if (0 == sInterval) {
×
79
    *days = pTsdbCfg->days;
×
80
    goto _exit;
×
81
  }
82
  int64_t records = pTsdbCfg->days * 60 / sInterval;
×
83
  if (records >= SMA_STORAGE_SPLIT_FACTOR) {
×
84
    *days = pTsdbCfg->days;
×
85
  } else {
86
    int64_t mInterval = -1;
×
87
    TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval));
×
88
    int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
×
89

90
    if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
×
91
      *days = SMA_STORAGE_MINUTES_MAX;
×
92
    } else {
93
      *days = (int32_t)daysPerFile;
×
94
    }
95

96
    if (*days < pTsdbCfg->days) {
×
97
      *days = pTsdbCfg->days;
×
98
    }
99
  }
100
_exit:
×
101
  if (code) {
×
102
    smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
×
103
  } else {
104
    smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
×
105
  }
106
  tDecoderClear(&coder);
×
107
  TAOS_RETURN(code);
×
108
}
109

110
/**
111
 * @brief create tsma meta and result stable
112
 *
113
 * @param pSma
114
 * @param version
115
 * @param pMsg
116
 * @return int32_t
117
 */
118
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
×
119
  int32_t        code = 0;
×
120
  int32_t        lino = 0;
×
121
  SSmaCfg       *pCfg = (SSmaCfg *)pMsg;
×
122
  SName          stbFullName = {0};
×
123
  SVCreateStbReq pReq = {0};
×
124

125
  if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
×
126
    // create tsma meta in dstVgId
127
    TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
×
128

129
    // create stable to save tsma result in dstVgId
130
    TAOS_CHECK_EXIT(tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
×
131
    pReq.name = (char *)tNameGetTableName(&stbFullName);
×
132
    pReq.suid = pCfg->dstTbUid;
×
133
    pReq.schemaRow = pCfg->schemaRow;
×
134
    pReq.schemaTag = pCfg->schemaTag;
×
135

136
    TAOS_CHECK_EXIT(metaCreateSuperTable(SMA_META(pSma), ver, &pReq));
×
137
  } else {
138
    TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
×
139
  }
140

141
_exit:
×
142
  if (code) {
×
143
    smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
144
             " dstTb:%s dstVg:%d since %s",
145
             SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
146
             pCfg->dstVgId, tstrerror(code));
147
  } else {
148
    smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
149
             " dstTb:%s dstVg:%d",
150
             SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
151
  }
152

153
  TAOS_RETURN(code);
×
154
}
155

156
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid,
×
157
                         const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
158
  int32_t      code = 0;
×
159
  int32_t      lino = 0;
×
160
  void        *pBuf = NULL;
×
161
  int32_t      len = 0;
×
162
  SSubmitReq2 *pReq = NULL;
×
163
  SArray      *tagArray = NULL;
×
164
  SHashObj    *pTableIndexMap = NULL;
×
165

166
  int32_t numOfBlocks = taosArrayGetSize(pBlocks);
×
167

168
  tagArray = taosArrayInit(1, sizeof(STagVal));
×
169
  pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
×
170

171
  if (!tagArray || !pReq) {
×
172
    TAOS_CHECK_EXIT(terrno);
×
173
  }
174

175
  pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
×
176
  if (pReq->aSubmitTbData == NULL) {
×
177
    TAOS_CHECK_EXIT(terrno);
×
178
  }
179

180
  pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
181
  if (pTableIndexMap == NULL) {
×
182
    TAOS_CHECK_EXIT(terrno);
×
183
  }
184

185
  // SSubmitTbData req
186
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
187
    SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
×
188
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
×
189
      pDeleteReq->suid = suid;
×
190
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
×
191
      TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true));
×
192
      continue;
×
193
    }
194

195
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
×
196

197
    int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
×
198

199
    TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq,
×
200
                                            ""));
201

202
    {
203
      uint64_t groupId = pDataBlock->info.id.groupId;
×
204

205
      int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
×
206
      if (index == NULL) {  // no data yet, append it
×
207
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
208
        if (code != TSDB_CODE_SUCCESS) {
×
209
          continue;
×
210
        }
211

212
        if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
×
213
          code = terrno;
×
214
          continue;
×
215
        }
216

217
        int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
×
218
        TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)));
×
219
      } else {
220
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
221
        if (code != TSDB_CODE_SUCCESS) {
×
222
          continue;
×
223
        }
224

225
        SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
×
226
        code = doMergeExistedRows(pExisted, &tbData, "id");
×
227
        if (code != TSDB_CODE_SUCCESS) {
×
228
          continue;
×
229
        }
230
      }
231
    }
232
  }
233

234
  // encode
235
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
×
236
  if (TSDB_CODE_SUCCESS == code) {
×
237
    SEncoder encoder;
238
    len += sizeof(SSubmitReq2Msg);
×
239
    if (!(pBuf = rpcMallocCont(len))) {
×
240
      code = terrno;
×
241
      TSDB_CHECK_CODE(code, lino, _exit);
×
242
    }
243

244
    ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
×
245
    ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
×
246
    ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
×
247
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
×
248
    if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) {
×
249
      tEncoderClear(&encoder);
×
250
      TSDB_CHECK_CODE(code, lino, _exit);
×
251
    }
252
    tEncoderClear(&encoder);
×
253
  }
254

255
_exit:
×
256
  taosArrayDestroy(tagArray);
×
257
  taosHashCleanup(pTableIndexMap);
×
258
  if (pReq != NULL) {
×
259
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
260
    taosMemoryFree(pReq);
×
261
  }
262

263
  if (code) {
×
264
    rpcFreeCont(pBuf);
×
265
    taosArrayDestroy(pDeleteReq->deleteReqs);
×
266
    smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
×
267
  } else {
268
    if (ppData) *ppData = pBuf;
×
269
    if (pLen) *pLen = len;
×
270
  }
271
  TAOS_RETURN(code);
×
272
}
273

274
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
×
275
  int32_t code = 0;
×
276
  int32_t lino = 0;
×
277

278
  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
×
279
    int32_t len = 0;
×
280
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
×
281
    TSDB_CHECK_CODE(code, lino, _exit);
×
282

283
    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
×
284
    if (!pBuf) {
×
285
      code = terrno;
×
286
      TSDB_CHECK_CODE(code, lino, _exit);
×
287
    }
288

289
    SEncoder encoder;
290
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
×
291
    if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
×
292
      tEncoderClear(&encoder);
×
293
      TSDB_CHECK_CODE(code, lino, _exit);
×
294
    }
295
    tEncoderClear(&encoder);
×
296

297
    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
×
298

299
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
×
300
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
×
301
    TSDB_CHECK_CODE(code, lino, _exit);
×
302
  }
303

304
_exit:
×
305
  taosArrayDestroy(pDelReq->deleteReqs);
×
306
  if (code) {
×
307
    smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
×
308
             indexUid, tstrerror(code));
309
  }
310

311
  TAOS_RETURN(code);
×
312
}
313

314
/**
315
 * @brief Insert/Update Time-range-wise SMA data.
316
 *
317
 * @param pSma
318
 * @param msg
319
 * @return int32_t
320
 */
321
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
×
322
  int32_t       code = 0;
×
323
  int32_t       lino = 0;
×
324
  const SArray *pDataBlocks = (const SArray *)msg;
×
325

326
  if (taosArrayGetSize(pDataBlocks) <= 0) {
×
327
    code = TSDB_CODE_TSMA_INVALID_PARA;
×
328
    TSDB_CHECK_CODE(code, lino, _exit);
×
329
  }
330

331
  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
×
332
    code = TSDB_CODE_TSMA_INIT_FAILED;
×
333
    TSDB_CHECK_CODE(code, lino, _exit);
×
334
  }
335

336
  SSmaEnv   *pEnv = SMA_TSMA_ENV(pSma);
×
337
  SSmaStat  *pStat = NULL;
×
338
  STSmaStat *pTsmaStat = NULL;
×
339

340
  if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
×
341
    code = TSDB_CODE_TSMA_INVALID_ENV;
×
342
    TSDB_CHECK_CODE(code, lino, _exit);
×
343
  }
344

345
  pTsmaStat = SMA_STAT_TSMA(pStat);
×
346

347
  if (!pTsmaStat->pTSma) {
×
348
    STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
×
349
    if (!pTSma) {
×
350
      code = TSDB_CODE_TSMA_INVALID_PTR;
×
351
      TSDB_CHECK_CODE(code, lino, _exit);
×
352
    }
353
    pTsmaStat->pTSma = pTSma;
×
354
    code = metaGetTbTSchemaNotNull(SMA_META(pSma), pTSma->dstTbUid, -1, 1, &pTsmaStat->pTSchema);
×
355
    TSDB_CHECK_CODE(code, lino, _exit);
×
356
  }
357

358
  if (pTsmaStat->pTSma->indexUid != indexUid) {
×
359
    code = TSDB_CODE_APP_ERROR;
×
360
    TSDB_CHECK_CODE(code, lino, _exit);
×
361
  }
362

363
  SBatchDeleteReq deleteReq = {0};
×
364
  void           *pSubmitReq = NULL;
×
365
  int32_t         contLen = 0;
×
366

367
  code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, pTsmaStat->pTSma->dstTbUid,
×
368
                          pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
×
369
  TSDB_CHECK_CODE(code, lino, _exit);
×
370

371
  TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq));
×
372

373
#if 0
374
  if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
375
    code = TSDB_CODE_APP_ERROR;
376
    smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
377
             pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName);
378
    goto _err;
379
  }
380
#endif
381

382
  SRpcMsg submitReqMsg = {
×
383
      .msgType = TDMT_VND_SUBMIT,
384
      .pCont = pSubmitReq,
385
      .contLen = contLen,
386
  };
387

388
  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
×
389
  TSDB_CHECK_CODE(code, lino, _exit);
×
390

391
_exit:
×
392
  if (code) {
×
393
    smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
×
394
             tstrerror(code), indexUid);
395
  }
396
  TAOS_RETURN(code);
×
397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc