• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.63
/source/dnode/vnode/src/meta/metaTable.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
extern SDmNotifyHandle dmNotifyHdl;
19

20
int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
21
int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
22
int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
23
int32_t metaAlterTableColumnBytes(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
24
int32_t metaUpdateTableTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
25
int32_t metaUpdateTableMultiTagValue(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
26
int32_t metaUpdateTableOptions2(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
27
int32_t metaUpdateTableColCompress2(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq);
28
int32_t metaAlterTableColumnRef(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
29
int32_t metaRemoveTableColumnRef(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp);
30
int32_t metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
31

32
int32_t    metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
33
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs);
34
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl);
35
void       metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
36
// opt ins_tables query
37
static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
38
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
39

40
// Add or remove one column's compression entry in a SColCmprWrapper.
//   add != 0: grow pColCmpr by one slot and append {pSchema->colId, compress}.
//   add == 0: remove the first entry whose id equals pSchema->colId (no-op if absent).
// The wrapper version is written back unchanged in both cases.
// Returns 0 on success, or terrno when the array cannot be grown.
int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
  const int32_t oldCount = pWp->nCols;
  const int32_t oldVersion = pWp->version;

  if (!add) {
    int32_t remaining = oldCount;
    for (int32_t idx = 0; idx < oldCount; ++idx) {
      if (pWp->pColCmpr[idx].id != pSchema->colId) continue;

      // close the gap left by the removed entry
      int32_t tailBytes = (oldCount - idx - 1) * sizeof(SColCmpr);
      if (tailBytes) {
        memmove(pWp->pColCmpr + idx, pWp->pColCmpr + idx + 1, tailBytes);
      }
      remaining = oldCount - 1;
      break;
    }
    pWp->nCols = remaining;
    pWp->version = oldVersion;
    return 0;
  }

  SColCmpr *pGrown = taosMemoryRealloc(pWp->pColCmpr, sizeof(SColCmpr) * (oldCount + 1));
  if (pGrown == NULL) {
    return terrno;
  }
  pWp->pColCmpr = pGrown;

  pGrown[oldCount].id = pSchema->colId;
  pGrown[oldCount].alg = compress;

  pWp->nCols = oldCount + 1;
  pWp->version = oldVersion;
  return 0;
}
72

73
// Store the ext schema of a newly appended column in pEntry->pExtSchemas.
//   pColumn     - the column being added; only checked for a type modifier.
//   newColNum   - column count after the add; the new slot is index newColNum - 1.
//   pExtSchema  - value copied into the new slot.
// Nothing is allocated while neither the new column nor any existing column
// needs an ext schema.
// Returns 0 on success, or terrno on allocation failure. On failure the
// existing pExtSchemas array is left untouched.
int32_t addTableExtSchema(SMetaEntry *pEntry, const SSchema *pColumn, int32_t newColNum, SExtSchema *pExtSchema) {
  // no need to add ext schema when no column needs ext schemas
  if (!HAS_TYPE_MOD(pColumn) && !pEntry->pExtSchemas) return 0;

  SExtSchema *pNew = NULL;
  if (!pEntry->pExtSchemas) {
    // first column that needs an ext schema:
    // zero-filled slots stand for all columns that already existed
    pNew = (SExtSchema *)taosMemoryCalloc(newColNum, sizeof(SExtSchema));
  } else {
    // already has columns with ext schema: grow by one slot.
    // Keep the result in a temporary so a failed realloc does not leak
    // (or lose) the array currently owned by pEntry.
    pNew = (SExtSchema *)taosMemoryRealloc(pEntry->pExtSchemas, sizeof(SExtSchema) * newColNum);
  }
  if (pNew == NULL) return terrno;

  pEntry->pExtSchemas = pNew;
  pEntry->pExtSchemas[newColNum - 1] = *pExtSchema;
  return 0;
}
88

89
// Remove one column's slot from pEntry->pExtSchemas after a column drop.
//   dropColId - compared against newColNum and used as an array position.
//   newColNum - column count after the drop.
// When none of the first newColNum slots carries an ext schema any more,
// the whole array is released and the pointer cleared.
// Always returns 0.
int32_t dropTableExtSchema(SMetaEntry *pEntry, int32_t dropColId, int32_t newColNum) {
  SExtSchema *pExt = pEntry->pExtSchemas;

  // no ext schema, no need to drop
  if (pExt == NULL) return 0;

  if (dropColId != newColNum) {
    // drop a column in the middle: shift the tail down by one slot
    memmove(pExt + dropColId, pExt + dropColId + 1, (newColNum - dropColId) * sizeof(SExtSchema));
  } else {
    // drop the last column
    // NOTE(review): the memmove branch treats dropColId as a 0-based slot, while this
    // branch clears slot dropColId - 1 -- confirm against the callers which one is intended.
    pExt[dropColId - 1] = (SExtSchema){0};
  }

  // keep the array while any remaining column still has an ext schema
  int32_t i = 0;
  while (i < newColNum) {
    if (hasExtSchema(pExt + i)) return 0;
    ++i;
  }

  taosMemoryFreeClear(pEntry->pExtSchemas);
  return 0;
}
106

UNCOV
107
// Add or remove one column's reference entry in a SColRefWrapper.
//   add != 0: grow pColRef by one slot and append an entry for pSchema->colId.
//             With pColRef == NULL the new entry has no reference; otherwise the
//             hasRef flag is copied and, when set, the db/table/column names too.
//   add == 0: remove the first entry whose id equals pSchema->colId (no-op if absent).
// The wrapper version is written back unchanged in both cases.
// Returns 0 on success, or terrno when the array cannot be grown.
int32_t updataTableColRef(SColRefWrapper *pWp, const SSchema *pSchema, int8_t add, SColRef *pColRef) {
  const int32_t oldCount = pWp->nCols;
  const int32_t oldVersion = pWp->version;

  if (!add) {
    int32_t remaining = oldCount;
    for (int32_t idx = 0; idx < oldCount; ++idx) {
      if (pWp->pColRef[idx].id != pSchema->colId) continue;

      // close the gap left by the removed entry
      int32_t tailBytes = (oldCount - idx - 1) * sizeof(SColRef);
      if (tailBytes) {
        memmove(pWp->pColRef + idx, pWp->pColRef + idx + 1, tailBytes);
      }
      remaining = oldCount - 1;
      break;
    }
    pWp->nCols = remaining;
    pWp->version = oldVersion;
    return 0;
  }

  SColRef *pGrown = taosMemoryRealloc(pWp->pColRef, sizeof(SColRef) * (oldCount + 1));
  if (pGrown == NULL) {
    return terrno;
  }
  pWp->pColRef = pGrown;

  SColRef *pSlot = pGrown + oldCount;
  pSlot->id = pSchema->colId;
  if (pColRef == NULL) {
    pSlot->hasRef = false;
  } else {
    pSlot->hasRef = pColRef->hasRef;
    if (pSlot->hasRef) {
      tstrncpy(pSlot->refDbName, pColRef->refDbName, TSDB_DB_NAME_LEN);
      tstrncpy(pSlot->refTableName, pColRef->refTableName, TSDB_TABLE_NAME_LEN);
      tstrncpy(pSlot->refColName, pColRef->refColName, TSDB_COL_NAME_LEN);
    }
  }

  pWp->nCols = oldCount + 1;
  pWp->version = oldVersion;
  return 0;
}
149

150
// Fill a table-meta response for a normal table from its schema wrapper.
// Allocates pMetaRsp->pSchemas (filled with a copy of the schema columns) and
// pMetaRsp->pSchemaExt (allocated but NOT initialized here -- presumably the
// caller fills it; verify against callers).
// Returns 0 on success, or terrno on allocation failure. On failure no
// allocation made here is left attached to pMetaRsp.
int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
  pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
  if (NULL == pMetaRsp->pSchemas) {
    return terrno;
  }

  pMetaRsp->pSchemaExt = taosMemoryMalloc(pSchema->nCols * sizeof(SSchemaExt));
  if (pMetaRsp->pSchemaExt == NULL) {
    // capture the error before freeing, and clear the pointer so the caller's
    // own cleanup of pMetaRsp cannot free pSchemas a second time
    int32_t code = terrno;
    taosMemoryFreeClear(pMetaRsp->pSchemas);
    return code;
  }

  tstrncpy(pMetaRsp->tbName, tbName, TSDB_TABLE_NAME_LEN);
  pMetaRsp->numOfColumns = pSchema->nCols;
  pMetaRsp->tableType = TSDB_NORMAL_TABLE;
  pMetaRsp->sversion = pSchema->version;
  pMetaRsp->tuid = uid;
  pMetaRsp->virtualStb = false;  // super table will never be processed here

  memcpy(pMetaRsp->pSchemas, pSchema->pSchema, pSchema->nCols * sizeof(SSchema));

  return 0;
}
173

UNCOV
174
int32_t metaUpdateVtbMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, SColRefWrapper *pRef,
×
175
                             STableMetaRsp *pMetaRsp, int8_t tableType) {
176
  int32_t code = TSDB_CODE_SUCCESS;
×
177
  if (!pRef) {
×
UNCOV
178
    return TSDB_CODE_INVALID_PARA;
×
179
  }
UNCOV
180
  if (pSchema) {
×
UNCOV
181
    pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
×
UNCOV
182
    if (NULL == pMetaRsp->pSchemas) {
×
UNCOV
183
      code = terrno;
×
UNCOV
184
      goto _return;
×
185
    }
186

UNCOV
187
    pMetaRsp->pSchemaExt = taosMemoryMalloc(pSchema->nCols * sizeof(SSchemaExt));
×
UNCOV
188
    if (pMetaRsp->pSchemaExt == NULL) {
×
UNCOV
189
      code = terrno;
×
UNCOV
190
      goto _return;
×
191
    }
192

UNCOV
193
    pMetaRsp->numOfColumns = pSchema->nCols;
×
UNCOV
194
    pMetaRsp->sversion = pSchema->version;
×
UNCOV
195
    memcpy(pMetaRsp->pSchemas, pSchema->pSchema, pSchema->nCols * sizeof(SSchema));
×
196
  }
UNCOV
197
  pMetaRsp->pColRefs = taosMemoryMalloc(pRef->nCols * sizeof(SColRef));
×
198
  if (NULL == pMetaRsp->pColRefs) {
×
UNCOV
199
    code = terrno;
×
UNCOV
200
    goto _return;
×
201
  }
UNCOV
202
  memcpy(pMetaRsp->pColRefs, pRef->pColRef, pRef->nCols * sizeof(SColRef));
×
203
  tstrncpy(pMetaRsp->tbName, tbName, TSDB_TABLE_NAME_LEN);
×
UNCOV
204
  pMetaRsp->tuid = uid;
×
UNCOV
205
  pMetaRsp->tableType = tableType;
×
UNCOV
206
  pMetaRsp->virtualStb = false; // super table will never be processed here
×
UNCOV
207
  pMetaRsp->numOfColRefs = pRef->nCols;
×
208

UNCOV
209
  return code;
×
UNCOV
210
_return:
×
UNCOV
211
  taosMemoryFreeClear(pMetaRsp->pSchemaExt);
×
UNCOV
212
  taosMemoryFreeClear(pMetaRsp->pSchemas);
×
UNCOV
213
  taosMemoryFreeClear(pMetaRsp->pColRefs);
×
UNCOV
214
  return code;
×
215
}
216

217
// Add every key/value of a child table's JSON tag to the inverted tag index.
// Each tag value becomes one ADD_VALUE index term keyed by the super-table uid;
// all terms are then written with a single indexJsonPut() for the child uid.
//   pCtbEntry - child table entry whose ctbEntry.pTags holds the JSON tag.
//   pSchema   - tag column schema (not read by this function).
// Returns 0 on success, TSDB_CODE_INVALID_PARA when the index or entry is missing,
// or the error of the failing step. Compiles to a no-op returning 0 when
// USE_INVERTED_INDEX is not defined.
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
  int32_t code = 0;

#ifdef USE_INVERTED_INDEX
  if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  tb_uid_t suid = pCtbEntry->ctbEntry.suid;
  tb_uid_t tuid = pCtbEntry->uid;

  SArray *pTagVals = NULL;
  code = tTagToValArray((const STag *)pCtbEntry->ctbEntry.pTags, &pTagVals);
  if (code) {
    return code;
  }

  SIndexMultiTerm *terms = indexMultiTermCreate();
  if (terms == NULL) {
    // pTagVals is already allocated at this point; do not leak it
    code = terrno;
    taosArrayDestroy(pTagVals);
    return code;
  }

  int32_t nCols = (int32_t)taosArrayGetSize(pTagVals);
  for (int32_t i = 0; i < nCols; i++) {
    STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
    char     type = pTagVal->type;

    char   *key = pTagVal->pKey;
    int32_t nKey = strlen(key);

    SIndexTerm *term = NULL;
    if (type == TSDB_DATA_TYPE_NULL) {
      term = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, NULL, 0);
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
      if (pTagVal->nData > 0) {
        // convert the UCS-4 value into a var-string (length header + multibyte payload)
        char *val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
        if (val == NULL) {
          code = terrno;
          goto _exception;
        }
        int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val + VARSTR_HEADER_SIZE, NULL);
        if (len < 0) {
          // the conversion buffer is still owned here; release it before bailing out
          taosMemoryFree(val);
          code = len;
          goto _exception;
        }
        memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
        type = TSDB_DATA_TYPE_VARCHAR;
        term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, val, len);
        taosMemoryFree(val);
      } else if (pTagVal->nData == 0) {
        term = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double val = *(double *)(&pTagVal->i64);
      int    len = sizeof(val);
      term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, (const char *)&val, len);
    } else if (type == TSDB_DATA_TYPE_BOOL) {
      int val = *(int *)(&pTagVal->i64);
      int len = sizeof(val);
      term = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BOOL, key, nKey, (const char *)&val, len);
    }

    if (term != NULL) {
      // a failed add is only logged; the remaining terms are still indexed
      int32_t ret = indexMultiTermAdd(terms, term);
      if (ret < 0) {
        metaError("vgId:%d, failed to add term to multi term, uid: %" PRId64 ", key: %s, type: %d, ret: %d",
                  TD_VID(pMeta->pVnode), tuid, key, type, ret);
      }
    } else {
      code = terrno;
      goto _exception;
    }
  }
  code = indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
  indexMultiTermDestroy(terms);

  taosArrayDestroy(pTagVals);
  return code;
_exception:
  indexMultiTermDestroy(terms);
  taosArrayDestroy(pTagVals);
#endif
  return code;
}
303
// Remove every key/value of a child table's JSON tag from the inverted tag index.
// Each tag value becomes one DEL_VALUE index term keyed by the super-table uid;
// all terms are then submitted with a single indexJsonPut() for the child uid.
//   pCtbEntry - child table entry whose ctbEntry.pTags holds the JSON tag.
//   pSchema   - tag column schema (not read by this function).
// Returns 0 on success, TSDB_CODE_INVALID_PARA when the index or entry is missing,
// or the error of the failing step. Compiles to a no-op returning 0 when
// USE_INVERTED_INDEX is not defined.
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
  int32_t code = 0;
#ifdef USE_INVERTED_INDEX
  if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  tb_uid_t suid = pCtbEntry->ctbEntry.suid;
  tb_uid_t tuid = pCtbEntry->uid;

  SArray *pTagVals = NULL;
  code = tTagToValArray((const STag *)pCtbEntry->ctbEntry.pTags, &pTagVals);
  if (code) {
    return code;
  }

  SIndexMultiTerm *terms = indexMultiTermCreate();
  if (terms == NULL) {
    // pTagVals is already allocated at this point; do not leak it
    code = terrno;
    taosArrayDestroy(pTagVals);
    return code;
  }

  int32_t nCols = (int32_t)taosArrayGetSize(pTagVals);
  for (int32_t i = 0; i < nCols; i++) {
    STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
    char     type = pTagVal->type;

    char   *key = pTagVal->pKey;
    int32_t nKey = strlen(key);

    SIndexTerm *term = NULL;
    if (type == TSDB_DATA_TYPE_NULL) {
      term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, NULL, 0);
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
      if (pTagVal->nData > 0) {
        // convert the UCS-4 value into a var-string (length header + multibyte payload)
        char *val = taosMemoryCalloc(1, pTagVal->nData + VARSTR_HEADER_SIZE);
        if (val == NULL) {
          code = terrno;
          goto _exception;
        }
        int32_t len = taosUcs4ToMbs((TdUcs4 *)pTagVal->pData, pTagVal->nData, val + VARSTR_HEADER_SIZE, NULL);
        if (len < 0) {
          // the conversion buffer is still owned here; release it before bailing out
          taosMemoryFree(val);
          code = len;
          goto _exception;
        }
        memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
        type = TSDB_DATA_TYPE_VARCHAR;
        term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len);
        taosMemoryFree(val);
      } else if (pTagVal->nData == 0) {
        term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double val = *(double *)(&pTagVal->i64);
      int    len = sizeof(val);
      term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, (const char *)&val, len);
    } else if (type == TSDB_DATA_TYPE_BOOL) {
      int val = *(int *)(&pTagVal->i64);
      int len = sizeof(val);
      term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_BOOL, key, nKey, (const char *)&val, len);
    }
    if (term != NULL) {
      // a failed add is only logged; the remaining terms are still submitted
      int32_t ret = indexMultiTermAdd(terms, term);
      if (ret < 0) {
        metaError("vgId:%d, failed to add term to multi term, uid: %" PRId64 ", key: %s, type: %d, ret: %d",
                  TD_VID(pMeta->pVnode), tuid, key, type, ret);
      }
    } else {
      code = terrno;
      goto _exception;
    }
  }
  code = indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
  indexMultiTermDestroy(terms);
  taosArrayDestroy(pTagVals);
  return code;
_exception:
  indexMultiTermDestroy(terms);
  taosArrayDestroy(pTagVals);
#endif
  return code;
}
386

387
// Drop a batch of tables by uid under the meta write lock, then adjust the
// vnode time-series counter for every super table that lost child tables.
//   tbUids - array of tb_uid_t; an empty array is a successful no-op.
// Returns 0 on success. On the first failure the loop stops, the write lock is
// released, the bookkeeping hash is freed and the error code is returned; tables
// dropped before the failure are not rolled back here.
static int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
  int32_t code = 0;
  if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS;

  // suid -> number of child tables dropped from that super table
  int64_t    nCtbDropped = 0;
  SSHashObj *suidHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (suidHash == NULL) {
    return terrno;
  }

  metaWLock(pMeta);
  for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
    tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
    tb_uid_t suid = 0;
    int8_t   sysTbl = 0;
    int      type = 0;

    code = metaDropTableByUid(pMeta, uid, &type, &suid, &sysTbl);
    if (code) break;  // leave through the common unlock/cleanup path below

    if (!sysTbl && type == TSDB_CHILD_TABLE && suid != 0) {
      int64_t *pVal = tSimpleHashGet(suidHash, &suid, sizeof(tb_uid_t));
      if (pVal) {
        nCtbDropped = *pVal + 1;
      } else {
        nCtbDropped = 1;
      }
      code = tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t));
      if (code) break;
    }

    metaDebug("batch drop table:%" PRId64, uid);
  }
  metaULock(pMeta);

  if (code) {
    tSimpleHashCleanup(suidHash);
    return code;
  }

  // update timeseries
  SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
  void        *pCtbDropped = NULL;
  int32_t      iter = 0;
  while ((pCtbDropped = tSimpleHashIterate(suidHash, pCtbDropped, &iter))) {
    tb_uid_t *pSuid = tSimpleHashGetKey(pCtbDropped, NULL);
    int32_t   nCols = 0;
    if (metaGetStbStats(pMeta->pVnode, *pSuid, NULL, &nCols) == 0) {
      // every dropped child table removes (nCols - 1) series
      pStats->numOfTimeSeries -= *(int64_t *)pCtbDropped * (nCols - 1);
    }
  }
  tSimpleHashCleanup(suidHash);

  pMeta->changed = true;
  return 0;
}
440

441
// Collect the uids of all non-super tables whose full name no longer hashes
// into this vnode.
//   1. traverse every entry of the table db
//   2. validate "<dbname>.<table name>" with vnodeValidateTableHash
//   3. push the uid of each table reported as TSDB_CODE_VND_HASH_MISMATCH into uidList
// Returns 0 on success, or the error of the failing cursor / decode / push step.
// The cursor and the key/value buffers are released on every path.
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
  int32_t code = 0;

  TBC *pCur = NULL;
  code = tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
  if (code < 0) {
    return code;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    tdbTbcClose(pCur);
    return code;
  }

  void *pData = NULL, *pKey = NULL;
  int   nData = 0, nKey = 0;

  while (1) {
    // a negative return ends the traversal
    int32_t ret = tdbTbcNext(pCur, &pKey, &nKey, &pData, &nData);
    if (ret < 0) {
      break;
    }

    SMetaEntry me = {0};
    SDecoder   dc = {0};
    tDecoderInit(&dc, pData, nData);
    ret = metaDecodeEntry(&dc, &me);
    if (ret < 0) {
      code = ret;
      tDecoderClear(&dc);
      break;  // fall through to the common cleanup instead of returning early
    }

    if (me.type != TSDB_SUPER_TABLE) {
      char tbFName[TSDB_TABLE_FNAME_LEN + 1];
      // snprintf always NUL-terminates within sizeof(tbFName)
      snprintf(tbFName, sizeof(tbFName), "%s.%s", pMeta->pVnode->config.dbname, me.name);
      ret = vnodeValidateTableHash(pMeta->pVnode, tbFName);
      if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) {
        if (taosArrayPush(uidList, &me.uid) == NULL) {
          code = terrno;
          tDecoderClear(&dc);
          break;
        }
      }
    }
    tDecoderClear(&dc);
  }

  tdbFree(pData);
  tdbFree(pKey);
  tdbTbcClose(pCur);

  return code;
}
497

498
// Drop every table of this vnode whose name fails the vnode hash validation.
//   version - not used by this function.
// Returns 0 when nothing had to be trimmed or all mismatching tables were dropped,
// otherwise the error from collecting or dropping the tables.
int32_t metaTrimTables(SMeta *pMeta, int64_t version) {
  SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
  if (tbUids == NULL) {
    return terrno;
  }

  int32_t code = metaFilterTableByHash(pMeta, tbUids);
  if (code == 0 && TARRAY_SIZE(tbUids) != 0) {
    // the count is cast so the argument matches its conversion specifier on every platform
    metaInfo("vgId:%d, trim %d tables", TD_VID(pMeta->pVnode), (int32_t)taosArrayGetSize(tbUids));
    code = metaDropTables(pMeta, tbUids);
  }

  taosArrayDestroy(tbUids);
  return code;
}
523

524
// Ask the TTL manager, under the meta read lock, for tables expired at timePointMs.
// Expired uids are appended to tbUids by ttlMgrFindExpired; ttlDropMaxCount is passed through.
// Returns the TTL manager's result; a non-zero result is also logged.
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
  int rc;

  metaRLock(pMeta);
  rc = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
  metaULock(pMeta);

  if (rc == 0) {
    return rc;
  }

  metaError("ttl failed to find expired table, ret:%d", rc);
  return rc;
}
537

538
// Build the btime index key {btime, uid} for a child or normal table entry.
// Returns 0 on success, or TSDB_CODE_FAILED for any other table type
// (btimeKey is left untouched in that case).
static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
  switch (pME->type) {
    case TSDB_CHILD_TABLE:
      btimeKey->btime = pME->ctbEntry.btime;
      break;
    case TSDB_NORMAL_TABLE:
      btimeKey->btime = pME->ntbEntry.btime;
      break;
    default:
      return TSDB_CODE_FAILED;
  }

  btimeKey->uid = pME->uid;
  return 0;
}
552

553
// Build the column-count index key {ncol, uid} for a normal table entry.
// Returns 0 on success, or TSDB_CODE_FAILED when the entry is not a normal table.
static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
  if (pME->type != TSDB_NORMAL_TABLE) {
    return TSDB_CODE_FAILED;
  }

  ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
  ncolKey->uid = pME->uid;
  return 0;
}
562

563
// Remove the TTL record of a child or normal table from the TTL manager.
// Other table types are ignored. A failure is logged and not propagated.
static void metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
  const bool isChild = (pME->type == TSDB_CHILD_TABLE);
  if (!isChild && pME->type != TSDB_NORMAL_TABLE) return;

  STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
  // the ttl value lives in a different member depending on the table type
  ctx.ttlDays = isChild ? pME->ctbEntry.ttlDays : pME->ntbEntry.ttlDays;

  int32_t rc = ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
  if (rc >= 0) return;

  metaError("vgId:%d, failed to delete ttl for table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pME->name,
            pME->uid, tstrerror(rc));
}
580

581
// Delete one table and all of its index/cache records, looked up by uid.
//   type    - optional out: table type of the dropped entry.
//   pSuid   - optional out: super-table uid (written for child tables only).
//   pSysTbl - optional out: 1 when the child's super table is in the filter cache
//             (written for child tables only, and only when the super table entry
//             could be loaded).
// Steps: load and decode the entry, remove tag-index (or JSON inverted-index)
// records for child tables, delete the table/name/uid index records, the btime,
// ncol and ttl records, then update type-specific indexes, statistics and caches.
// Failures of individual delete steps are logged and do not abort the drop.
// Returns 0 on success, or a negative code when the entry (or the super table
// entry of a child table) cannot be fetched or decoded. All buffers and decoders
// obtained here are released on every path.
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) {
  void      *pData = NULL;
  int        nData = 0;
  int        rc = 0;
  SMetaEntry e = {0};
  SDecoder   dc = {0};
  int32_t    ret = 0;

  rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
  if (rc < 0) {
    return rc;
  }
  int64_t version = ((SUidIdxVal *)pData)[0].version;

  rc = tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
  if (rc < 0) {
    tdbFree(pData);
    return rc;
  }

  // e points into pData after decoding, so pData must outlive every use of e
  tDecoderInit(&dc, pData, nData);
  rc = metaDecodeEntry(&dc, &e);
  if (rc < 0) {
    goto _exit;
  }

  if (type) *type = e.type;

  if (e.type == TSDB_CHILD_TABLE) {
    if (pSuid) *pSuid = e.ctbEntry.suid;
    void *tData = NULL;
    int   tLen = 0;

    // load the super table entry to find out which tag columns are indexed
    if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
      STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = ((SUidIdxVal *)tData)[0].version};
      if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
        SDecoder   tdc = {0};
        SMetaEntry stbEntry = {0};

        tDecoderInit(&tdc, tData, tLen);
        ret = metaDecodeEntry(&tdc, &stbEntry);
        if (ret < 0) {
          tDecoderClear(&tdc);
          metaError("vgId:%d, failed to decode child table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
                    e.ctbEntry.suid, tstrerror(ret));
          // release the super table buffer here; the outer decoder and pData go through _exit
          tdbFree(tData);
          rc = ret;
          goto _exit;
        }

        if (pSysTbl) *pSysTbl = metaTbInFilterCache(pMeta, stbEntry.name, 1) ? 1 : 0;

        SSchema        *pTagColumn = NULL;
        SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
        if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
          // a single JSON tag is indexed through the inverted index
          pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
          ret = metaDelJsonVarFromIdx(pMeta, &e, pTagColumn);
          if (ret < 0) {
            metaError("vgId:%d, failed to delete json var from idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode),
                      e.name, e.uid, tstrerror(ret));
          }
        } else {
          for (int i = 0; i < pTagSchema->nCols; i++) {
            pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[i];
            if (!IS_IDX_ON(pTagColumn)) continue;
            STagIdxKey *pTagIdxKey = NULL;
            int32_t     nTagIdxKey;

            const void *pTagData = NULL;
            int32_t     nTagData = 0;

            STagVal tagVal = {.cid = pTagColumn->colId};
            if (tTagGet((const STag *)e.ctbEntry.pTags, &tagVal)) {
              if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
                pTagData = tagVal.pData;
                nTagData = (int32_t)tagVal.nData;
              } else {
                pTagData = &(tagVal.i64);
                nTagData = tDataTypes[pTagColumn->type].bytes;
              }
            } else {
              // tag value not present: the key is built from NULL data of the type's width
              if (!IS_VAR_DATA_TYPE(pTagColumn->type)) {
                nTagData = tDataTypes[pTagColumn->type].bytes;
              }
            }

            if (metaCreateTagIdxKey(e.ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, uid,
                                    &pTagIdxKey, &nTagIdxKey) == 0) {
              ret = tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
              if (ret < 0) {
                metaError("vgId:%d, failed to delete tag idx key:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode),
                          e.name, e.uid, tstrerror(ret));
              }
            }
            metaDestroyTagIdxKey(pTagIdxKey);
            pTagIdxKey = NULL;
          }
        }
        tDecoderClear(&tdc);
      }
      tdbFree(tData);
    }
  }

  ret = tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), pMeta->txn);
  if (ret < 0) {
    metaError("vgId:%d, failed to delete table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
              tstrerror(ret));
  }
  ret = tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn);
  if (ret < 0) {
    metaError("vgId:%d, failed to delete name idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
              tstrerror(ret));
  }
  ret = tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn);
  if (ret < 0) {
    metaError("vgId:%d, failed to delete uid idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
              tstrerror(ret));
  }

  if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteBtimeIdx(pMeta, &e);
  if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e);

  if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e);

  if (e.type == TSDB_CHILD_TABLE) {
    ret =
        tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
    if (ret < 0) {
      metaError("vgId:%d, failed to delete ctb idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
                tstrerror(ret));
    }

    --pMeta->pVnode->config.vndStats.numOfCTables;
    metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0, -1);
    ret = metaUidCacheClear(pMeta, e.ctbEntry.suid);
    if (ret < 0) {
      metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
                e.ctbEntry.suid, tstrerror(ret));
    }
    ret = metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
    if (ret < 0) {
      metaError("vgId:%d, failed to clear group cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
                e.ctbEntry.suid, tstrerror(ret));
    }
  } else if (e.type == TSDB_NORMAL_TABLE) {
    // drop schema.db (todo)

    --pMeta->pVnode->config.vndStats.numOfNTables;
    pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
  } else if (e.type == TSDB_SUPER_TABLE) {
    ret = tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
    if (ret < 0) {
      metaError("vgId:%d, failed to delete suid idx:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
                tstrerror(ret));
    }
    // drop schema.db (todo)

    ret = metaStatsCacheDrop(pMeta, uid);
    if (ret < 0) {
      metaError("vgId:%d, failed to drop stats cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
                tstrerror(ret));
    }
    ret = metaUidCacheClear(pMeta, uid);
    if (ret < 0) {
      metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
                tstrerror(ret));
    }
    ret = metaTbGroupCacheClear(pMeta, uid);
    if (ret < 0) {
      metaError("vgId:%d, failed to clear group cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
                e.uid, tstrerror(ret));
    }
    --pMeta->pVnode->config.vndStats.numOfSTables;
  }

  ret = metaCacheDrop(pMeta, uid);
  if (ret < 0) {
    metaError("vgId:%d, failed to drop cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.uid,
              tstrerror(ret));
  }

  // individual delete failures above are logged only; the drop itself succeeded
  rc = 0;

_exit:
  tDecoderClear(&dc);
  tdbFree(pData);

  return rc;
}
778

779
/*
 * Delete the entry for `pME` from the btime index (pMeta->pBtimeIdx).
 *
 * The key is produced by metaBuildBtimeIdxKey(). If that helper reports a
 * failure (negative return) nothing is deleted and 0 is returned; otherwise
 * the result of tdbTbDelete() is passed through unchanged.
 */
static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
  SBtimeIdxKey key = {0};

  if (metaBuildBtimeIdxKey(&key, pME) >= 0) {
    return tdbTbDelete(pMeta->pBtimeIdx, &key, sizeof(key), pMeta->txn);
  }

  // No key could be built for this entry: treated as "nothing to delete".
  return 0;
}
786

787
/*
 * Delete the entry for `pME` from the ncol index (pMeta->pNcolIdx).
 *
 * The key is produced by metaBuildNColIdxKey(). If that helper reports a
 * failure (negative return) nothing is deleted and 0 is returned; otherwise
 * the result of tdbTbDelete() is passed through unchanged.
 */
int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
  SNcolIdxKey key = {0};

  if (metaBuildNColIdxKey(&key, pME) >= 0) {
    return tdbTbDelete(pMeta->pNcolIdx, &key, sizeof(key), pMeta->txn);
  }

  // No key could be built for this entry: treated as "nothing to delete".
  return 0;
}
794

795
/*
 * Dispatch an ALTER TABLE request to the handler matching pReq->action.
 *
 * pMeta     meta store to modify; its `changed` flag is set unconditionally
 *           before dispatching.
 * version   version number forwarded to the handler.
 * pReq      alter request; `action` selects the handler.
 * pMetaRsp  response object, forwarded only to the handlers that take it.
 *
 * Returns the handler's result. For an unrecognised action, terrno is set to
 * TSDB_CODE_VND_INVALID_TABLE_ACTION and that code is returned.
 */
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pMetaRsp) {
  int code = 0;

  pMeta->changed = true;

  switch (pReq->action) {
    // All three "add column" flavours share one handler.
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF:
      code = metaAddTableColumn(pMeta, version, pReq, pMetaRsp);
      break;

    case TSDB_ALTER_TABLE_DROP_COLUMN:
      code = metaDropTableColumn(pMeta, version, pReq, pMetaRsp);
      break;

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
      code = metaAlterTableColumnBytes(pMeta, version, pReq, pMetaRsp);
      break;

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
      code = metaAlterTableColumnName(pMeta, version, pReq, pMetaRsp);
      break;

    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
      code = metaUpdateTableTagValue(pMeta, version, pReq);
      break;

    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL:
      code = metaUpdateTableMultiTagValue(pMeta, version, pReq);
      break;

    case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
      code = metaUpdateTableOptions2(pMeta, version, pReq);
      break;

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
      code = metaUpdateTableColCompress2(pMeta, version, pReq);
      break;

    case TSDB_ALTER_TABLE_ALTER_COLUMN_REF:
      code = metaAlterTableColumnRef(pMeta, version, pReq, pMetaRsp);
      break;

    case TSDB_ALTER_TABLE_REMOVE_COLUMN_REF:
      code = metaRemoveTableColumnRef(pMeta, version, pReq, pMetaRsp);
      break;

    default:
      code = terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
      break;
  }

  return code;
}
825

UNCOV
826
/*
 * Forward a table's change time to the TTL manager (no locking done here).
 *
 * Returns 0 without doing anything when tsTtlChangeOnWrite is off.
 * A non-positive changeTimeMs is rejected with a warning and
 * TSDB_CODE_VERSION_NOT_COMPATIBLE. Otherwise the result of
 * ttlMgrUpdateChangeTime() is returned.
 */
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
  if (tsTtlChangeOnWrite) {
    if (changeTimeMs > 0) {
      STtlUpdCtimeCtx updCtx = {0};
      updCtx.uid = uid;
      updCtx.changeTimeMs = changeTimeMs;
      updCtx.pTxn = pMeta->txn;

      return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &updCtx);
    }

    metaWarn("Skip to change ttl deletetion time on write, uid: %" PRId64, uid);
    return TSDB_CODE_VERSION_NOT_COMPATIBLE;
  }

  return 0;
}
838

839
/*
 * Same as metaUpdateChangeTime(), but performed while holding the meta
 * write lock (metaWLock/metaULock).
 *
 * When tsTtlChangeOnWrite is off the lock is never taken and 0 is returned.
 */
int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
  int code = 0;

  if (tsTtlChangeOnWrite) {
    metaWLock(pMeta);
    code = metaUpdateChangeTime(pMeta, uid, changeTimeMs);
    metaULock(pMeta);
  }

  return code;
}
847

848
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
155,798✔
849
                        STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
850
  if (IS_VAR_DATA_TYPE(type)) {
155,798✔
851
    *nTagIdxKey = sizeof(STagIdxKey) + nTagData + VARSTR_HEADER_SIZE + sizeof(tb_uid_t);
23,005✔
852
  } else {
853
    *nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
132,793✔
854
  }
855

856
  *ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
155,798!
857
  if (*ppTagIdxKey == NULL) {
155,834!
UNCOV
858
    return terrno;
×
859
  }
860

861
  taosSetInt64Aligned(&((*ppTagIdxKey)->suid), suid);
155,834✔
862
  (*ppTagIdxKey)->cid = cid;
155,834✔
863
  (*ppTagIdxKey)->isNull = (pTagData == NULL) ? 1 : 0;
155,834✔
864
  (*ppTagIdxKey)->type = type;
155,834✔
865

866
  // refactor
867
  if (IS_VAR_DATA_TYPE(type)) {
155,834!
868
    memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE);
23,060✔
869
    if (pTagData != NULL) memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData);
23,060✔
870
    taosSetInt64Aligned((tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData), uid);
23,060✔
871
  } else {
872
    if (pTagData != NULL) memcpy((*ppTagIdxKey)->data, pTagData, nTagData);
132,774✔
873
    taosSetInt64Aligned((tb_uid_t *)((*ppTagIdxKey)->data + nTagData), uid);
132,774✔
874
  }
875

876
  return 0;
155,834✔
877
}
878

879
/*
 * Release a key obtained from metaCreateTagIdxKey().
 * A NULL pointer is accepted and ignored.
 */
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
  if (pTagIdxKey == NULL) {
    return;
  }

  taosMemoryFree(pTagIdxKey);
}
155,001✔
882

883
/*
 * Emit one debug log line per entry of a column-compression hash table.
 *
 * Each entry's key is read as a col_id_t and its value as a uint32_t
 * compression setting, which tcompressDebug() splits into three components
 * that are then rendered as strings for the log message.
 */
static void colCompressDebug(SHashObj *pColCmprObj) {
  for (void *pIter = taosHashIterate(pColCmprObj, NULL); pIter != NULL;) {
    uint32_t alg = *(uint32_t *)pIter;
    col_id_t id = *(col_id_t *)taosHashGetKey(pIter, NULL);

    // Advance first: everything needed from the current entry is already copied.
    pIter = taosHashIterate(pColCmprObj, pIter);

    uint8_t encode, compress, level;
    tcompressDebug(alg, &encode, &compress, &level);

    const char *encodeName = columnEncodeStr(encode);
    const char *compressName = columnCompressStr(compress);
    const char *levelName = columnLevelStr(level);
    metaDebug("colId: %d, encode:%s, compress:%s,level:%s", id, encodeName, compressName, levelName);
  }
}
900

901
/*
 * Build a hash table mapping column id -> compression setting for table `uid`.
 *
 * The table's current version is looked up in pMeta->pUidIdx, its entry is
 * fetched from pMeta->pTbDb and decoded, and every element of the entry's
 * colCmpr array is inserted as (id -> alg). The whole lookup runs under the
 * meta read lock.
 *
 * pMeta         meta store to read from.
 * uid           uid of the table to look up.
 * ppColCmprObj  out: the new hash table on success; NULL in every other case
 *               (including allocation failure). Presumably the caller is
 *               responsible for destroying a returned table -- verify against
 *               callers.
 *
 * Returns:
 *   0                         success; also returned, with *ppColCmprObj ==
 *                             NULL, when withExtSchema(e.type) is false.
 *   TSDB_CODE_OUT_OF_MEMORY   the hash table could not be created.
 *   TSDB_CODE_FAILED          the uid was not found in the uid index.
 *   other negative value      error from tdbTbGet, metaDecodeEntry or
 *                             taosHashPut, passed through.
 */
int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
  int        rc = 0;
  void      *pData = NULL;
  int        nData = 0;
  int64_t    version = 0;
  SMetaEntry e = {0};
  SDecoder   dc = {0};
  bool       decoderInited = false;

  // Give the out-parameter a defined value on every path, including OOM below.
  *ppColCmprObj = NULL;

  SHashObj *pColCmprObj = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (pColCmprObj == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  metaRLock(pMeta);

  // Step 1: uid -> current version.
  rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
  if (rc < 0) {
    rc = TSDB_CODE_FAILED;
    goto _discard;
  }
  version = ((SUidIdxVal *)pData)[0].version;

  // Step 2: (version, uid) -> encoded table entry. pData is reused as the output buffer.
  rc = tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
  if (rc < 0) {
    metaError("failed to get table entry");
    goto _discard;
  }

  // Step 3: decode. `e` refers to decoder-owned memory until tDecoderClear().
  tDecoderInit(&dc, pData, nData);
  decoderInited = true;
  rc = metaDecodeEntry(&dc, &e);
  if (rc < 0) {
    goto _discard;
  }

  if (!withExtSchema(e.type)) {
    // Not an error: this entry type has no per-column compression to report.
    rc = 0;
    goto _discard;
  }

  // Step 4: copy every (column id, algorithm) pair into the result table.
  SColCmprWrapper *pWrapper = &e.colCmpr;
  for (int32_t i = 0; i < pWrapper->nCols; i++) {
    SColCmpr *pCmpr = &pWrapper->pColCmpr[i];
    rc = taosHashPut(pColCmprObj, &pCmpr->id, sizeof(pCmpr->id), &pCmpr->alg, sizeof(pCmpr->alg));
    if (rc < 0) {
      goto _discard;
    }
  }

  tDecoderClear(&dc);
  tdbFree(pData);
  metaULock(pMeta);

  *ppColCmprObj = pColCmprObj;
  colCompressDebug(pColCmprObj);

  return 0;

_discard:
  // Shared exit for every path that does not hand the table to the caller.
  if (decoderInited) {
    tDecoderClear(&dc);
  }
  if (pData != NULL) {
    // Also covers the buffer from step 1 when step 2 fails.
    tdbFree(pData);
  }
  metaULock(pMeta);
  // Destroy the table itself; merely emptying it would leak the SHashObj.
  taosHashCleanup(pColCmprObj);
  return rc;
}
971
// refactor later
972
/* Accessor: returns the pTagIdx handle stored in the meta object. */
void *metaGetIdx(SMeta *pMeta) {
  return pMeta->pTagIdx;
}
52,987✔
973
/* Accessor: returns the pTagIvtIdx handle stored in the meta object. */
void *metaGetIvtIdx(SMeta *pMeta) {
  return pMeta->pTagIvtIdx;
}
52,957✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc