• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4442

04 Jul 2025 02:10AM UTC coverage: 63.58% (+0.3%) from 63.29%
#4442

push

travis-ci

web-flow
fix:(stmt2) heap buffer overflow (#31607)

160719 of 321690 branches covered (49.96%)

Branch coverage included in aggregate %.

19 of 22 new or added lines in 3 files covered. (86.36%)

222 existing lines in 60 files now uncovered.

247667 of 320626 relevant lines covered (77.24%)

17710656.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.93
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tmisce.h"
26

27
void qDestroyBoundColInfo(void* pInfo) {
44,494✔
28
  if (NULL == pInfo) {
44,494✔
29
    return;
23,647✔
30
  }
31

32
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
20,847✔
33

34
  taosMemoryFreeClear(pBoundInfo->pColIndex);
20,847!
35
}
36

37
static char* tableNameGetPosition(SToken* pToken, char target) {
10,047,148✔
38
  bool inEscape = false;
10,047,148✔
39
  bool inQuote = false;
10,047,148✔
40
  char quotaStr = 0;
10,047,148✔
41

42
  for (uint32_t i = 0; i < pToken->n; ++i) {
111,167,837✔
43
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
110,058,475!
44
      return pToken->z + i;
8,937,786✔
45
    }
46

47
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
101,120,689✔
48
      if (!inQuote) {
188,068!
49
        inEscape = !inEscape;
188,099✔
50
      }
51
    }
52

53
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
101,120,689✔
54
      if (!inEscape) {
39,420✔
55
        if (!inQuote) {
24,056✔
56
          quotaStr = *(pToken->z + i);
12,028✔
57
          inQuote = !inQuote;
12,028✔
58
        } else if (quotaStr == *(pToken->z + i)) {
12,028!
59
          inQuote = !inQuote;
12,028✔
60
        }
61
      }
62
    }
63
  }
64

65
  return NULL;
1,109,362✔
66
}
67

68
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
10,047,198✔
69
  const char* msg1 = "name too long";
10,047,198✔
70
  const char* msg2 = "invalid database name";
10,047,198✔
71
  const char* msg3 = "db is not specified";
10,047,198✔
72
  const char* msg4 = "invalid table name";
10,047,198✔
73

74
  int32_t code = TSDB_CODE_SUCCESS;
10,047,198✔
75
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
10,047,198✔
76

77
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
10,055,094✔
78
    int32_t dbLen = p - pTableName->z;
8,937,236✔
79
    if (dbLen <= 0) {
8,937,236!
80
      return buildInvalidOperationMsg(pMsgBuf, msg2);
×
81
    }
82
    char name[TSDB_DB_FNAME_LEN] = {0};
8,937,236✔
83
    strncpy(name, pTableName->z, dbLen);
8,937,236✔
84
    int32_t actualDbLen = strdequote(name);
8,937,236✔
85

86
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
8,935,419✔
87
    if (code != TSDB_CODE_SUCCESS) {
8,921,469!
88
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
89
    }
90

91
    int32_t tbLen = pTableName->n - dbLen - 1;
8,921,469✔
92
    if (tbLen <= 0) {
8,921,469!
93
      return buildInvalidOperationMsg(pMsgBuf, msg4);
×
94
    }
95

96
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
8,921,469✔
97
    strncpy(tbname, p + 1, tbLen);
8,921,469✔
98
    /*tbLen = */ (void)strdequote(tbname);
8,921,469✔
99

100
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
8,938,127✔
101
    if (code != 0) {
8,919,364!
102
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
103
    }
104
  } else {  // get current DB name first, and then set it into path
105
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
1,117,858✔
106
    strncpy(tbname, pTableName->z, pTableName->n);
1,117,858✔
107
    int32_t tbLen = strdequote(tbname);
1,117,858✔
108
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
1,117,999!
109
      return buildInvalidOperationMsg(pMsgBuf, msg1);
3✔
110
    }
111
    if (tbLen == 0) {
1,118,005✔
112
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
2✔
113
    }
114

115
    char name[TSDB_TABLE_FNAME_LEN] = {0};
1,118,003✔
116
    strncpy(name, pTableName->z, pTableName->n);
1,118,003✔
117
    (void)strdequote(name);
1,118,003✔
118

119
    if (dbName == NULL) {
1,118,071✔
120
      return buildInvalidOperationMsg(pMsgBuf, msg3);
7✔
121
    }
122
    if (name[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
1,118,064!
123

124
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
1,118,064✔
125
    if (code != TSDB_CODE_SUCCESS) {
1,117,978!
126
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
×
127
      return code;
×
128
    }
129

130
    code = tNameFromString(pName, name, T_NAME_TABLE);
1,117,978✔
131
    if (code != 0) {
1,117,983!
132
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
×
133
    }
134
  }
135

136
  if (NULL != strchr(pName->tname, '.')) {
10,037,355!
137
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
×
138
  }
139

140
  return code;
10,045,673✔
141
}
142

143
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
9,673,908✔
144
  while (start < end) {
58,858,278✔
145
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
58,829,922✔
146
      return start;
9,645,552✔
147
    }
148
    ++start;
49,184,370✔
149
  }
150
  return -1;
28,356✔
151
}
152

153
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
68,826✔
154
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
155
  pTbReq->type = TD_CHILD_TABLE;
68,826✔
156
  pTbReq->ctb.pTag = (uint8_t*)pTag;
68,826✔
157
  pTbReq->name = taosStrdup(tname);
68,826!
158
  if (!pTbReq->name) return terrno;
68,845!
159
  pTbReq->ctb.suid = suid;
68,845✔
160
  pTbReq->ctb.tagNum = tagNum;
68,845✔
161
  if (sname) {
68,845✔
162
    pTbReq->ctb.stbName = taosStrdup(sname);
66,329!
163
    if (!pTbReq->ctb.stbName) return terrno;
66,340!
164
  }
165
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
68,856✔
166
  if (!pTbReq->ctb.tagName) return terrno;
68,857✔
167
  pTbReq->ttl = ttl;
68,841✔
168
  pTbReq->commentLen = -1;
68,841✔
169

170
  return TSDB_CODE_SUCCESS;
68,841✔
171
}
172

173
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
174
  for (int32_t i = 0; i < ncols; ++i) {
×
175
    pBoundCols[i] = i;
×
176
  }
177
}
×
178

179
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
9,879,367✔
180
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
9,879,367✔
181
  int32_t  code = 0;
9,882,460✔
182
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
119,230,070✔
183
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
109,350,205✔
184
    if (NULL == taosArrayPush(pValues, &val)) {
109,347,610!
185
      code = terrno;
×
186
      break;
×
187
    }
188
  }
189
  return code;
9,879,865✔
190
}
191

192
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
10,595✔
193

194
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
9,976,791✔
195
  pInfo->numOfCols = numOfBound;
9,976,791✔
196
  pInfo->numOfBound = numOfBound;
9,976,791✔
197
  pInfo->hasBoundCols = false;
9,976,791✔
198
  pInfo->mixTagsCols = false;
9,976,791✔
199
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
9,976,791!
200
  if (NULL == pInfo->pColIndex) {
9,993,892!
201
    return terrno;
×
202
  }
203
  for (int32_t i = 0; i < numOfBound; ++i) {
120,433,646✔
204
    pInfo->pColIndex[i] = i;
110,439,754✔
205
  }
206
  return TSDB_CODE_SUCCESS;
9,993,892✔
207
}
208

209
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
22✔
210
  pInfo->numOfBound = pInfo->numOfCols;
22✔
211
  pInfo->hasBoundCols = false;
22✔
212
  pInfo->mixTagsCols = false;
22✔
213
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
110✔
214
    pInfo->pColIndex[i] = i;
88✔
215
  }
216
}
22✔
217

218
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
781,357,239✔
219
  // once the data block is disordered, we do NOT keep last timestamp any more
220
  if (!pTableCxt->ordered) {
781,357,239✔
221
    return;
37,188,938✔
222
  }
223

224
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
744,168,301✔
225
    pTableCxt->ordered = false;
50,678✔
226
  }
227

228
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
744,497,278✔
229
    pTableCxt->duplicateTs = true;
125✔
230
  }
231

232
  // TODO: for variable length data type, we need to copy it out
233
  pTableCxt->lastKey = *rowKey;
744,227,071✔
234
  return;
744,227,071✔
235
}
236

237
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
29,434,237!
238

239
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
9,845,017✔
240
                                  bool colMode, bool ignoreColVals) {
241
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
9,845,017!
242
  if (NULL == pTableCxt) {
9,885,974!
243
    *pOutput = NULL;
×
244
    return terrno;
×
245
  }
246

247
  int32_t code = TSDB_CODE_SUCCESS;
9,885,974✔
248

249
  pTableCxt->lastKey = (SRowKey){0};
9,885,974✔
250
  pTableCxt->ordered = true;
9,885,974✔
251
  pTableCxt->duplicateTs = false;
9,885,974✔
252

253
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
9,885,974✔
254
  if (NULL == pTableCxt->pMeta) {
9,875,129!
255
    code = TSDB_CODE_OUT_OF_MEMORY;
×
256
  }
257
  if (TSDB_CODE_SUCCESS == code) {
9,875,129!
258
    pTableCxt->pSchema =
9,896,987✔
259
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
9,875,206✔
260
    if (NULL == pTableCxt->pSchema) {
9,896,987!
261
      code = TSDB_CODE_OUT_OF_MEMORY;
×
262
    }
263
  }
264
  if (TSDB_CODE_SUCCESS == code) {
9,896,910!
265
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
9,897,849✔
266
  }
267
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
9,895,701!
268
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
9,879,809✔
269
    if (NULL == pTableCxt->pValues) {
9,870,619!
270
      code = terrno;
×
271
    } else {
272
      code = initColValues(pTableMeta, pTableCxt->pValues);
9,870,619✔
273
    }
274
  }
275
  if (TSDB_CODE_SUCCESS == code) {
9,891,965✔
276
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
9,891,322!
277
    if (NULL == pTableCxt->pData) {
9,891,949!
278
      code = terrno;
×
279
    } else {
280
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
9,891,949!
281
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
9,891,949✔
282
      pTableCxt->pData->suid = pTableMeta->suid;
9,891,949✔
283
      pTableCxt->pData->uid = pTableMeta->uid;
9,891,949✔
284
      pTableCxt->pData->sver = pTableMeta->sversion;
9,891,949✔
285
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
9,891,949!
286
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
9,891,949!
287
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
9,891,949✔
288
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
20,995✔
289
        if (NULL == pTableCxt->pData->aCol) {
20,974!
290
          code = terrno;
×
291
        }
292
      } else {
293
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
9,870,954✔
294
        if (NULL == pTableCxt->pData->aRowP) {
9,860,992!
295
          code = terrno;
×
296
        }
297
      }
298
    }
299
  }
300
  if (TSDB_CODE_SUCCESS == code) {
9,882,609!
301
    *pOutput = pTableCxt;
9,882,609✔
302
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
9,882,609✔
303
                pTableMeta->vgId);
304
  } else {
305
    insDestroyTableDataCxt(pTableCxt);
×
306
  }
307

308
  return code;
9,881,289✔
309
}
310

311
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
43,440✔
312
  int32_t        code = TSDB_CODE_SUCCESS;
43,440✔
313
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
43,440!
314
  if (NULL == pTmp) {
43,457!
315
    code = terrno;
×
316
  } else {
317
    pTmp->flags = pSrc->flags;
43,457✔
318
    pTmp->suid = pSrc->suid;
43,457✔
319
    pTmp->uid = pSrc->uid;
43,457✔
320
    pTmp->sver = pSrc->sver;
43,457✔
321
    pTmp->pCreateTbReq = NULL;
43,457✔
322
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
43,457✔
323
      if (pSrc->pCreateTbReq) {
42,658✔
324
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
42,650✔
325
      } else {
326
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
8✔
327
      }
328
    }
329
    if (TSDB_CODE_SUCCESS == code) {
43,453!
330
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
43,453✔
331
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
40,949✔
332
        if (NULL == pTmp->aCol) {
40,943!
333
          code = terrno;
×
334
          taosMemoryFree(pTmp);
×
335
        }
336
      } else {
337
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
2,504✔
338
        if (NULL == pTmp->aRowP) {
2,504!
339
          code = terrno;
×
340
          taosMemoryFree(pTmp);
×
341
        }
342
      }
343
    } else {
344
      taosMemoryFree(pTmp);
×
345
    }
346
  }
347

348
  taosMemoryFree(pSrc);
43,449!
349
  if (TSDB_CODE_SUCCESS == code) {
43,450✔
350
    *pDst = pTmp;
43,447✔
351
  }
352

353
  return code;
43,450✔
354
}
355

356
static void resetColValues(SArray* pValues) {
15,572✔
357
  int32_t num = taosArrayGetSize(pValues);
15,572✔
358
  for (int32_t i = 0; i < num; ++i) {
414,474✔
359
    SColVal* pVal = taosArrayGet(pValues, i);
398,902✔
360
    pVal->flag = CV_FLAG_NONE;
398,902✔
361
  }
362
}
15,572✔
363

364
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
11,901,469✔
365
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
366
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
11,901,469✔
367
  if (NULL != tmp) {
11,879,430✔
368
    *pTableCxt = *tmp;
2,029,676✔
369
    if (!ignoreColVals) {
2,029,676✔
370
      resetColValues((*pTableCxt)->pValues);
15,572✔
371
    }
372
    return TSDB_CODE_SUCCESS;
2,029,676✔
373
  }
374
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
9,849,754✔
375
  if (TSDB_CODE_SUCCESS == code) {
9,879,740!
376
    void* pData = *pTableCxt;  // deal scan coverity
9,880,539✔
377
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
9,880,539✔
378
  }
379

380
  if (TSDB_CODE_SUCCESS != code) {
9,894,579!
381
    insDestroyTableDataCxt(*pTableCxt);
×
382
  }
383
  return code;
9,895,519✔
384
}
385

386
static void destroyColVal(void* p) {
109,772,841✔
387
  SColVal* pVal = p;
109,772,841✔
388
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
109,772,841✔
389
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
104,237,589✔
390
    taosMemoryFreeClear(pVal->value.pData);
5,727,367!
391
  }
392
}
109,772,841✔
393

394
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
9,920,923✔
395
  if (NULL == pTableCxt) {
9,920,923!
396
    return;
×
397
  }
398

399
  taosMemoryFreeClear(pTableCxt->pMeta);
9,920,923!
400
  tDestroyTSchema(pTableCxt->pSchema);
9,921,730!
401
  insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
9,921,388✔
402
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
9,919,462✔
403
  if (pTableCxt->pData) {
9,920,970✔
404
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
57,449✔
405
    taosMemoryFree(pTableCxt->pData);
57,446!
406
  }
407
  taosMemoryFree(pTableCxt);
9,920,984!
408
}
409

410
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
9,863,925✔
411
  if (NULL == pVgCxt) {
9,863,925!
412
    return;
×
413
  }
414

415
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
9,863,925✔
416
  taosMemoryFree(pVgCxt->pData);
9,870,060!
417
  taosMemoryFree(pVgCxt);
9,872,712!
418
}
419

420
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
19,570,683✔
421
  if (NULL == pVgCxtList) {
19,570,683✔
422
    return;
9,767,973✔
423
  }
424

425
  size_t size = taosArrayGetSize(pVgCxtList);
9,802,710✔
426
  for (int32_t i = 0; i < size; i++) {
19,696,458✔
427
    void* p = taosArrayGetP(pVgCxtList, i);
9,866,899✔
428
    insDestroyVgroupDataCxt(p);
9,864,393✔
429
  }
430

431
  taosArrayDestroy(pVgCxtList);
9,829,559✔
432
}
433

434
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
435
  if (NULL == pVgCxtHash) {
×
436
    return;
×
437
  }
438

439
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
440
  while (p) {
×
441
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
442

443
    p = taosHashIterate(pVgCxtHash, p);
×
444
  }
445

446
  taosHashCleanup(pVgCxtHash);
×
447
}
448

449
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
9,812,467✔
450
  if (NULL == pTableCxtHash) {
9,812,467✔
451
    return;
20,881✔
452
  }
453

454
  void** p = taosHashIterate(pTableCxtHash, NULL);
9,791,586✔
455
  while (p) {
19,669,382✔
456
    insDestroyTableDataCxt(*(STableDataCxt**)p);
9,880,199✔
457

458
    p = taosHashIterate(pTableCxtHash, p);
9,880,366✔
459
  }
460

461
  taosHashCleanup(pTableCxtHash);
9,789,183✔
462
}
463

464
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
9,885,982✔
465
  if (NULL == pVgCxt->pData->aSubmitTbData) {
9,885,982✔
466
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
9,828,934✔
467
    if (NULL == pVgCxt->pData->aSubmitTbData) {
9,824,711!
468
      return terrno;
×
469
    }
470
  }
471

472
  // push data to submit, rebuild empty data for next submit
473
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
19,770,499!
474
    return terrno;
×
475
  }
476
  int32_t code = 0;
9,888,740✔
477
  if (isRebuild) {
9,888,740✔
478
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
43,444✔
479
  } else if (clear) {
9,845,296✔
480
    taosMemoryFreeClear(pTableCxt->pData);
9,842,934!
481
  }
482

483
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
9,877,510✔
484

485
  return code;
9,869,136✔
486
}
487

488
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
9,819,319✔
489
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
9,819,319!
490
  if (NULL == pVgCxt) {
9,870,593!
491
    return terrno;
×
492
  }
493
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
9,870,593!
494
  if (NULL == pVgCxt->pData) {
9,865,738!
495
    insDestroyVgroupDataCxt(pVgCxt);
×
496
    return terrno;
×
497
  }
498

499
  pVgCxt->vgId = vgId;
9,865,738✔
500
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
9,865,738✔
501
  if (TSDB_CODE_SUCCESS == code) {
9,865,330!
502
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
9,846,600!
503
      code = terrno;
×
504
      insDestroyVgroupDataCxt(pVgCxt);
×
505
      return code;
5,133✔
506
    }
507
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
508
    *pOutput = pVgCxt;
9,846,600✔
509
  } else {
510
    insDestroyVgroupDataCxt(pVgCxt);
×
511
  }
512
  return code;
9,848,295✔
513
}
514

515
int insColDataComp(const void* lp, const void* rp) {
90,035✔
516
  SColData* pLeft = (SColData*)lp;
90,035✔
517
  SColData* pRight = (SColData*)rp;
90,035✔
518
  if (pLeft->cid < pRight->cid) {
90,035✔
519
    return -1;
89,078✔
520
  } else if (pLeft->cid > pRight->cid) {
957!
521
    return 1;
972✔
522
  }
523

524
  return 0;
×
525
}
526

527
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
12,290✔
528
                                 STableColsData* pTbData, SName* sname) {
529
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
12,290!
530
    return TSDB_CODE_SUCCESS;
12,234✔
531
  }
532

533
  SVgroupInfo      vgInfo = {0};
56✔
534
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
56✔
535
                           .requestId = pBuildInfo->requestId,
56✔
536
                           .requestObjRefId = pBuildInfo->requestSelf,
56✔
537
                           .mgmtEps = pBuildInfo->mgmtEpSet};
538

539
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
56✔
540
  if (TSDB_CODE_SUCCESS != code) {
56!
541
    return code;
×
542
  }
543

544
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
56✔
545
  if (TSDB_CODE_SUCCESS != code) {
56!
546
    return code;
×
547
  }
548

549
  return TSDB_CODE_SUCCESS;
56✔
550
}
551

552
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
56,784✔
553
                             uint64_t* uid, int32_t* vgId) {
554
  STableVgUid* pTbInfo = NULL;
56,784✔
555
  int32_t      code = 0;
56,784✔
556

557
  if (pTbData->getFromHash) {
56,784✔
558
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
44,508✔
559
  }
560

561
  if (NULL == pTbInfo) {
56,625✔
562
    SName sname;
563
    code = qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
12,323✔
564
    if (TSDB_CODE_SUCCESS != code) {
12,328!
565
      return code;
38✔
566
    }
567

568
    STableMeta*      pTableMeta = NULL;
12,328✔
569
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
12,328✔
570
                             .requestId = pBuildInfo->requestId,
12,328✔
571
                             .requestObjRefId = pBuildInfo->requestSelf,
12,328✔
572
                             .mgmtEps = pBuildInfo->mgmtEpSet};
573
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
12,328✔
574

575
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
12,328✔
576
      parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
38!
577
      return code;
38✔
578
    }
579

580
    if (TSDB_CODE_SUCCESS != code) {
12,290!
581
      parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
×
582
      return code;
×
583
    }
584

585
    *uid = pTableMeta->uid;
12,290✔
586
    *vgId = pTableMeta->vgId;
12,290✔
587

588
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
12,290✔
589
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
12,290✔
590
    if (TSDB_CODE_SUCCESS == code) {
12,290!
591
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
12,290✔
592
    }
593

594
    taosMemoryFree(pTableMeta);
12,290!
595
  } else {
596
    *uid = pTbInfo->uid;
44,302✔
597
    *vgId = pTbInfo->vgid;
44,302✔
598
  }
599

600
  return code;
56,591✔
601
}
602

603
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
604
  int32_t             code = TSDB_CODE_SUCCESS;
×
605
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
606

607
  if (TSDB_CODE_SUCCESS == code) {
×
608
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
609
  }
610

611
  return code;
×
612
}
613

614
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
56,816✔
615
                                            char* tbname) {
616
  if (NULL == pVgCxt->pData->aSubmitTbData) {
56,816✔
617
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
23,507✔
618
    if (NULL == pVgCxt->pData->aSubmitTbData) {
23,526!
UNCOV
619
      return terrno;
×
620
    }
621
  }
622

623
  int32_t        code = TSDB_CODE_SUCCESS;
56,835✔
624
  SArray**       rowP = NULL;
56,835✔
625

626
  rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));
56,835✔
627

628
  if (rowP != NULL && *rowP != NULL) {
56,644!
629
    for (int32_t j = 0; j < taosArrayGetSize(*rowP); ++j) {
81✔
630
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
57✔
631
      if (pRow) {
57✔
632
        if (NULL == taosArrayPush(*rowP, &pRow)) {
60!
633
          return terrno;
×
634
        }
635
      }
636

637
      code = tRowSort(*rowP);
57✔
638
      if (code != TSDB_CODE_SUCCESS) {
57!
UNCOV
639
        return code;
×
640
      }
641
      code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
57✔
642
      if (code != TSDB_CODE_SUCCESS) {
57!
UNCOV
643
        return code;
×
644
      }
645
    }
646

647
    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);
24!
648

649
    if (pTbCtx->pData->pCreateTbReq != NULL) {
24✔
650
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
4!
651
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
4!
652
      pTbCtx->pData->pCreateTbReq = NULL;
4✔
653
    }
654

655
    return TSDB_CODE_SUCCESS;
24✔
656
  }
657

658
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
113,124!
659
    return terrno;
×
660
  }
661

662
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
56,504✔
663

664
  if (code != TSDB_CODE_SUCCESS) {
56,503!
UNCOV
665
    return code;
×
666
  }
667

668
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);
56,503!
669

670
  return TSDB_CODE_SUCCESS;
56,513✔
671
}
672

673
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
56,869✔
674
                                  SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
675
  int32_t  code = TSDB_CODE_SUCCESS;
56,869✔
676
  uint64_t uid;
677
  int32_t  vgId;
678

679
  pTbCtx->pData->aRowP = pTbData->aCol;
56,869✔
680

681
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
56,869✔
682
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
56,614✔
683
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
38✔
684
    vgId = (int32_t)ctbReq->uid;
38✔
685
    uid = 0;
38✔
686
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
38✔
687
    ctbReq->uid = 0;
38✔
688
    pTbCtx->pMeta->uid = 0;
38✔
689
    pTbCtx->pData->uid = 0;
38✔
690
    pTbCtx->pData->pCreateTbReq = ctbReq;
38✔
691
    code = TSDB_CODE_SUCCESS;
38✔
692
  } else {
693
    if (TSDB_CODE_SUCCESS != code) {
56,576!
UNCOV
694
      return code;
×
695
    }
696
    pTbCtx->pMeta->vgId = vgId;
56,576✔
697
    pTbCtx->pMeta->uid = uid;
56,576✔
698
    pTbCtx->pData->uid = uid;
56,576✔
699
    pTbCtx->pData->pCreateTbReq = NULL;
56,576✔
700

701
    if (ctbReq != NULL) {
56,576✔
702
      tdDestroySVCreateTbReq(ctbReq);
703
      taosMemoryFree(ctbReq);
52!
704
      ctbReq = NULL;
52✔
705
    }
706
  }
707

708
  if (!pTbData->isOrdered) {
56,614✔
709
    code = tRowSort(pTbCtx->pData->aRowP);
50,765✔
710
  }
711
  if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
56,603!
712
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
50,758✔
713
  }
714

715
  if (TSDB_CODE_SUCCESS != code) {
56,478!
UNCOV
716
    return code;
×
717
  }
718

719
  SVgroupDataCxt* pVgCxt = NULL;
56,478✔
720
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
56,478✔
721
  if (NULL == pp) {
56,880✔
722
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
23,506✔
723
    if (NULL == pp) {
23,519!
724
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
23,519✔
725
    } else {
UNCOV
726
      pVgCxt = *(SVgroupDataCxt**)pp;
×
727
    }
728
  } else {
729
    pVgCxt = *(SVgroupDataCxt**)pp;
33,374✔
730
  }
731

732
  if (code == TSDB_CODE_SUCCESS) {
56,884✔
733
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
56,863✔
734
  }
735

736
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
56,545!
UNCOV
737
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
738
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
UNCOV
739
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
740
    // insDestroyVgroupDataCxt(pVgCxt);
741
  }
742

743
  return code;
56,497✔
744
}
745

746
/*
747
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
748
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
749
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
750
  if (NULL == pVgroupHash || NULL == pVgroupList) {
751
    taosHashCleanup(pVgroupHash);
752
    taosArrayDestroy(pVgroupList);
753
    return TSDB_CODE_OUT_OF_MEMORY;
754
  }
755

756
  int32_t code = TSDB_CODE_SUCCESS;
757

758
  for (int32_t i = 0; i < tbNum; ++i) {
759
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
760
    pTableCxt->pMeta->vgId = pTableCols->vgId;
761
    pTableCxt->pMeta->uid = pTableCols->uid;
762
    pTableCxt->pData->uid = pTableCols->uid;
763
    pTableCxt->pData->aCol = pTableCols->aCol;
764

765
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
766
    if (pCol->nVal <= 0) {
767
      continue;
768
    }
769

770
    if (pTableCxt->pData->pCreateTbReq) {
771
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
772
    }
773

774
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
775

776
    tColDataSortMerge(pTableCxt->pData->aCol);
777

778
    if (TSDB_CODE_SUCCESS == code) {
779
      SVgroupDataCxt* pVgCxt = NULL;
780
      int32_t         vgId = pTableCxt->pMeta->vgId;
781
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
782
      if (NULL == pp) {
783
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
784
      } else {
785
        pVgCxt = *(SVgroupDataCxt**)pp;
786
      }
787
      if (TSDB_CODE_SUCCESS == code) {
788
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
789
      }
790
    }
791
  }
792

793
  taosHashCleanup(pVgroupHash);
794
  if (TSDB_CODE_SUCCESS == code) {
795
    *pVgDataBlocks = pVgroupList;
796
  } else {
797
    insDestroyVgroupDataCxtList(pVgroupList);
798
  }
799

800
  return code;
801
}
802
*/
803

804
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
9,747,798✔
805
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
9,747,798✔
806
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
9,790,972✔
807
  if (NULL == pVgroupHash || NULL == pVgroupList) {
9,796,402!
808
    taosHashCleanup(pVgroupHash);
5,119✔
UNCOV
809
    taosArrayDestroy(pVgroupList);
×
UNCOV
810
    return terrno;
×
811
  }
812

813
  int32_t code = TSDB_CODE_SUCCESS;
9,791,283✔
814
  bool    colFormat = false;
9,791,283✔
815

816
  void* p = taosHashIterate(pTableHash, NULL);
9,791,283✔
817
  if (p) {
9,802,568!
818
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
9,802,888✔
819
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
9,802,888✔
820
  }
821

822
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
19,686,172✔
823
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
9,899,089✔
824
    if (colFormat) {
9,899,089✔
825
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
40,970✔
826
      if (pCol && pCol->nVal <= 0) {
40,968!
827
        p = taosHashIterate(pTableHash, p);
16✔
828
        continue;
16✔
829
      }
830

831
      if (pTableCxt->pData->pCreateTbReq) {
40,952✔
832
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
40,168✔
833
      }
834

835
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
40,952✔
836

837
      code = tColDataSortMerge(&pTableCxt->pData->aCol);
40,944✔
838
    } else {
839
      // skip the table has no data to insert
840
      // eg: import a csv without valid data
841
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
842
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
843
      //   p = taosHashIterate(pTableHash, p);
844
      //   continue;
845
      // }
846
      if (!pTableCxt->ordered) {
9,858,119✔
847
        code = tRowSort(pTableCxt->pData->aRowP);
50,678✔
848
      }
849
      if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
9,858,119✔
850
        code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
49,595✔
851
      }
852
    }
853

854
    if (TSDB_CODE_SUCCESS == code) {
9,900,861!
855
      SVgroupDataCxt* pVgCxt = NULL;
9,900,861✔
856
      int32_t         vgId = pTableCxt->pMeta->vgId;
9,900,861✔
857
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
9,900,861✔
858
      if (NULL == pp) {
9,857,182✔
859
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
9,797,985✔
860
      } else {
861
        pVgCxt = *(SVgroupDataCxt**)pp;
59,197✔
862
      }
863
      if (TSDB_CODE_SUCCESS == code) {
9,888,146!
864
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
9,889,145✔
865
      }
866
    }
867
    if (TSDB_CODE_SUCCESS == code) {
9,869,326!
868
      p = taosHashIterate(pTableHash, p);
9,875,955✔
869
    }
870
  }
871

872
  taosHashCleanup(pVgroupHash);
9,787,083✔
873
  if (TSDB_CODE_SUCCESS == code) {
9,786,271✔
874
    *pVgDataBlocks = pVgroupList;
9,779,186✔
875
  } else {
876
    insDestroyVgroupDataCxtList(pVgroupList);
7,085✔
877
  }
878

879
  return code;
9,780,344✔
880
}
881

882
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
9,827,583✔
883
  int32_t  code = TSDB_CODE_SUCCESS;
9,827,583✔
884
  uint32_t len = 0;
9,827,583✔
885
  void*    pBuf = NULL;
9,827,583✔
886
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
9,827,583!
887
  if (TSDB_CODE_SUCCESS == code) {
9,803,190!
888
    SEncoder encoder;
889
    len += sizeof(SSubmitReq2Msg);
9,809,677✔
890
    pBuf = taosMemoryMalloc(len);
9,809,677!
891
    if (NULL == pBuf) {
9,815,216!
UNCOV
892
      return terrno;
×
893
    }
894
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
9,815,216✔
895
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
9,815,216✔
896
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
9,815,216✔
897
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
9,818,468✔
898
    code = tEncodeSubmitReq(&encoder, pReq);
9,836,447✔
899
    tEncoderClear(&encoder);
9,847,726✔
900
  }
901

902
  if (TSDB_CODE_SUCCESS == code) {
9,866,904!
903
    *pData = pBuf;
9,866,904✔
904
    *pLen = len;
9,866,904✔
905
  } else {
UNCOV
906
    taosMemoryFree(pBuf);
×
907
  }
908
  return code;
9,848,162✔
909
}
910

UNCOV
911
static void destroyVgDataBlocks(void* p) {
×
UNCOV
912
  if (p == NULL) return;
×
UNCOV
913
  SVgDataBlocks* pVg = p;
×
UNCOV
914
  taosMemoryFree(pVg->pData);
×
UNCOV
915
  taosMemoryFree(pVg);
×
916
}
917

918
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
9,789,400✔
919
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
9,789,400✔
920
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
9,798,550!
921
  if (NULL == pDataBlocks) {
9,815,727!
UNCOV
922
    return TSDB_CODE_OUT_OF_MEMORY;
×
923
  }
924

925
  int32_t code = TSDB_CODE_SUCCESS;
9,815,727✔
926
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
19,651,662✔
927
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
9,843,518✔
928
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
9,853,913!
UNCOV
929
      continue;
×
930
    }
931
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
9,854,276!
932
    if (NULL == dst) {
9,854,984!
UNCOV
933
      code = terrno;
×
934
    }
935
    if (TSDB_CODE_SUCCESS == code) {
9,854,984!
936
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
9,856,530✔
937
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
9,850,407✔
938
    }
939
    if (TSDB_CODE_SUCCESS == code) {
9,840,855!
940
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
9,840,855✔
941
    }
942
    if (TSDB_CODE_SUCCESS == code) {
9,847,829!
943
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
9,834,325!
944
    }
945
    if (TSDB_CODE_SUCCESS != code) {
9,832,656!
UNCOV
946
      destroyVgDataBlocks(dst);
×
947
    }
948
  }
949

950
  if (append) {
9,808,144✔
951
    if (NULL == *pVgDataBlocks) {
22,508!
952
      *pVgDataBlocks = pDataBlocks;
22,508✔
953
    }
954
    return code;
22,508✔
955
  }
956

957
  if (TSDB_CODE_SUCCESS == code) {
9,785,636✔
958
    *pVgDataBlocks = pDataBlocks;
9,778,078✔
959
  } else {
960
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
7,558✔
961
  }
962

963
  return code;
9,773,786✔
964
}
965

UNCOV
966
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
967
  for (int i = 0; i < numFields; i++) {
×
968
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
UNCOV
969
      return true;
×
970
    }
971
  }
972

UNCOV
973
  return false;
×
974
}
975

976
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
582✔
977
  if (*fields != pColSchema->type) {
582✔
978
    if (errstr != NULL)
1!
979
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
980
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
981
    return TSDB_CODE_INVALID_PARA;
1✔
982
  }
983

984
  if (IS_DECIMAL_TYPE(pColSchema->type)) {
581!
985
    uint8_t precision = 0, scale = 0;
1✔
986
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
1✔
987
    uint8_t precisionData = 0, scaleData = 0;
1✔
988
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
1✔
989
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
1✔
990
    if (precision != precisionData || scale != scaleData) {
1!
UNCOV
991
      if (errstr != NULL)
×
992
        snprintf(errstr, errstrLen,
×
993
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
994
                 "precision:%d, scale:%d",
995
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
996
                 precisionData, scaleData);
UNCOV
997
      return TSDB_CODE_INVALID_PARA;
×
998
    }
999
    return 0;
1✔
1000
  }
1001

1002
  if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
580!
1003
    if (errstr != NULL)
1!
1004
      snprintf(errstr, errstrLen,
×
1005
               "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
UNCOV
1006
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
UNCOV
1007
               *(int32_t*)(fields + sizeof(int8_t)));
×
1008
    return TSDB_CODE_INVALID_PARA;
1✔
1009
  }
1010

1011
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
579!
UNCOV
1012
    if (errstr != NULL)
×
UNCOV
1013
      snprintf(errstr, errstrLen,
×
1014
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
UNCOV
1015
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
UNCOV
1016
               *(int32_t*)(fields + sizeof(int8_t)));
×
UNCOV
1017
    return TSDB_CODE_INVALID_PARA;
×
1018
  }
1019
  return 0;
579✔
1020
}
1021

1022
#define PRCESS_DATA(i, j)                                                                                 \
1023
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                \
1024
  if (ret != 0) {                                                                                         \
1025
    goto end;                                                                                             \
1026
  }                                                                                                       \
1027
                                                                                                          \
1028
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                 \
1029
    hasTs = true;                                                                                         \
1030
  }                                                                                                       \
1031
                                                                                                          \
1032
  int8_t* offset = pStart;                                                                                \
1033
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                               \
1034
    pStart += numOfRows * sizeof(int32_t);                                                                \
1035
  } else {                                                                                                \
1036
    pStart += BitmapLen(numOfRows);                                                                       \
1037
  }                                                                                                       \
1038
  char* pData = pStart;                                                                                   \
1039
                                                                                                          \
1040
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                               \
1041
  ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
1042
  if (ret != 0) {                                                                                         \
1043
    goto end;                                                                                             \
1044
  }                                                                                                       \
1045
  fields += sizeof(int8_t) + sizeof(int32_t);                                                             \
1046
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                   \
1047
    pStart += htonl(colLength[i]);                                                                        \
1048
  } else {                                                                                                \
1049
    pStart += colLength[i];                                                                               \
1050
  }                                                                                                       \
1051
  boundInfo->pColIndex[j] = -1;
1052

1053
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
130✔
1054
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1055
  int ret = 0;
130✔
1056
  if (data == NULL) {
130!
UNCOV
1057
    uError("rawBlockBindData, data is NULL");
×
UNCOV
1058
    return TSDB_CODE_APP_ERROR;
×
1059
  }
1060
  void* tmp =
1061
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
130✔
1062
  SVCreateTbReq* pCreateReqTmp = NULL;
130✔
1063
  if (tmp == NULL && pCreateTb != NULL) {
130✔
1064
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
15✔
1065
    if (ret != TSDB_CODE_SUCCESS) {
15!
UNCOV
1066
      uError("cloneSVreateTbReq error");
×
UNCOV
1067
      goto end;
×
1068
    }
1069
  }
1070

1071
  STableDataCxt* pTableCxt = NULL;
130✔
1072
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
130✔
1073
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1074
  if (pCreateReqTmp != NULL) {
130!
UNCOV
1075
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1076
    taosMemoryFree(pCreateReqTmp);
×
1077
  }
1078

1079
  if (ret != TSDB_CODE_SUCCESS) {
130!
UNCOV
1080
    uError("insGetTableDataCxt error");
×
UNCOV
1081
    goto end;
×
1082
  }
1083

1084
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
130✔
1085
  if (tmp == NULL) {
130✔
1086
    ret = initTableColSubmitData(pTableCxt);
117✔
1087
    if (ret != TSDB_CODE_SUCCESS) {
117!
UNCOV
1088
      uError("initTableColSubmitData error");
×
UNCOV
1089
      goto end;
×
1090
    }
1091
  }
1092

1093
  char* p = (char*)data;
130✔
1094
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1095
  // column length |
1096
  int32_t version = *(int32_t*)data;
130✔
1097
  p += sizeof(int32_t);
130✔
1098
  p += sizeof(int32_t);
130✔
1099

1100
  int32_t numOfRows = *(int32_t*)p;
130✔
1101
  p += sizeof(int32_t);
130✔
1102

1103
  int32_t numOfCols = *(int32_t*)p;
130✔
1104
  p += sizeof(int32_t);
130✔
1105

1106
  p += sizeof(int32_t);
130✔
1107
  p += sizeof(uint64_t);
130✔
1108

1109
  int8_t* fields = p;
130✔
1110
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
130!
UNCOV
1111
    uError("fields type error:%d", *fields);
×
UNCOV
1112
    ret = TSDB_CODE_INVALID_PARA;
×
UNCOV
1113
    goto end;
×
1114
  }
1115
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
130✔
1116

1117
  int32_t* colLength = (int32_t*)p;
130✔
1118
  p += sizeof(int32_t) * numOfCols;
130✔
1119

1120
  char* pStart = p;
130✔
1121

1122
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
130✔
1123
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
130✔
1124
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
130✔
1125

1126
  if (tFields != NULL && numFields != numOfCols) {
130!
UNCOV
1127
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
UNCOV
1128
    ret = TSDB_CODE_INVALID_PARA;
×
UNCOV
1129
    goto end;
×
1130
  }
1131

1132
  bool hasTs = false;
130✔
1133
  if (tFields == NULL) {
130✔
1134
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
6✔
1135
    for (int j = 0; j < len; j++) {
18✔
1136
      SSchema*    pColSchema = &pSchema[j];
14✔
1137
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
14✔
1138
      PRCESS_DATA(j, j)
14!
1139
    }
1140
  } else {
1141
    for (int i = 0; i < numFields; i++) {
654✔
1142
      for (int j = 0; j < boundInfo->numOfBound; j++) {
4,352!
1143
        SSchema*    pColSchema = &pSchema[j];
4,352✔
1144
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
4,352✔
1145
        char*       fieldName = NULL;
4,352✔
1146
        if (raw) {
4,352✔
1147
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
4,347✔
1148
        } else {
1149
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
5✔
1150
        }
1151
        if (strcmp(pColSchema->name, fieldName) == 0) {
4,352✔
1152
          PRCESS_DATA(i, j)
530!
1153
          break;
530✔
1154
        }
1155
      }
1156
    }
1157
  }
1158

1159
  if (!hasTs) {
128!
UNCOV
1160
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
UNCOV
1161
    ret = TSDB_CODE_INVALID_PARA;
×
UNCOV
1162
    goto end;
×
1163
  }
1164

1165
  // process NULL data
1166
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
682✔
1167
    if (boundInfo->pColIndex[c] != -1) {
554✔
1168
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
13✔
1169
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
13✔
1170
      if (ret != 0) {
13!
1171
        goto end;
×
1172
      }
1173
    } else {
1174
      boundInfo->pColIndex[c] = c;  // restore for next block
541✔
1175
    }
1176
  }
1177

1178
end:
128✔
1179
  return ret;
130✔
1180
}
1181

UNCOV
1182
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
×
1183
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
×
UNCOV
1184
  if (code != 0) {
×
1185
    return code;
×
1186
  }
1187
  SVgroupDataCxt* pVgCxt = NULL;
×
1188
  void**          pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
×
1189
  if (NULL == pp) {
×
UNCOV
1190
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
×
UNCOV
1191
    if (code != 0) {
×
UNCOV
1192
      return code;
×
1193
    }
1194
  } else {
1195
    pVgCxt = *(SVgroupDataCxt**)pp;
×
1196
  }
UNCOV
1197
  if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1198
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
1199
    pVgCxt->pData->raw = true;
×
1200
    if (NULL == pVgCxt->pData->aSubmitTbData) {
×
UNCOV
1201
      return terrno;
×
1202
    }
1203
  }
1204

1205
  // push data to submit, rebuild empty data for next submit
UNCOV
1206
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
×
UNCOV
1207
    return terrno;
×
1208
  }
1209

UNCOV
1210
  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
×
1211

UNCOV
1212
  return 0;
×
1213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc