• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4941

27 Jan 2026 10:23AM UTC coverage: 66.868% (+0.04%) from 66.832%
#4941

push

travis-ci

web-flow
fix: asan invalid write issue (#34400)

7 of 8 new or added lines in 2 files covered. (87.5%)

560 existing lines in 126 files now uncovered.

204401 of 305680 relevant lines covered (66.87%)

126915843.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.17
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
static char* tableNameGetPosition(SToken* pToken, char target) {
653,453,065✔
31
  bool inEscape = false;
653,453,065✔
32
  bool inQuote = false;
653,453,065✔
33
  char quotaStr = 0;
653,453,065✔
34

35
  for (uint32_t i = 0; i < pToken->n; ++i) {
2,147,483,647✔
36
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
2,147,483,647✔
37
      return pToken->z + i;
240,627,121✔
38
    }
39

40
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
2,147,483,647✔
41
      if (!inQuote) {
42,773,119✔
42
        inEscape = !inEscape;
42,762,872✔
43
      }
44
    }
45

46
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
2,147,483,647✔
47
      if (!inEscape) {
6,230,994✔
48
        if (!inQuote) {
6,229,350✔
49
          quotaStr = *(pToken->z + i);
3,114,675✔
50
          inQuote = !inQuote;
3,114,675✔
51
        } else if (quotaStr == *(pToken->z + i)) {
3,114,675✔
52
          inQuote = !inQuote;
3,114,675✔
53
        }
54
      }
55
    }
56
  }
57

58
  return NULL;
412,840,886✔
59
}
60

61
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
653,451,578✔
62
  const char* msg1 = "table name is too long";
653,451,578✔
63
  const char* msg2 = "invalid database name";
653,451,578✔
64
  const char* msg3 = "db is not specified";
653,451,578✔
65
  const char* msg4 = "invalid table name";
653,451,578✔
66
  const char* msg5 = "database name is too long";
653,451,578✔
67

68
  int32_t code = TSDB_CODE_SUCCESS;
653,451,578✔
69
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
653,451,578✔
70

71
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
653,467,378✔
72
    // before dbname dequote
73
    int32_t dbLen = p - pTableName->z;
240,627,133✔
74
    if (dbLen <= 0) {
240,618,033✔
75
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
76
    }
77
    if (dbLen >= TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE) {
240,618,033✔
78
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
×
79
    }
80

81
    char name[TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE] = {0};
240,618,033✔
82
    strncpy(name, pTableName->z, dbLen);
240,619,392✔
83
    int32_t actualDbLen = strdequote(name);
240,619,238✔
84

85
    // after dbname dequote
86
    if (actualDbLen <= 0) {
240,620,965✔
87
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
743✔
88
    }
89
    if (actualDbLen >= TSDB_DB_NAME_LEN) {
240,620,235✔
90
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
×
91
    }
92

93
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
240,620,235✔
94
    if (code != TSDB_CODE_SUCCESS) {
240,622,975✔
95
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
96
    }
97

98
    // before tbname dequote
99
    int32_t tbLen = pTableName->n - dbLen - 1;
240,623,012✔
100
    if (tbLen <= 0) {
240,618,385✔
UNCOV
101
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
×
102
    }
103
    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
240,622,900✔
104
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
126✔
105
    }
106

107
    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
240,622,774✔
108
    strncpy(tbname, p + 1, tbLen);
240,623,577✔
109
    int32_t actualTbLen = strdequote(tbname);
240,616,141✔
110

111
    // after tbname dequote
112
    if (actualTbLen <= 0) {
240,623,484✔
113
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
25✔
114
    }
115
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
240,624,059✔
116
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
126✔
117
    }
118

119
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
240,623,933✔
120
    if (code != 0) {
240,625,842✔
121
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
3,787✔
122
    }
123
  } else {  // get current DB name first, and then set it into path
124
    // before tbname dequote
125
    int32_t tbLen = pTableName->n;
412,840,245✔
126
    if (tbLen <= 0) {
412,832,206✔
127
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
×
128
    }
129

130
    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
412,836,495✔
131
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
126✔
132
    }
133

134
    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
412,836,369✔
135
    strncpy(tbname, pTableName->z, tbLen);
412,833,392✔
136
    int32_t actualTbLen = strdequote(tbname);
412,834,021✔
137
    // after tbname dequote
138
    if (actualTbLen <= 0) {
412,834,545✔
139
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
3,721✔
140
    }
141
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
412,830,956✔
142
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
126✔
143
    }
144

145
    if (dbName == NULL || strlen(dbName) == 0) {
412,830,830✔
146
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED, msg3);
24✔
147
    }
148

149
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
412,841,573✔
150
    if (code != TSDB_CODE_SUCCESS) {
412,833,575✔
151
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
152
      return code;
×
153
    }
154

155
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
412,833,575✔
156
    if (code != 0) {
412,829,806✔
157
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
×
158
    }
159
  }
160

161
  if (NULL != strchr(pName->tname, '.')) {
653,455,072✔
162
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
×
163
  }
164

165
  return code;
653,458,679✔
166
}
167

168
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
928,761,940✔
169
  while (start < end) {
2,147,483,647✔
170
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
2,147,483,647✔
171
      return start;
922,831,566✔
172
    }
173
    ++start;
2,147,483,647✔
174
  }
175
  return -1;
5,930,623✔
176
}
177

178
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
15,259,689✔
179
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
180
  pTbReq->type = TD_CHILD_TABLE;
15,259,689✔
181
  pTbReq->ctb.pTag = (uint8_t*)pTag;
15,264,601✔
182
  pTbReq->name = taosStrdup(tname);
15,260,368✔
183
  if (!pTbReq->name) return terrno;
15,268,545✔
184
  pTbReq->ctb.suid = suid;
15,255,170✔
185
  pTbReq->ctb.tagNum = tagNum;
15,264,354✔
186
  if (sname) {
15,260,077✔
187
    pTbReq->ctb.stbName = taosStrdup(sname);
14,629,469✔
188
    if (!pTbReq->ctb.stbName) return terrno;
14,630,125✔
189
  }
190
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
15,264,289✔
191
  if (!pTbReq->ctb.tagName) return terrno;
15,266,109✔
192
  pTbReq->ttl = ttl;
15,262,954✔
193
  pTbReq->commentLen = -1;
15,265,224✔
194

195
  return TSDB_CODE_SUCCESS;
15,257,430✔
196
}
197

198
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
199
  for (int32_t i = 0; i < ncols; ++i) {
×
200
    pBoundCols[i] = i;
×
201
  }
202
}
×
203

204
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
592,380,881✔
205
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
592,380,881✔
206
  int32_t  code = 0;
592,388,830✔
207
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
2,147,483,647✔
208
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
2,147,483,647✔
209
    if (NULL == taosArrayPush(pValues, &val)) {
2,147,483,647✔
210
      code = terrno;
×
211
      break;
×
212
    }
213
  }
214
  return code;
592,393,865✔
215
}
216

217
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
198,707✔
218

219
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
616,620,665✔
220
  pInfo->numOfCols = numOfBound;
616,620,665✔
221
  pInfo->numOfBound = numOfBound;
616,635,629✔
222
  pInfo->hasBoundCols = false;
616,628,990✔
223
  pInfo->mixTagsCols = false;
616,632,294✔
224
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
616,634,645✔
225
  if (NULL == pInfo->pColIndex) {
616,630,686✔
226
    return terrno;
×
227
  }
228
  for (int32_t i = 0; i < numOfBound; ++i) {
2,147,483,647✔
229
    pInfo->pColIndex[i] = i;
2,147,483,647✔
230
  }
231
  return TSDB_CODE_SUCCESS;
616,633,108✔
232
}
233

234
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
2,862✔
235
  pInfo->numOfBound = pInfo->numOfCols;
2,862✔
236
  pInfo->hasBoundCols = false;
2,862✔
237
  pInfo->mixTagsCols = false;
2,862✔
238
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
14,310✔
239
    pInfo->pColIndex[i] = i;
11,448✔
240
  }
241
}
2,862✔
242

243
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
2,147,483,647✔
244
  // once the data block is disordered, we do NOT keep last timestamp any more
245
  if (!pTableCxt->ordered) {
2,147,483,647✔
246
    return;
28,539,023✔
247
  }
248

249
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
2,147,483,647✔
250
    pTableCxt->ordered = false;
1,408,362✔
251
  }
252

253
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
2,147,483,647✔
254
    pTableCxt->duplicateTs = true;
1,763,005✔
255
  }
256

257
  // TODO: for variable length data type, we need to copy it out
258
  pTableCxt->lastKey = *rowKey;
2,147,483,647✔
259
  return;
2,147,483,647✔
260
}
261

262
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
594,852,050✔
263
                                  bool colMode, bool ignoreColVals) {
264
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
594,852,050✔
265
  if (NULL == pTableCxt) {
594,842,663✔
266
    *pOutput = NULL;
×
267
    return terrno;
×
268
  }
269

270
  int32_t code = TSDB_CODE_SUCCESS;
594,842,663✔
271

272
  pTableCxt->lastKey = (SRowKey){0};
594,842,663✔
273
  pTableCxt->ordered = true;
594,850,317✔
274
  pTableCxt->duplicateTs = false;
594,852,459✔
275

276
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
594,856,319✔
277
  if (NULL == pTableCxt->pMeta) {
594,869,637✔
278
    code = TSDB_CODE_OUT_OF_MEMORY;
×
279
  }
280
  if (TSDB_CODE_SUCCESS == code) {
594,874,353✔
281
    pTableCxt->pSchema =
594,872,627✔
282
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
594,874,776✔
283
    if (NULL == pTableCxt->pSchema) {
594,872,808✔
284
      code = TSDB_CODE_OUT_OF_MEMORY;
×
285
    }
286
  }
287
  pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
594,874,811✔
288

289
  if (TSDB_CODE_SUCCESS == code) {
594,865,702✔
290
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
594,864,963✔
291
  }
292
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
594,865,886✔
293
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
592,185,915✔
294
    if (NULL == pTableCxt->pValues) {
592,178,434✔
295
      code = terrno;
×
296
    } else {
297
      code = initColValues(pTableMeta, pTableCxt->pValues);
592,182,612✔
298
    }
299
  }
300
  if (TSDB_CODE_SUCCESS == code) {
594,874,913✔
301
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
594,874,830✔
302
    if (NULL == pTableCxt->pData) {
594,860,152✔
303
      code = terrno;
×
304
    } else {
305
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
594,864,541✔
306
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
594,871,644✔
307
      pTableCxt->pData->suid = pTableMeta->suid;
594,873,809✔
308
      pTableCxt->pData->uid = pTableMeta->uid;
594,862,292✔
309
      pTableCxt->pData->sver = pTableMeta->sversion;
594,866,175✔
310
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
594,870,285✔
311
      int8_t flag = pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT;
594,870,001✔
312
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
594,868,013✔
313
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
594,867,369✔
314
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
6,573,323✔
315
        if (NULL == pTableCxt->pData->aCol) {
6,575,261✔
316
          code = terrno;
×
317
        }
318
      } else {
319
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
588,287,134✔
320
        if (NULL == pTableCxt->pData->aRowP) {
588,288,685✔
321
          code = terrno;
×
322
        }
323
      }
324
    }
325
  }
326
  if (TSDB_CODE_SUCCESS == code) {
594,872,319✔
327
    *pOutput = pTableCxt;
594,872,319✔
328
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
594,873,424✔
329
                pTableMeta->vgId);
330
  } else {
331
    insDestroyTableDataCxt(pTableCxt);
×
332
  }
333

334
  return code;
594,873,853✔
335
}
336

337
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
7,220,277✔
338
  int32_t        code = TSDB_CODE_SUCCESS;
7,220,277✔
339
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
7,220,277✔
340
  if (NULL == pTmp) {
7,218,676✔
341
    code = terrno;
×
342
  } else {
343
    pTmp->flags = pSrc->flags;
7,218,676✔
344
    pTmp->suid = pSrc->suid;
7,219,545✔
345
    pTmp->uid = pSrc->uid;
7,221,435✔
346
    pTmp->sver = pSrc->sver;
7,221,737✔
347
    pTmp->pCreateTbReq = NULL;
7,221,750✔
348
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
7,220,149✔
349
      if (pSrc->pCreateTbReq) {
7,072,406✔
350
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
7,069,506✔
351
      } else {
352
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
×
353
      }
354
    }
355
    if (TSDB_CODE_SUCCESS == code) {
7,220,277✔
356
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
7,220,277✔
357
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
6,583,153✔
358
        if (NULL == pTmp->aCol) {
6,583,606✔
359
          code = terrno;
×
360
          taosMemoryFree(pTmp);
×
361
        }
362
      } else {
363
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
638,938✔
364
        if (NULL == pTmp->aRowP) {
638,397✔
365
          code = terrno;
×
366
          taosMemoryFree(pTmp);
×
367
        }
368

369
        if (code != 0) {
638,397✔
370
          taosArrayDestroy(pTmp->aRowP);
×
371
          taosMemoryFree(pTmp);
×
372
        }
373
      }
374

375
    } else {
376
      taosMemoryFree(pTmp);
×
377
    }
378
  }
379

380
  taosMemoryFree(pSrc);
7,222,344✔
381
  if (TSDB_CODE_SUCCESS == code) {
7,223,315✔
382
    *pDst = pTmp;
7,222,446✔
383
  }
384

385
  return code;
7,223,630✔
386
}
387

388
static void resetColValues(SArray* pValues) {
32,782,351✔
389
  int32_t num = taosArrayGetSize(pValues);
32,782,351✔
390
  for (int32_t i = 0; i < num; ++i) {
370,132,025✔
391
    SColVal* pVal = taosArrayGet(pValues, i);
337,349,674✔
392
    pVal->flag = CV_FLAG_NONE;
337,349,674✔
393
  }
394
}
32,782,351✔
395

396
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
1,657,290,918✔
397
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
398
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
1,657,290,918✔
399
  if (NULL != tmp) {
1,657,307,068✔
400
    *pTableCxt = *tmp;
1,062,439,095✔
401
    if (!ignoreColVals) {
1,062,439,095✔
402
      resetColValues((*pTableCxt)->pValues);
32,782,351✔
403
    }
404
    return TSDB_CODE_SUCCESS;
1,062,439,095✔
405
  }
406
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
594,867,973✔
407
  if (TSDB_CODE_SUCCESS == code) {
594,871,501✔
408
    void* pData = *pTableCxt;  // deal scan coverity
594,872,507✔
409
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
594,873,595✔
410
  }
411

412
  if (TSDB_CODE_SUCCESS != code) {
594,870,812✔
413
    insDestroyTableDataCxt(*pTableCxt);
×
414
  }
415
  return code;
594,872,512✔
416
}
417

418
void destroyColVal(void* p) {
2,147,483,647✔
419
  if (!p) return;
2,147,483,647✔
420

421
  SColVal* pVal = (SColVal*)p;
2,147,483,647✔
422

423
  if (IS_VAR_DATA_TYPE(pVal->value.type) || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
424
    taosMemoryFreeClear(pVal->value.pData);
1,799,835,481✔
425
  }
426
}
427

428
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
600,520,993✔
429
  if (NULL == pTableCxt) {
600,520,993✔
430
    return;
×
431
  }
432

433
  taosMemoryFreeClear(pTableCxt->pMeta);
600,520,993✔
434
  tDestroyTSchema(pTableCxt->pSchema);
600,517,812✔
435
  qDestroyBoundColInfo(&pTableCxt->boundColsInfo);
600,519,936✔
436
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
600,519,937✔
437
  if (pTableCxt->pData) {
600,520,822✔
438
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
13,567,512✔
439
    taosMemoryFree(pTableCxt->pData);
13,566,803✔
440
  }
441
  taosMemoryFree(pTableCxt);
600,519,544✔
442
}
443

444
static void destroyColValSml(void* p) {
10,080,549✔
445
  SColVal* pVal = p;
10,080,549✔
446
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
10,080,549✔
447
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type) {
9,858,196✔
448
    taosMemoryFreeClear(pVal->value.pData);
227,327✔
449
  }
450
}
10,080,334✔
451

452
static void insDestroyTableDataCxtSml(STableDataCxt* pTableCxt) {
708,485✔
453
  if (NULL == pTableCxt) {
708,485✔
454
    return;
×
455
  }
456

457
  taosMemoryFreeClear(pTableCxt->pMeta);
708,485✔
458
  tDestroyTSchema(pTableCxt->pSchema);
708,442✔
459
  qDestroyBoundColInfo(&pTableCxt->boundColsInfo);
708,398✔
460
  taosArrayDestroyEx(pTableCxt->pValues, destroyColValSml);
708,472✔
461
  if (pTableCxt->pData) {
708,386✔
462
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
708,386✔
463
    taosMemoryFree(pTableCxt->pData);
708,429✔
464
  }
465
  taosMemoryFree(pTableCxt);
708,429✔
466
}
467

468
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
568,770,865✔
469
  if (NULL == pVgCxt) {
568,770,865✔
470
    return;
×
471
  }
472

473
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
568,770,865✔
474
  taosMemoryFree(pVgCxt->pData);
568,767,381✔
475

476
  taosMemoryFree(pVgCxt);
568,769,664✔
477
}
478

479
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
1,108,133,328✔
480
  if (NULL == pVgCxtList) {
1,108,133,328✔
481
    return;
554,100,586✔
482
  }
483

484
  size_t size = taosArrayGetSize(pVgCxtList);
554,032,742✔
485
  for (int32_t i = 0; i < size; i++) {
1,122,805,967✔
486
    void* p = taosArrayGetP(pVgCxtList, i);
568,773,723✔
487
    insDestroyVgroupDataCxt(p);
568,776,050✔
488
  }
489

490
  taosArrayDestroy(pVgCxtList);
554,032,244✔
491
}
492

493
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
494
  if (NULL == pVgCxtHash) {
×
495
    return;
×
496
  }
497

498
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
499
  while (p) {
×
500
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
501

502
    p = taosHashIterate(pVgCxtHash, p);
×
503
  }
504

505
  taosHashCleanup(pVgCxtHash);
×
506
}
507

508
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
553,881,129✔
509
  if (NULL == pTableCxtHash) {
553,881,129✔
510
    return;
6,552,671✔
511
  }
512

513
  void** p = taosHashIterate(pTableCxtHash, NULL);
547,328,458✔
514
  while (p) {
1,134,854,668✔
515
    insDestroyTableDataCxt(*(STableDataCxt**)p);
587,524,774✔
516

517
    p = taosHashIterate(pTableCxtHash, p);
587,524,718✔
518
  }
519

520
  taosHashCleanup(pTableCxtHash);
547,329,894✔
521
}
522

523
void insDestroyTableDataCxtHashMapSml(SHashObj* pTableCxtHash) {
957,371✔
524
  if (NULL == pTableCxtHash) {
957,371✔
525
    return;
×
526
  }
527

528
  void** p = taosHashIterate(pTableCxtHash, NULL);
957,371✔
529
  while (p) {
1,665,813✔
530
    insDestroyTableDataCxtSml(*(STableDataCxt**)p);
708,485✔
531

532
    p = taosHashIterate(pTableCxtHash, p);
708,429✔
533
  }
534

535
  taosHashCleanup(pTableCxtHash);
957,328✔
536
}
537

538
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
596,179,510✔
539
  int32_t code = 0;
596,179,510✔
540
  if (NULL == pVgCxt->pData->aSubmitTbData) {
596,179,510✔
541
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
568,316,772✔
542
    if (pVgCxt->pData->aSubmitTbData == NULL) {
568,317,529✔
543
      return terrno;
×
544
    }
545
    if (pTableCxt->hasBlob) {
568,318,858✔
546
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
21,419✔
547
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
21,419✔
548
        return terrno;
×
549
      }
550
    }
551
  }
552

553
  // push data to submit, rebuild empty data for next submit
554
  if (!pTableCxt->hasBlob) pTableCxt->pData->pBlobSet = NULL;
596,185,012✔
555

556
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
1,192,369,182✔
557
    return terrno;
×
558
  }
559

560
  if (pTableCxt->hasBlob) {
596,185,482✔
561
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
21,419✔
562
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
42,838✔
563
      return terrno;
×
564
    }
565
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
21,419✔
566
  }
567

568
  if (isRebuild) {
596,180,931✔
569
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
7,221,537✔
570
  } else if (clear) {
588,959,394✔
571
    taosMemoryFreeClear(pTableCxt->pData);
587,035,573✔
572
  }
573
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
596,182,789✔
574

575
  return code;
596,180,388✔
576
}
577

578
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
568,865,275✔
579
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
568,865,275✔
580
  if (NULL == pVgCxt) {
568,867,423✔
581
    return terrno;
×
582
  }
583
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
568,867,423✔
584
  if (NULL == pVgCxt->pData) {
568,866,465✔
585
    insDestroyVgroupDataCxt(pVgCxt);
×
586
    return terrno;
×
587
  }
588

589
  pVgCxt->vgId = vgId;
568,865,375✔
590
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
568,865,975✔
591
  if (TSDB_CODE_SUCCESS == code) {
568,868,893✔
592
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
568,869,324✔
593
      code = terrno;
×
594
      insDestroyVgroupDataCxt(pVgCxt);
×
595
      return code;
×
596
    }
597
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
598
    *pOutput = pVgCxt;
568,869,324✔
599
  } else {
600
    insDestroyVgroupDataCxt(pVgCxt);
×
601
  }
602
  return code;
568,868,508✔
603
}
604

605
int insColDataComp(const void* lp, const void* rp) {
28,036,982✔
606
  SColData* pLeft = (SColData*)lp;
28,036,982✔
607
  SColData* pRight = (SColData*)rp;
28,036,982✔
608
  if (pLeft->cid < pRight->cid) {
28,036,982✔
609
    return -1;
27,985,943✔
610
  } else if (pLeft->cid > pRight->cid) {
52,164✔
611
    return 1;
52,164✔
612
  }
613

614
  return 0;
×
615
}
616

617
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
76,697✔
618
                                 STableColsData* pTbData, SName* sname) {
619
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
76,697✔
620
    return TSDB_CODE_SUCCESS;
65,591✔
621
  }
622

623
  SVgroupInfo      vgInfo = {0};
11,106✔
624
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
21,992✔
625
                           .requestId = pBuildInfo->requestId,
11,106✔
626
                           .requestObjRefId = pBuildInfo->requestSelf,
11,106✔
627
                           .mgmtEps = pBuildInfo->mgmtEpSet};
628

629
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
11,106✔
630
  if (TSDB_CODE_SUCCESS != code) {
11,106✔
631
    return code;
×
632
  }
633

634
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
11,106✔
635
  if (TSDB_CODE_SUCCESS != code) {
11,063✔
636
    return code;
×
637
  }
638

639
  return TSDB_CODE_SUCCESS;
11,063✔
640
}
641

642
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
2,936,345✔
643
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
644
  STableVgUid* pTbInfo = NULL;
2,936,345✔
645
  int32_t      code = 0;
2,936,345✔
646

647
  if (pTbData->getFromHash) {
2,936,345✔
648
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
2,860,401✔
649
  }
650

651
  if (NULL == pTbInfo) {
2,936,697✔
652
    SName sname;
62,084✔
653
    code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
77,147✔
654
    if (TSDB_CODE_SUCCESS != code) {
77,147✔
655
      return code;
450✔
656
    }
657

658
    STableMeta*      pTableMeta = NULL;
77,147✔
659
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
139,229✔
660
                             .requestId = pBuildInfo->requestId,
77,102✔
661
                             .requestObjRefId = pBuildInfo->requestSelf,
77,145✔
662
                             .mgmtEps = pBuildInfo->mgmtEpSet};
663
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
77,145✔
664

665
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
77,135✔
666
      parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
450✔
667
      return code;
450✔
668
    }
669

670
    if (TSDB_CODE_SUCCESS != code) {
76,685✔
671
      parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
×
672
      return code;
×
673
    }
674

675
    *uid = pTableMeta->uid;
76,685✔
676
    *vgId = pTableMeta->vgId;
76,683✔
677
    *suid = pTableMeta->suid;
76,679✔
678

679
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
76,683✔
680
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
76,630✔
681
    if (TSDB_CODE_SUCCESS == code) {
76,697✔
682
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
76,697✔
683
    }
684

685
    taosMemoryFree(pTableMeta);
76,654✔
686
  } else {
687
    *uid = pTbInfo->uid;
2,859,550✔
688
    *vgId = pTbInfo->vgid;
2,859,692✔
689
    *suid = pTbInfo->suid;
2,859,827✔
690
  }
691

692
  return code;
2,936,130✔
693
}
694

695
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
696
  int32_t             code = TSDB_CODE_SUCCESS;
×
697
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
698

699
  if (TSDB_CODE_SUCCESS == code) {
×
700
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
701
  }
702

703
  return code;
×
704
}
705

706
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
1,012,026✔
707
                                            char* tbname) {
708
  if (NULL == pVgCxt->pData->aSubmitTbData) {
1,012,026✔
709
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
548,811✔
710
    if (NULL == pVgCxt->pData->aSubmitTbData) {
549,007✔
711
      return terrno;
×
712
    }
713
    if (pTbCtx->hasBlob) {
548,964✔
714
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
×
715
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
×
716
        return terrno;
×
717
      }
718
    }
719
  }
720

721
  int32_t  code = TSDB_CODE_SUCCESS;
1,012,480✔
722
  SArray** rowP = NULL;
1,012,480✔
723

724
  rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));
1,012,480✔
725

726
  if (rowP != NULL && *rowP != NULL) {
1,012,412✔
727
    int32_t aRowPSize = taosArrayGetSize(pTbCtx->pData->aRowP);
×
728
    for (int32_t j = 0; j < aRowPSize; ++j) {
×
729
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
×
730
      if (pRow) {
×
731
        if (NULL == taosArrayPush(*rowP, &pRow)) {
×
732
          return terrno;
×
733
        }
734
      }
735
    }
736

737
    if (pTbCtx->hasBlob == 0) {
×
738
      code = tRowSort(*rowP);
×
739
      TAOS_CHECK_RETURN(code);
×
740

741
      code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
×
742
      TAOS_CHECK_RETURN(code);
×
743
    } else {
744
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
745
      TAOS_CHECK_RETURN(code);
×
746

747
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
748
      TAOS_CHECK_RETURN(code);
×
749
    }
750
  
751
    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);
×
752

753
    taosArrayDestroy(pTbCtx->pData->aRowP);
×
754
    if (pTbCtx->pData->pCreateTbReq != NULL) {
×
755
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
×
756
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
×
757
      pTbCtx->pData->pCreateTbReq = NULL;
×
758
    }
759
    return TSDB_CODE_SUCCESS;
×
760
  }
761

762
  if (pTbCtx->hasBlob == 0) {
1,012,412✔
763
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
1,012,425✔
764
  }
765

766
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
2,024,722✔
767
    return terrno;
×
768
  }
769

770
  if (pTbCtx->hasBlob) {
1,012,353✔
771
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
×
772
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
×
773
      return terrno;
×
774
    }
775
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
×
776
  }
777

778
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
1,012,310✔
779

780
  if (code != TSDB_CODE_SUCCESS) {
1,012,284✔
781
    return code;
×
782
  }
783

784
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);
1,012,284✔
785

786
  return TSDB_CODE_SUCCESS;
1,012,327✔
787
}
788

789
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,924,035✔
790
                                  SStbInterlaceInfo* pBuildInfo) {
791
  int32_t  code = TSDB_CODE_SUCCESS;
1,924,035✔
792
  uint64_t uid;
1,910,070✔
793
  int32_t  vgId;
1,910,315✔
794
  uint64_t suid;
1,910,413✔
795

796
  pTbCtx->pData->aRowP = pTbData->aCol;
1,924,427✔
797

798
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,924,775✔
799
  if (TSDB_CODE_SUCCESS != code) {
1,924,584✔
800
    return code;
×
801
  }
802

803
  pTbCtx->pMeta->vgId = vgId;
1,924,584✔
804
  pTbCtx->pMeta->uid = uid;
1,924,633✔
805
  pTbCtx->pData->uid = uid;
1,924,391✔
806

807
  if (!pTbCtx->ordered) {
1,924,587✔
808
    code = tRowSort(pTbCtx->pData->aRowP);
12✔
809
  }
810
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
1,924,442✔
811
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
66✔
812
  }
813

814
  if (TSDB_CODE_SUCCESS != code) {
1,924,234✔
815
    return code;
×
816
  }
817

818
  SVgroupDataCxt* pVgCxt = NULL;
1,924,234✔
819
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,924,327✔
820
  if (NULL == pp) {
1,923,901✔
821
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
333,758✔
822
    if (NULL == pp) {
333,848✔
823
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
333,848✔
824
    } else {
825
      pVgCxt = *(SVgroupDataCxt**)pp;
×
826
    }
827
  } else {
828
    pVgCxt = *(SVgroupDataCxt**)pp;
1,590,143✔
829
  }
830

831
  if (TSDB_CODE_SUCCESS == code) {
1,924,433✔
832
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
1,924,433✔
833
  }
834

835
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,924,308✔
836
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
837
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
838
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
839
    // insDestroyVgroupDataCxt(pVgCxt);
840
  }
841

842
  return code;
1,924,293✔
843
}
844

845
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,011,998✔
846
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
847
  int32_t  code = TSDB_CODE_SUCCESS;
1,011,998✔
848
  uint64_t uid;
316,669✔
849
  int32_t  vgId;
316,673✔
850
  uint64_t suid;
316,673✔
851

852
  pTbCtx->pData->aRowP = pTbData->aCol;
1,012,002✔
853
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
1,012,260✔
854

855
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,012,303✔
856
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
1,012,180✔
857
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
450✔
858
    vgId = (int32_t)ctbReq->uid;
450✔
859
    uid = 0;
450✔
860
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
450✔
861
    ctbReq->uid = 0;
450✔
862
    pTbCtx->pMeta->uid = 0;
450✔
863
    pTbCtx->pData->uid = 0;
450✔
864
    pTbCtx->pData->pCreateTbReq = ctbReq;
450✔
865
    code = TSDB_CODE_SUCCESS;
450✔
866
  } else {
867
    if (TSDB_CODE_SUCCESS != code) {
1,011,730✔
868
      return code;
×
869
    }
870
    if (pTbCtx->pData->suid != suid) {
1,011,730✔
871
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
872
    }
873

874
    pTbCtx->pMeta->vgId = vgId;
1,011,902✔
875
    pTbCtx->pMeta->uid = uid;
1,011,816✔
876
    pTbCtx->pData->uid = uid;
1,011,902✔
877
    pTbCtx->pData->pCreateTbReq = NULL;
1,011,902✔
878

879
    if (ctbReq != NULL) {
1,011,945✔
880
      tdDestroySVCreateTbReq(ctbReq);
881
      taosMemoryFree(ctbReq);
588,406✔
882
      ctbReq = NULL;
588,381✔
883
    }
884
  }
885

886
  if (pTbCtx->hasBlob == 0) {
1,012,442✔
887
    if (!pTbData->isOrdered) {
1,012,449✔
888
      code = tRowSort(pTbCtx->pData->aRowP);
×
889
    }
890
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
1,012,277✔
891
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
×
892
    }
893
  } else {
894
    if (!pTbData->isOrdered) {
36✔
895
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
896
    }
897
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
36✔
898
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
899
    }
900
  }
901

902
  if (TSDB_CODE_SUCCESS != code) {
1,012,192✔
903
    return code;
×
904
  }
905

906
  SVgroupDataCxt* pVgCxt = NULL;
1,012,192✔
907
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,012,278✔
908
  if (NULL == pp) {
1,012,196✔
909
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
548,902✔
910
    if (NULL == pp) {
549,063✔
911
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
549,063✔
912
    } else {
913
      pVgCxt = *(SVgroupDataCxt**)pp;
×
914
    }
915
  } else {
916
    pVgCxt = *(SVgroupDataCxt**)pp;
463,294✔
917
  }
918

919
  if (code == TSDB_CODE_SUCCESS) {
1,012,223✔
920
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
1,012,199✔
921
  }
922

923
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,012,265✔
924
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
925
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
926
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
927
    // insDestroyVgroupDataCxt(pVgCxt);
928
  }
929

930
  return code;
1,012,174✔
931
}
932

933
/*
934
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
935
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
936
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
937
  if (NULL == pVgroupHash || NULL == pVgroupList) {
938
    taosHashCleanup(pVgroupHash);
939
    taosArrayDestroy(pVgroupList);
940
    return TSDB_CODE_OUT_OF_MEMORY;
941
  }
942

943
  int32_t code = TSDB_CODE_SUCCESS;
944

945
  for (int32_t i = 0; i < tbNum; ++i) {
946
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
947
    pTableCxt->pMeta->vgId = pTableCols->vgId;
948
    pTableCxt->pMeta->uid = pTableCols->uid;
949
    pTableCxt->pData->uid = pTableCols->uid;
950
    pTableCxt->pData->aCol = pTableCols->aCol;
951

952
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
953
    if (pCol->nVal <= 0) {
954
      continue;
955
    }
956

957
    if (pTableCxt->pData->pCreateTbReq) {
958
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
959
    }
960

961
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
962

963
    tColDataSortMerge(pTableCxt->pData->aCol);
964

965
    if (TSDB_CODE_SUCCESS == code) {
966
      SVgroupDataCxt* pVgCxt = NULL;
967
      int32_t         vgId = pTableCxt->pMeta->vgId;
968
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
969
      if (NULL == pp) {
970
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
971
      } else {
972
        pVgCxt = *(SVgroupDataCxt**)pp;
973
      }
974
      if (TSDB_CODE_SUCCESS == code) {
975
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
976
      }
977
    }
978
  }
979

980
  taosHashCleanup(pVgroupHash);
981
  if (TSDB_CODE_SUCCESS == code) {
982
    *pVgDataBlocks = pVgroupList;
983
  } else {
984
    insDestroyVgroupDataCxtList(pVgroupList);
985
  }
986

987
  return code;
988
}
989
*/
990

991
static int8_t colDataHasBlob(SColData* pCol) {
×
992
  if (IS_STR_DATA_BLOB(pCol->type)) {
×
993
    return 1;
×
994
  }
995
  return 0;
×
996
}
997
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
553,314,305✔
998
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
553,314,305✔
999
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
553,320,179✔
1000
  if (NULL == pVgroupHash || NULL == pVgroupList) {
553,316,584✔
1001
    taosHashCleanup(pVgroupHash);
973✔
1002
    taosArrayDestroy(pVgroupList);
×
1003
    return terrno;
×
1004
  }
1005

1006
  int32_t code = TSDB_CODE_SUCCESS;
553,315,611✔
1007
  bool    colFormat = false;
553,315,611✔
1008

1009
  void* p = taosHashIterate(pTableHash, NULL);
553,315,611✔
1010
  if (p) {
553,324,621✔
1011
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
553,324,573✔
1012
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
553,324,870✔
1013
  }
1014

1015
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
1,147,586,718✔
1016
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
594,262,002✔
1017
    if (colFormat) {
594,262,002✔
1018
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
6,586,047✔
1019
      if (pCol && pCol->nVal <= 0) {
6,586,047✔
1020
        p = taosHashIterate(pTableHash, p);
135✔
1021
        continue;
135✔
1022
      }
1023

1024
      if (pTableCxt->pData->pCreateTbReq) {
6,585,912✔
1025
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
6,446,835✔
1026
      }
1027
      int8_t isBlob = IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
6,585,584✔
1028
      if (isBlob == 0) {
6,585,912✔
1029
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
6,585,912✔
1030
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
6,582,825✔
1031
      } else {
1032
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
×
1033
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
×
1034
      }
1035
    } else {
1036
      // skip the table has no data to insert
1037
      // eg: import a csv without valid data
1038
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
1039
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
1040
      //   p = taosHashIterate(pTableHash, p);
1041
      //   continue;
1042
      // }
1043
      if (pTableCxt->hasBlob == 0) {
587,675,955✔
1044
        if (!pTableCxt->ordered) {
587,654,549✔
1045
          code = tRowSort(pTableCxt->pData->aRowP);
1,407,627✔
1046
        }
1047
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
587,653,685✔
1048
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
1,538,522✔
1049
        }
1050
      } else {
1051
        if (!pTableCxt->ordered) {
21,419✔
1052
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
735✔
1053
        }
1054
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
21,419✔
1055
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
735✔
1056
        }
1057
      }
1058
    }
1059

1060
    if (TSDB_CODE_SUCCESS == code) {
594,258,851✔
1061
      SVgroupDataCxt* pVgCxt = NULL;
594,258,851✔
1062
      int32_t         vgId = pTableCxt->pMeta->vgId;
594,259,909✔
1063
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
594,258,170✔
1064
      if (NULL == pp) {
594,258,134✔
1065
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
567,984,440✔
1066
      } else {
1067
        pVgCxt = *(SVgroupDataCxt**)pp;
26,273,694✔
1068
      }
1069
      if (TSDB_CODE_SUCCESS == code) {
594,259,332✔
1070
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
594,259,415✔
1071
      }
1072
    }
1073
    if (TSDB_CODE_SUCCESS == code) {
594,259,162✔
1074
      p = taosHashIterate(pTableHash, p);
594,259,174✔
1075
    }
1076
  }
1077

1078
  taosHashCleanup(pVgroupHash);
553,324,716✔
1079
  if (TSDB_CODE_SUCCESS == code) {
553,317,718✔
1080
    *pVgDataBlocks = pVgroupList;
553,319,034✔
1081
  } else {
UNCOV
1082
    insDestroyVgroupDataCxtList(pVgroupList);
×
1083
  }
1084

1085
  return code;
553,320,328✔
1086
}
1087

1088
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
568,859,993✔
1089
  int32_t  code = TSDB_CODE_SUCCESS;
568,859,993✔
1090
  uint32_t len = 0;
568,859,993✔
1091
  void*    pBuf = NULL;
568,859,993✔
1092
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
568,859,993✔
1093
  if (TSDB_CODE_SUCCESS == code) {
568,862,130✔
1094
    SEncoder encoder;
566,014,684✔
1095
    len += sizeof(SSubmitReq2Msg);
568,855,604✔
1096
    pBuf = taosMemoryMalloc(len);
568,855,604✔
1097
    if (NULL == pBuf) {
568,851,830✔
1098
      return terrno;
×
1099
    }
1100
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
568,851,830✔
1101
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
568,859,661✔
1102
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
568,861,406✔
1103
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
568,865,105✔
1104
    code = tEncodeSubmitReq(&encoder, pReq);
568,863,607✔
1105
    tEncoderClear(&encoder);
568,868,502✔
1106
  }
1107

1108
  if (TSDB_CODE_SUCCESS == code) {
568,867,128✔
1109
    *pData = pBuf;
568,867,128✔
1110
    *pLen = len;
568,866,564✔
1111
  } else {
1112
    taosMemoryFree(pBuf);
×
1113
  }
1114
  return code;
568,858,261✔
1115
}
1116

1117
static void destroyVgDataBlocks(void* p) {
×
1118
  if (p == NULL) return;
×
1119
  SVgDataBlocks* pVg = p;
×
1120
  taosMemoryFree(pVg->pData);
×
1121
  taosMemoryFree(pVg);
×
1122
}
1123

1124
int32_t insResetBlob(SSubmitReq2* p) {
568,860,388✔
1125
  int32_t code = 0;
568,860,388✔
1126
  if (p->raw) {
568,860,388✔
1127
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
×
1128
  }
1129

1130
  if (p->aSubmitBlobData != NULL) {
568,864,478✔
1131
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
42,838✔
1132
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
21,419✔
1133
      SBlobSet**     ppBlob = taosArrayGet(p->aSubmitBlobData, i);
21,419✔
1134
      SBlobSet*      pBlob = ppBlob ? *ppBlob : NULL;
21,419✔
1135
      int32_t        nrow = taosArrayGetSize(pSubmitTbData->aRowP);
21,419✔
1136
      int32_t        nblob = 0;
21,419✔
1137
      if (nrow > 0 && pBlob) {
21,419✔
1138
        nblob = taosArrayGetSize(pBlob->pSeqTable);
21,419✔
1139
      }
1140
      uTrace("blob %p row size %d, pData size %d", pBlob, nblob, nrow);
21,419✔
1141
      pSubmitTbData->pBlobSet = pBlob;
21,419✔
1142
      if (ppBlob != NULL) *ppBlob = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
21,419✔
1143
    }
1144
  } else {
1145
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
1,166,013,511✔
1146
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
597,171,326✔
1147
      pSubmitTbData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
597,171,549✔
1148
    }
1149
  }
1150

1151
  return code;
568,868,431✔
1152
}
1153
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
554,128,966✔
1154
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
554,128,966✔
1155
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
554,131,624✔
1156
  if (NULL == pDataBlocks) {
554,129,845✔
1157
    return TSDB_CODE_OUT_OF_MEMORY;
×
1158
  }
1159

1160
  int32_t code = TSDB_CODE_SUCCESS;
554,129,845✔
1161
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
1,122,996,974✔
1162
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
568,865,988✔
1163
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
568,867,980✔
1164
      continue;
×
1165
    }
1166
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
568,866,771✔
1167
    if (NULL == dst) {
568,856,436✔
1168
      code = terrno;
×
1169
    }
1170

1171
    if (TSDB_CODE_SUCCESS == code) {
568,856,436✔
1172
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
568,857,221✔
1173
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
568,862,216✔
1174
    }
1175
    if (TSDB_CODE_SUCCESS == code) {
568,870,263✔
1176
      code = insResetBlob(src->pData);
568,870,263✔
1177
    }
1178

1179
    if (TSDB_CODE_SUCCESS == code) {
568,862,212✔
1180
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
568,862,212✔
1181
    }
1182
    if (TSDB_CODE_SUCCESS == code) {
568,857,579✔
1183
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
568,867,142✔
1184
    }
1185
    if (TSDB_CODE_SUCCESS != code) {
568,867,495✔
1186
      destroyVgDataBlocks(dst);
×
1187
    }
1188
  }
1189

1190
  if (append) {
554,130,986✔
1191
    if (NULL == *pVgDataBlocks) {
809,136✔
1192
      *pVgDataBlocks = pDataBlocks;
809,180✔
1193
    }
1194
    return code;
809,315✔
1195
  }
1196

1197
  if (TSDB_CODE_SUCCESS == code) {
553,321,850✔
1198
    *pVgDataBlocks = pDataBlocks;
553,313,157✔
1199
  } else {
1200
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
8,694✔
1201
  }
1202

1203
  return code;
553,317,183✔
1204
}
1205

1206
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
1207
  for (int i = 0; i < numFields; i++) {
×
1208
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
1209
      return true;
×
1210
    }
1211
  }
1212

1213
  return false;
×
1214
}
1215

1216
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
162,754✔
1217
  if (*fields != pColSchema->type) {
162,754✔
1218
    if (errstr != NULL) {
273✔
1219
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
1220
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
1221
    } else {
1222
      char buf[512] = {0};
273✔
1223
      snprintf(buf, sizeof(buf), "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
546✔
1224
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
546✔
1225
      uError("checkSchema %s", buf);
273✔
1226
    }
1227
    return TSDB_CODE_INVALID_PARA;
273✔
1228
  }
1229

1230
  if (IS_DECIMAL_TYPE(pColSchema->type)) {
162,481✔
1231
    uint8_t precision = 0, scale = 0;
273✔
1232
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
273✔
1233
    uint8_t precisionData = 0, scaleData = 0;
273✔
1234
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
273✔
1235
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
273✔
1236
    if (precision != precisionData || scale != scaleData) {
273✔
1237
      if (errstr != NULL) {
×
1238
        snprintf(errstr, errstrLen,
×
1239
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1240
                 "precision:%d, scale:%d",
1241
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1242
                 precisionData, scaleData);
1243
        return TSDB_CODE_INVALID_PARA;
×
1244
      } else {
1245
        char buf[512] = {0};
×
1246
        snprintf(buf, sizeof(buf),
×
1247
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1248
                 "precision:%d, scale:%d",
1249
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1250
                 precisionData, scaleData);
1251
        uError("checkSchema %s", buf);
×
1252
        return TSDB_CODE_INVALID_PARA;
×
1253
      }
1254
    }
1255
    return 0;
273✔
1256
  }
1257

1258
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
162,208✔
1259
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
28,384✔
1260
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
28,384✔
1261
      if (bytes >= TSDB_MAX_BLOB_LEN) {
×
1262
        uError("column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
×
1263
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name, bytes);
1264
        return TSDB_CODE_INVALID_PARA;
×
1265
      }
1266
    } else {
1267
      if (bytes > pColSchema->bytes) {
28,384✔
1268
        if (errstr != NULL) {
273✔
1269
          snprintf(errstr, errstrLen,
×
1270
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1271
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1272
                   *(int32_t*)(fields + sizeof(int8_t)));
×
1273
        } else {
1274
          char buf[512] = {0};
273✔
1275
          snprintf(buf, sizeof(buf),
546✔
1276
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1277
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
819✔
1278
                   *(int32_t*)(fields + sizeof(int8_t)));
273✔
1279
          uError("checkSchema %s", buf);
273✔
1280
        }
1281
        return TSDB_CODE_INVALID_PARA;
273✔
1282
      }
1283
    }
1284
  }
1285

1286
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
161,935✔
1287
    if (errstr != NULL) {
×
1288
      snprintf(errstr, errstrLen,
×
1289
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1290
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1291
               *(int32_t*)(fields + sizeof(int8_t)));
×
1292
    } else {
1293
      char buf[512] = {0};
×
1294
      snprintf(buf, sizeof(buf),
×
1295
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1296
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1297
               *(int32_t*)(fields + sizeof(int8_t)));
×
1298
      uError("checkSchema %s", buf);
×
1299
    }
1300
    return TSDB_CODE_INVALID_PARA;
×
1301
  }
1302
  return 0;
161,935✔
1303
}
1304

1305
#define PRCESS_DATA(i, j)                                                                                          \
1306
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                         \
1307
  if (ret != 0) {                                                                                                  \
1308
    goto end;                                                                                                      \
1309
  }                                                                                                                \
1310
                                                                                                                   \
1311
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                          \
1312
    hasTs = true;                                                                                                  \
1313
  }                                                                                                                \
1314
                                                                                                                   \
1315
  int8_t* offset = pStart;                                                                                         \
1316
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                                        \
1317
    pStart += numOfRows * sizeof(int32_t);                                                                         \
1318
  } else {                                                                                                         \
1319
    pStart += BitmapLen(numOfRows);                                                                                \
1320
  }                                                                                                                \
1321
  char* pData = pStart;                                                                                            \
1322
                                                                                                                   \
1323
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                                        \
1324
  if (hasBlob) {                                                                                                   \
1325
    ret = tColDataAddValueByDataBlockWithBlob(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData, \
1326
                                              pBlobSet);                                                           \
1327
  } else {                                                                                                         \
1328
    ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);        \
1329
  }                                                                                                                \
1330
  if (ret != 0) {                                                                                                  \
1331
    goto end;                                                                                                      \
1332
  }                                                                                                                \
1333
  fields += sizeof(int8_t) + sizeof(int32_t);                                                                      \
1334
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                            \
1335
    pStart += htonl(colLength[i]);                                                                                 \
1336
  } else {                                                                                                         \
1337
    pStart += colLength[i];                                                                                        \
1338
  }                                                                                                                \
1339
  boundInfo->pColIndex[j] = -1;
1340

1341
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
35,318✔
1342
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1343
  int       ret = 0;
35,318✔
1344
  int8_t    hasBlob = 0;
35,318✔
1345
  SBlobSet* pBlobSet = NULL;
35,318✔
1346
  if (data == NULL) {
35,318✔
1347
    uError("rawBlockBindData, data is NULL");
×
1348
    return TSDB_CODE_APP_ERROR;
×
1349
  }
1350
  void* tmp =
1351
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
35,318✔
1352
  SVCreateTbReq* pCreateReqTmp = NULL;
35,318✔
1353
  if (tmp == NULL && pCreateTb != NULL) {
35,318✔
1354
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
3,356✔
1355
    if (ret != TSDB_CODE_SUCCESS) {
3,356✔
1356
      uError("cloneSVreateTbReq error");
×
1357
      goto end;
×
1358
    }
1359
  }
1360

1361
  STableDataCxt* pTableCxt = NULL;
35,318✔
1362
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
35,318✔
1363
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1364
  if (pCreateReqTmp != NULL) {
35,318✔
1365
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1366
    taosMemoryFree(pCreateReqTmp);
×
1367
  }
1368

1369
  hasBlob = pTableCxt->hasBlob;
35,318✔
1370
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
35,318✔
1371
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
×
1372
    if (pTableCxt->pData->pBlobSet == NULL) {
×
1373
      uError("create blob set failed");
×
1374
      ret = terrno;
×
1375
    }
1376
  }
1377

1378
  if (ret != TSDB_CODE_SUCCESS) {
35,318✔
1379
    uError("insGetTableDataCxt error");
×
1380
    goto end;
×
1381
  }
1382
  pBlobSet = pTableCxt->pData->pBlobSet;
35,318✔
1383

1384
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
35,318✔
1385
  if (tmp == NULL) {
35,318✔
1386
    ret = initTableColSubmitData(pTableCxt);
31,678✔
1387
    if (ret != TSDB_CODE_SUCCESS) {
31,678✔
1388
      uError("initTableColSubmitData error");
×
1389
      goto end;
×
1390
    }
1391
  }
1392

1393
  char* p = (char*)data;
35,318✔
1394
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1395
  // column length |
1396
  int32_t version = *(int32_t*)data;
35,318✔
1397
  p += sizeof(int32_t);
35,318✔
1398
  p += sizeof(int32_t);
35,318✔
1399

1400
  int32_t numOfRows = *(int32_t*)p;
35,318✔
1401
  p += sizeof(int32_t);
35,318✔
1402

1403
  int32_t numOfCols = *(int32_t*)p;
35,318✔
1404
  p += sizeof(int32_t);
35,318✔
1405

1406
  p += sizeof(int32_t);
35,318✔
1407
  p += sizeof(uint64_t);
35,318✔
1408

1409
  int8_t* fields = (int8_t*)p;
35,318✔
1410
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
35,318✔
1411
    uError("fields type error:%d", *fields);
×
1412
    ret = TSDB_CODE_INVALID_PARA;
×
1413
    goto end;
×
1414
  }
1415
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
35,318✔
1416

1417
  int32_t* colLength = (int32_t*)p;
35,318✔
1418
  p += sizeof(int32_t) * numOfCols;
35,318✔
1419

1420
  char* pStart = p;
35,318✔
1421

1422
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
35,318✔
1423
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
35,318✔
1424
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
35,318✔
1425

1426
  if (tFields != NULL && numFields != numOfCols) {
35,318✔
1427
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1428
    ret = TSDB_CODE_INVALID_PARA;
×
1429
    goto end;
×
1430
  }
1431

1432
  bool hasTs = false;
35,318✔
1433
  if (tFields == NULL) {
35,318✔
1434
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
1,638✔
1435
    for (int j = 0; j < len; j++) {
4,914✔
1436
      SSchema*    pColSchema = &pSchema[j];
3,822✔
1437
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
3,822✔
1438
      PRCESS_DATA(j, j)
3,822✔
1439
    }
1440
  } else {
1441
    for (int i = 0; i < numFields; i++) {
181,959✔
1442
      for (int j = 0; j < boundInfo->numOfBound; j++) {
1,373,440✔
1443
        SSchema*    pColSchema = &pSchema[j];
1,373,440✔
1444
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
1,373,440✔
1445
        char*       fieldName = NULL;
1,373,440✔
1446
        if (raw) {
1,373,440✔
1447
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
1,372,075✔
1448
        } else {
1449
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
1,365✔
1450
        }
1451
        if (strcmp(pColSchema->name, fieldName) == 0) {
1,373,440✔
1452
          PRCESS_DATA(i, j)
148,279✔
1453
          break;
148,279✔
1454
        }
1455
      }
1456
    }
1457
  }
1458

1459
  if (!hasTs) {
34,772✔
1460
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1461
    ret = TSDB_CODE_INVALID_PARA;
×
1462
    goto end;
×
1463
  }
1464

1465
  // process NULL data
1466
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
189,661✔
1467
    if (boundInfo->pColIndex[c] != -1) {
154,889✔
1468
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
3,607✔
1469
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
3,607✔
1470
      if (ret != 0) {
3,607✔
1471
        goto end;
×
1472
      }
1473
    } else {
1474
      boundInfo->pColIndex[c] = c;  // restore for next block
151,282✔
1475
    }
1476
  }
1477

1478
end:
35,318✔
1479
  return ret;
35,318✔
1480
}
1481

1482
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
×
1483
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
×
1484
  if (code != 0) {
×
1485
    return code;
×
1486
  }
1487
  SVgroupDataCxt* pVgCxt = NULL;
×
1488
  void**          pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
×
1489
  if (NULL == pp) {
×
1490
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
×
1491
    if (code != 0) {
×
1492
      return code;
×
1493
    }
1494
  } else {
1495
    pVgCxt = *(SVgroupDataCxt**)pp;
×
1496
  }
1497
  if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1498
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
×
1499
    pVgCxt->pData->raw = true;
×
1500
    if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1501
      return terrno;
×
1502
    }
1503
  }
1504

1505
  // push data to submit, rebuild empty data for next submit
1506
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
×
1507
    return terrno;
×
1508
  }
1509

1510
  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
×
1511

1512
  return 0;
×
1513
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc