• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.03
/source/dnode/vnode/src/sma/smaTimeRange.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sma.h"
17
#include "tq.h"
18
#include "tsdb.h"
19

20
#define SMA_STORAGE_MINUTES_MAX  86400
21
#define SMA_STORAGE_MINUTES_DAY  1440
22
#define SMA_STORAGE_SPLIT_FACTOR 14400  // least records in tsma file
23

24
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
25
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
26
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
27

28
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
5✔
29
  int32_t code = TSDB_CODE_SUCCESS;
5✔
30

31
  if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
5!
32
    smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(code));
×
33
  }
34

35
  TAOS_RETURN(code);
5✔
36
}
37

38
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t ver, const char *msg) {
32✔
39
  int32_t code = tdProcessTSmaCreateImpl(pSma, ver, msg);
32✔
40

41
  TAOS_RETURN(code);
33✔
42
}
43

44
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
31✔
45
  int32_t code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days);
31✔
46

47
  TAOS_RETURN(code);
31✔
48
}
49

50
/**
51
 * @brief Judge the tsma file split days
52
 *
53
 * @param pCfg
54
 * @param pCont
55
 * @param contLen
56
 * @param days unit is minute
57
 * @return int32_t
58
 */
59
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
31✔
60
  int32_t  code = 0;
31✔
61
  int32_t  lino = 0;
31✔
62
  SDecoder coder = {0};
31✔
63
  tDecoderInit(&coder, pCont, contLen);
31✔
64

65
  STSma tsma = {0};
31✔
66
  if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
31!
67
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
68
    TSDB_CHECK_CODE(code, lino, _exit);
×
69
  }
70

71
  STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
31✔
72
  int64_t   sInterval = -1;
31✔
73
  TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND, &sInterval));
31!
74
  if (0 == sInterval) {
31✔
75
    *days = pTsdbCfg->days;
7✔
76
    goto _exit;
7✔
77
  }
78
  int64_t records = pTsdbCfg->days * 60 / sInterval;
24✔
79
  if (records >= SMA_STORAGE_SPLIT_FACTOR) {
24✔
80
    *days = pTsdbCfg->days;
10✔
81
  } else {
82
    int64_t mInterval = -1;
14✔
83
    TAOS_CHECK_EXIT(convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE, &mInterval));
14!
84
    int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
14✔
85

86
    if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
14!
87
      *days = SMA_STORAGE_MINUTES_MAX;
×
88
    } else {
89
      *days = (int32_t)daysPerFile;
14✔
90
    }
91

92
    if (*days < pTsdbCfg->days) {
14!
93
      *days = pTsdbCfg->days;
×
94
    }
95
  }
96
_exit:
31✔
97
  if (code) {
31!
98
    smaWarn("vgId:%d, failed at line %d to get tsma days %d since %s", pCfg->vgId, lino, *days, tstrerror(code));
×
99
  } else {
100
    smaDebug("vgId:%d, succeed to get tsma days %d", pCfg->vgId, *days);
31✔
101
  }
102
  tDecoderClear(&coder);
31✔
103
  TAOS_RETURN(code);
31✔
104
}
105

106
/**
107
 * @brief create tsma meta and result stable
108
 *
109
 * @param pSma
110
 * @param version
111
 * @param pMsg
112
 * @return int32_t
113
 */
114
static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t ver, const char *pMsg) {
33✔
115
  int32_t        code = 0;
33✔
116
  int32_t        lino = 0;
33✔
117
  SSmaCfg       *pCfg = (SSmaCfg *)pMsg;
33✔
118
  SName          stbFullName = {0};
33✔
119
  SVCreateStbReq pReq = {0};
33✔
120

121
  if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
33!
122
    // create tsma meta in dstVgId
123
    TAOS_CHECK_EXIT(metaCreateTSma(SMA_META(pSma), ver, pCfg));
33!
124

125
    // create stable to save tsma result in dstVgId
126
    TAOS_CHECK_EXIT(tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
33!
127
    pReq.name = (char *)tNameGetTableName(&stbFullName);
33✔
128
    pReq.suid = pCfg->dstTbUid;
33✔
129
    pReq.schemaRow = pCfg->schemaRow;
33✔
130
    pReq.schemaTag = pCfg->schemaTag;
33✔
131

132
    TAOS_CHECK_EXIT(metaCreateSTable(SMA_META(pSma), ver, &pReq));
33!
133
  } else {
134
    TAOS_CHECK_EXIT(TSDB_CODE_TSMA_INVALID_STAT);
×
135
  }
136

137
_exit:
×
138
  if (code) {
33!
139
    smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
×
140
             " dstTb:%s dstVg:%d since %s",
141
             SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name,
142
             pCfg->dstVgId, tstrerror(code));
143
  } else {
144
    smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
33✔
145
             " dstTb:%s dstVg:%d",
146
             SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
147
  }
148

149
  TAOS_RETURN(code);
33✔
150
}
151

152
int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *pTSchema, int64_t suid,
5✔
153
                         const char *stbFullName, SBatchDeleteReq *pDeleteReq, void **ppData, int32_t *pLen) {
154
  int32_t      code = 0;
5✔
155
  int32_t      lino = 0;
5✔
156
  void        *pBuf = NULL;
5✔
157
  int32_t      len = 0;
5✔
158
  SSubmitReq2 *pReq = NULL;
5✔
159
  SArray      *tagArray = NULL;
5✔
160
  SHashObj    *pTableIndexMap = NULL;
5✔
161

162
  int32_t numOfBlocks = taosArrayGetSize(pBlocks);
5✔
163

164
  tagArray = taosArrayInit(1, sizeof(STagVal));
5✔
165
  pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
5✔
166

167
  if (!tagArray || !pReq) {
5!
168
    TAOS_CHECK_EXIT(terrno);
×
169
  }
170

171
  pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData));
5✔
172
  if (pReq->aSubmitTbData == NULL) {
5!
173
    TAOS_CHECK_EXIT(terrno);
×
174
  }
175

176
  pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
5✔
177
  if (pTableIndexMap == NULL) {
5!
178
    TAOS_CHECK_EXIT(terrno);
×
179
  }
180

181
  // SSubmitTbData req
182
  for (int32_t i = 0; i < numOfBlocks; ++i) {
10✔
183
    SSDataBlock *pDataBlock = taosArrayGet(pBlocks, i);
5✔
184
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
5!
UNCOV
185
      pDeleteReq->suid = suid;
×
UNCOV
186
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
×
UNCOV
187
      TAOS_CHECK_EXIT(tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "", true));
×
UNCOV
188
      continue;
×
189
    }
190

191
    SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE};
5✔
192

193
    int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
5✔
194

195
    TAOS_CHECK_EXIT(buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true, &tbData.pCreateTbReq));
5!
196

197
    {
198
      uint64_t groupId = pDataBlock->info.id.groupId;
5✔
199

200
      int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
5✔
201
      if (index == NULL) {  // no data yet, append it
5!
202
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
5✔
203
        if (code != TSDB_CODE_SUCCESS) {
5!
204
          continue;
×
205
        }
206

207
        if( taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
10!
208
          code = terrno;
×
209
          continue;
×
210
        }
211

212
        int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1;
5✔
213
        TAOS_CHECK_EXIT(taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)));
5!
214
      } else {
215
        code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, INT64_MIN, "");
×
216
        if (code != TSDB_CODE_SUCCESS) {
×
217
          continue;
×
218
        }
219

220
        SSubmitTbData *pExisted = taosArrayGet(pReq->aSubmitTbData, *index);
×
221
        code = doMergeExistedRows(pExisted, &tbData, "id");
×
222
        if (code != TSDB_CODE_SUCCESS) {
×
223
          continue;
×
224
        }
225
      }
226
    }
227
  }
228

229
  // encode
230
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
5!
231
  if (TSDB_CODE_SUCCESS == code) {
5!
232
    SEncoder encoder;
233
    len += sizeof(SSubmitReq2Msg);
5✔
234
    if (!(pBuf = rpcMallocCont(len))) {
5!
235
      code = terrno;
×
236
      TSDB_CHECK_CODE(code, lino, _exit);
×
237
    }
238

239
    ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
5✔
240
    ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
5✔
241
    ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
5✔
242
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
5✔
243
    if ((code = tEncodeSubmitReq(&encoder, pReq)) < 0) {
5!
244
      tEncoderClear(&encoder);
×
245
      TSDB_CHECK_CODE(code, lino, _exit);
×
246
    }
247
    tEncoderClear(&encoder);
5✔
248
  }
249

250
_exit:
×
251
  taosArrayDestroy(tagArray);
5✔
252
  taosHashCleanup(pTableIndexMap);
5✔
253
  if (pReq != NULL) {
5!
254
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
5✔
255
    taosMemoryFree(pReq);
5✔
256
  }
257

258
  if (code) {
5!
259
    rpcFreeCont(pBuf);
×
260
    taosArrayDestroy(pDeleteReq->deleteReqs);
×
261
    smaWarn("vgId:%d, failed at line %d since %s", TD_VID(pVnode), lino, tstrerror(code));
×
262
  } else {
263
    if (ppData) *ppData = pBuf;
5!
264
    if (pLen) *pLen = len;
5!
265
  }
266
  TAOS_RETURN(code);
5✔
267
}
268

269
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
5✔
270
  int32_t code = 0;
5✔
271
  int32_t lino = 0;
5✔
272

273
  if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
5!
UNCOV
274
    int32_t len = 0;
×
UNCOV
275
    tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
×
UNCOV
276
    TSDB_CHECK_CODE(code, lino, _exit);
×
277

UNCOV
278
    void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
×
UNCOV
279
    if (!pBuf) {
×
280
      code = terrno;
×
281
      TSDB_CHECK_CODE(code, lino, _exit);
×
282
    }
283

284
    SEncoder encoder;
UNCOV
285
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
×
UNCOV
286
    if ((code = tEncodeSBatchDeleteReq(&encoder, pDelReq)) < 0) {
×
287
      tEncoderClear(&encoder);
×
288
      TSDB_CHECK_CODE(code, lino, _exit);
×
289
    }
UNCOV
290
    tEncoderClear(&encoder);
×
291

UNCOV
292
    ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
×
293

UNCOV
294
    SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
×
UNCOV
295
    code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
×
UNCOV
296
    TSDB_CHECK_CODE(code, lino, _exit);
×
297
  }
298

299
_exit:
5✔
300
  taosArrayDestroy(pDelReq->deleteReqs);
5✔
301
  if (code) {
5!
302
    smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
×
303
             indexUid, tstrerror(code));
304
  }
305

306
  TAOS_RETURN(code);
5✔
307
}
308

309
/**
310
 * @brief Insert/Update Time-range-wise SMA data.
311
 *
312
 * @param pSma
313
 * @param msg
314
 * @return int32_t
315
 */
316
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
5✔
317
  int32_t       code = 0;
5✔
318
  int32_t       lino = 0;
5✔
319
  const SArray *pDataBlocks = (const SArray *)msg;
5✔
320

321
  if (taosArrayGetSize(pDataBlocks) <= 0) {
5!
322
    code = TSDB_CODE_TSMA_INVALID_PARA;
×
323
    TSDB_CHECK_CODE(code, lino, _exit);
×
324
  }
325

326
  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
5!
327
    code = TSDB_CODE_TSMA_INIT_FAILED;
×
328
    TSDB_CHECK_CODE(code, lino, _exit);
×
329
  }
330

331
  SSmaEnv   *pEnv = SMA_TSMA_ENV(pSma);
5✔
332
  SSmaStat  *pStat = NULL;
5✔
333
  STSmaStat *pTsmaStat = NULL;
5✔
334

335
  if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
5!
336
    code = TSDB_CODE_TSMA_INVALID_ENV;
×
337
    TSDB_CHECK_CODE(code, lino, _exit);
×
338
  }
339

340
  pTsmaStat = SMA_STAT_TSMA(pStat);
5✔
341

342
  if (!pTsmaStat->pTSma) {
5✔
343
    STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
3✔
344
    if (!pTSma) {
3!
345
      code = TSDB_CODE_TSMA_INVALID_PTR;
×
346
      TSDB_CHECK_CODE(code, lino, _exit);
×
347
    }
348
    pTsmaStat->pTSma = pTSma;
3✔
349
    code = metaGetTbTSchemaNotNull(SMA_META(pSma), pTSma->dstTbUid, -1, 1, &pTsmaStat->pTSchema);
3✔
350
    TSDB_CHECK_CODE(code, lino, _exit);
3!
351
  }
352

353
  if (pTsmaStat->pTSma->indexUid != indexUid) {
5!
354
    code = TSDB_CODE_APP_ERROR;
×
355
    TSDB_CHECK_CODE(code, lino, _exit);
×
356
  }
357

358
  SBatchDeleteReq deleteReq = {0};
5✔
359
  void           *pSubmitReq = NULL;
5✔
360
  int32_t         contLen = 0;
5✔
361

362
  code = smaBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, pTsmaStat->pTSma->dstTbUid,
5✔
363
                          pTsmaStat->pTSma->dstTbName, &deleteReq, &pSubmitReq, &contLen);
5✔
364
  TSDB_CHECK_CODE(code, lino, _exit);
5!
365

366
  TAOS_CHECK_EXIT(tsmaProcessDelReq(pSma, indexUid, &deleteReq));
5!
367

368
#if 0
369
  if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
370
    code = TSDB_CODE_APP_ERROR;
371
    smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid,
372
             pTsmaStat->pTSma->indexUid, tstrerror(code), pTsmaStat->pTSma->dstTbName);
373
    goto _err;
374
  }
375
#endif
376

377
  SRpcMsg submitReqMsg = {
5✔
378
      .msgType = TDMT_VND_SUBMIT,
379
      .pCont = pSubmitReq,
380
      .contLen = contLen,
381
  };
382

383
  code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg);
5✔
384
  TSDB_CHECK_CODE(code, lino, _exit);
5!
385

386
_exit:
5✔
387
  if (code) {
5!
388
    smaError("vgId:%d, %s failed at line %d since %s, smaIndex:%" PRIi64, SMA_VID(pSma), __func__, lino,
×
389
             tstrerror(code), indexUid);
390
  }
391
  TAOS_RETURN(code);
5✔
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc