• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5049

11 May 2026 06:30AM UTC coverage: 73.313% (+0.09%) from 73.222%
#5049

push

travis-ci

web-flow
feat: refactor taosdump code to improve backup speed and compression ratio (#35292)

6625 of 8435 new or added lines in 28 files covered. (78.54%)

2491 existing lines in 142 files now uncovered.

281233 of 383605 relevant lines covered (73.31%)

132489999.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.27
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
/**
 * Find the first occurrence of `target` in a token that lies outside any
 * TS_ESCAPE_CHAR-delimited section and outside any single/double quoted section.
 *
 * @param pToken token to scan; only the first pToken->n bytes of pToken->z are read
 * @param target character to search for
 * @return pointer into pToken->z at the match, or NULL if there is no such occurrence
 */
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool escaped = false;  // toggled by every TS_ESCAPE_CHAR seen outside quotes
  bool quoted = false;   // true between an opening quote and its matching closer
  char openQuote = 0;    // the quote character that opened the current quoted section

  for (uint32_t idx = 0; idx < pToken->n; ++idx) {
    char*      cur = pToken->z + idx;
    const char ch = *cur;

    // The match test runs before any state update for this character.
    if (ch == target && !escaped && !quoted) {
      return cur;
    }

    if (ch == TS_ESCAPE_CHAR && !quoted) {
      escaped = !escaped;
    }

    if ((ch == '\'' || ch == '"') && !escaped) {
      if (!quoted) {
        openQuote = ch;
        quoted = true;
      } else if (ch == openQuote) {
        // only the same kind of quote closes the section
        quoted = false;
      }
    }
  }

  return NULL;
}
60

61
/**
 * Build an SName from a table-name token of the form `db.table` or `table`.
 *
 * Each part is copied into a local buffer and passed through strdequote()
 * before its length is validated. When the token contains a database part,
 * that part is used and `dbName` is ignored; otherwise `dbName` supplies the
 * database.
 *
 * @param pName      output name, filled through tNameSetDbName()/tNameFromString()
 * @param pTableName token holding the (optionally db-qualified) table name
 * @param acctId     account id forwarded to tNameSetDbName()
 * @param dbName     current database; only consulted when the token has no database part
 * @param pMsgBuf    message buffer handed to generateSyntaxErrMsg()/generateSyntaxErrMsgExt()
 * @return TSDB_CODE_SUCCESS, or the value returned by the error-message generator
 */
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* errTbTooLong = "table name is too long";
  const char* errBadDb = "invalid database name";
  const char* errNoDb = "db is not specified";
  const char* errBadTb = "invalid table name";
  const char* errDbTooLong = "database name is too long";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   pDelim = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);

  if (NULL == pDelim) {
    // No database part in the token: validate the table name, then take the db from dbName.
    int32_t rawTbLen = pTableName->n;
    if (rawTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (rawTbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    char tbBuf[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    tstrncpy(tbBuf, pTableName->z, rawTbLen < sizeof(tbBuf) ? rawTbLen + 1 : sizeof(tbBuf));

    // length checks are repeated on the dequoted name
    int32_t tbLen = strdequote(tbBuf);
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    if (NULL == dbName || 0 == strlen(dbName)) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED, errNoDb);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (TSDB_CODE_SUCCESS != code) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }

    code = tNameFromString(pName, tbBuf, T_NAME_TABLE);
    if (0 != code) {
      // not returned here: the '.' check below still runs
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }
  } else {
    // db has been specified in sql string so we ignore current db path
    int32_t rawDbLen = pDelim - pTableName->z;
    if (rawDbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }
    if (rawDbLen >= TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errDbTooLong);
    }

    char dbBuf[TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE] = {0};
    tstrncpy(dbBuf, pTableName->z, rawDbLen < sizeof(dbBuf) ? rawDbLen + 1 : sizeof(dbBuf));

    int32_t dbLen = strdequote(dbBuf);
    if (dbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }
    if (dbLen >= TSDB_DB_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errDbTooLong);
    }

    code = tNameSetDbName(pName, acctId, dbBuf, dbLen);
    if (TSDB_CODE_SUCCESS != code) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }

    // everything after the delimiter is the (possibly quoted) table name
    int32_t rawTbLen = pTableName->n - rawDbLen - 1;
    if (rawTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (rawTbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    char tbBuf[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    tstrncpy(tbBuf, pDelim + 1, rawTbLen < sizeof(tbBuf) ? rawTbLen + 1 : sizeof(tbBuf));

    int32_t tbLen = strdequote(tbBuf);
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    code = tNameFromString(pName, tbBuf, T_NAME_TABLE);
    if (0 != code) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }
  }

  // A '.' remaining in the resolved table name is rejected.
  if (strchr(pName->tname, '.') != NULL) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

  return code;
}
167
#if 0 // converted to static inline function in parInsertUtil.h
168
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
169
  while (start < end) {
170
    if (strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
171
      return start;
172
    }
173
    ++start;
174
  }
175
  return -1;
176
}
177
#endif
178

179
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
18,350,396✔
180
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
181
  pTbReq->type = TD_CHILD_TABLE;
18,350,396✔
182
  pTbReq->ctb.pTag = (uint8_t*)pTag;
18,355,080✔
183
  pTbReq->name = taosStrdup(tname);
18,349,827✔
184
  if (!pTbReq->name) return terrno;
18,353,706✔
185
  pTbReq->ctb.suid = suid;
18,347,133✔
186
  pTbReq->ctb.tagNum = tagNum;
18,342,908✔
187
  if (sname) {
18,335,958✔
188
    pTbReq->ctb.stbName = taosStrdup(sname);
17,588,341✔
189
    if (!pTbReq->ctb.stbName) return terrno;
17,592,865✔
190
  }
191
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
18,339,750✔
192
  if (!pTbReq->ctb.tagName) return terrno;
18,348,965✔
193
  pTbReq->ttl = ttl;
18,340,244✔
194
  pTbReq->commentLen = -1;
18,341,037✔
195

196
  return TSDB_CODE_SUCCESS;
18,341,067✔
197
}
198

199
/**
 * Fill pBoundCols[0..ncols-1] with the identity mapping (entry i holds i).
 * Does nothing when ncols is zero or negative.
 */
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
  int32_t col = 0;
  while (col < ncols) {
    pBoundCols[col] = col;
    ++col;
  }
}
204

205
/**
 * Append one COL_VAL_NONE entry per table column to pValues, using each
 * column's id and type from the table's column schema.
 *
 * @return 0 on success, or terrno if pushing into the array fails
 *         (entries pushed before the failure remain in pValues).
 */
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
  SSchema* pCols = getTableColumnSchema(pTableMeta);

  for (int32_t col = 0; col < pTableMeta->tableInfo.numOfColumns; ++col) {
    SColVal none = COL_VAL_NONE(pCols[col].colId, pCols[col].type);
    if (taosArrayPush(pValues, &none) == NULL) {
      return terrno;
    }
  }

  return 0;
}
217

218
/**
 * Public wrapper around initColValues(): appends one "none" column value per
 * table column to aColValues and returns that function's result.
 */
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
  int32_t code = initColValues(pTableMeta, aColValues);
  return code;
}
219

220
/**
 * Initialise a bound-column descriptor so that every column is bound in its
 * natural order: pColIndex[i] == i for i in [0, numOfBound).
 *
 * @param numOfBound number of columns; stored as both numOfCols and numOfBound
 * @param pInfo      descriptor to initialise; pColIndex is newly allocated
 * @return TSDB_CODE_SUCCESS, or terrno when the index array cannot be allocated
 */
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
  pInfo->numOfCols = numOfBound;
  pInfo->numOfBound = numOfBound;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
  if (pInfo->pColIndex == NULL) {
    return terrno;
  }

  int32_t col = 0;
  while (col < numOfBound) {
    pInfo->pColIndex[col] = col;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}
234

235
/**
 * Restore a bound-column descriptor to its "all columns bound, natural order"
 * state: numOfBound becomes numOfCols, both flags are cleared and
 * pColIndex[i] is reset to i. The existing pColIndex buffer is reused.
 */
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
  pInfo->numOfBound = pInfo->numOfCols;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  int32_t col = 0;
  while (col < pInfo->numOfCols) {
    pInfo->pColIndex[col] = col;
    ++col;
  }
}
243

244
/**
 * Track whether the rows added to a table data context arrive in key order.
 *
 * `rowKey` is compared with the previously recorded key: a smaller key marks
 * the context as disordered, an equal key marks it as containing a duplicate.
 * The key is then stored as the new last key. Once the context is disordered
 * the function returns immediately and updates nothing.
 *
 * @param pTableCxt context whose ordered/duplicateTs/lastKey fields are updated
 * @param rowKey    key of the row being added
 */
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
  // once the data block is disordered, we do NOT keep last timestamp any more
  if (!pTableCxt->ordered) {
    return;
  }

  // Compare once and reuse the result: this runs for every row and neither
  // operand changes between the two checks.
  // NOTE(review): assumes tRowKeyCompare() is a side-effect-free comparison - confirm.
  int32_t cmp = tRowKeyCompare(rowKey, &pTableCxt->lastKey);
  if (cmp < 0) {
    pTableCxt->ordered = false;
  } else if (cmp == 0) {
    pTableCxt->duplicateTs = true;
  }

  // TODO: for variable length data type, we need to copy it out
  pTableCxt->lastKey = *rowKey;
}
262

263
/**
 * Allocate and initialise a table data context for the given table meta.
 *
 * @param pTableMeta    table meta; duplicated into the new context
 * @param pCreateTbReq  optional in/out pointer to a create-table request. When the
 *                      submit data is allocated, the request is moved into it and
 *                      *pCreateTbReq is set to NULL.
 * @param pOutput       receives the new context on success and NULL on failure
 * @param colMode       when true the submit data uses the column format (aCol),
 *                      otherwise the row format (aRowP)
 * @param ignoreColVals when true the per-column value array (pValues) is not created
 * @return TSDB_CODE_SUCCESS or an error code; on error everything allocated here is released
 */
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
                                  bool colMode, bool ignoreColVals) {
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
  if (NULL == pTableCxt) {
    *pOutput = NULL;
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pTableCxt->lastKey = (SRowKey){0};
  pTableCxt->ordered = true;
  pTableCxt->duplicateTs = false;

  pTableCxt->pMeta = tableMetaDup(pTableMeta);
  if (NULL == pTableCxt->pMeta) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pSchema =
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
    if (NULL == pTableCxt->pSchema) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    // Inspect the schema only once it is known to exist; on the failure paths
    // above pSchema is NULL and hasBlob keeps its zero-initialised value.
    pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
  }
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
    if (NULL == pTableCxt->pValues) {
      code = terrno;
    } else {
      code = initColValues(pTableMeta, pTableCxt->pValues);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pTableCxt->pData) {
      code = terrno;
    } else {
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
      pTableCxt->pData->suid = pTableMeta->suid;
      pTableCxt->pData->uid = pTableMeta->uid;
      pTableCxt->pData->sver = pTableMeta->sversion;
      // The request now belongs to the submit data; clear the caller's pointer.
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTableCxt->pData->aCol) {
          code = terrno;
        }
      } else {
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTableCxt->pData->aRowP) {
          code = terrno;
        }
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pTableCxt;
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
                pTableMeta->vgId);
  } else {
    insDestroyTableDataCxt(pTableCxt);
    // Never leave a stale value in the output: the caller may release *pOutput on error.
    *pOutput = NULL;
  }

  return code;
}
337

338
/**
 * Replace a submit-table-data container with a fresh, empty one that carries
 * the same flags, suid, uid and sver, plus a clone of the create-table request
 * when SUBMIT_REQ_AUTO_CREATE_TABLE is set.
 *
 * Only the `pSrc` container itself is freed (always, even on failure); the
 * members it points to are not released here.
 *
 * @param pSrc    container to replace
 * @param pDst    receives the new container on success and NULL on failure, so
 *                a caller passing the address of the pointer it handed in as
 *                `pSrc` is never left with a pointer to freed memory
 * @param hasBlob currently unused
 * @return TSDB_CODE_SUCCESS or the error code of the failing allocation/clone
 */
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == pTmp) {
    code = terrno;
  } else {
    pTmp->flags = pSrc->flags;
    pTmp->suid = pSrc->suid;
    pTmp->uid = pSrc->uid;
    pTmp->sver = pSrc->sver;
    pTmp->pCreateTbReq = NULL;
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
      if (pSrc->pCreateTbReq) {
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
      } else {
        // nothing to create: drop the flag instead of carrying it without a request
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      void* pArray = NULL;
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pArray = pTmp->aCol = taosArrayInit(128, sizeof(SColData));
      } else {
        pArray = pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
      }
      if (NULL == pArray) {
        code = terrno;
        // The clone above succeeded, so it has to be released with the container.
        if (NULL != pTmp->pCreateTbReq) {
          tdDestroySVCreateTbReq(pTmp->pCreateTbReq);
          taosMemoryFree(pTmp->pCreateTbReq);
          pTmp->pCreateTbReq = NULL;
        }
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      // Single release point: pTmp is freed exactly once and never touched afterwards.
      taosMemoryFree(pTmp);
      pTmp = NULL;
    }
  }

  taosMemoryFree(pSrc);
  *pDst = pTmp;  // NULL on failure

  return code;
}
388

389
/**
 * Mark every SColVal in pValues as "none" by setting its flag to CV_FLAG_NONE.
 * Only the flag is changed; the stored values are left untouched.
 */
static void resetColValues(SArray* pValues) {
  int32_t total = taosArrayGetSize(pValues);
  int32_t idx = 0;
  while (idx < total) {
    SColVal* pEntry = taosArrayGet(pValues, idx);
    pEntry->flag = CV_FLAG_NONE;
    ++idx;
  }
}
396

397
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
1,934,206,627✔
398
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
399
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
1,934,206,627✔
400
  if (NULL != tmp) {
1,934,321,787✔
401
    *pTableCxt = *tmp;
1,237,083,990✔
402
    if (!ignoreColVals) {
1,237,083,990✔
403
      resetColValues((*pTableCxt)->pValues);
38,487,262✔
404
    }
405
    return TSDB_CODE_SUCCESS;
1,237,083,990✔
406
  }
407
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
697,237,797✔
408
  if (TSDB_CODE_SUCCESS == code) {
697,220,821✔
409
    void* pData = *pTableCxt;  // deal scan coverity
697,238,152✔
410
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
697,240,847✔
411
  }
412

413
  if (TSDB_CODE_SUCCESS != code) {
697,287,477✔
414
    insDestroyTableDataCxt(*pTableCxt);
×
415
  }
416
  return code;
697,291,139✔
417
}
418

419
void destroyColVal(void* p) {
2,147,483,647✔
420
  if (!p) return;
2,147,483,647✔
421

422
  SColVal* pVal = (SColVal*)p;
2,147,483,647✔
423

424
  if (IS_VAR_DATA_TYPE(pVal->value.type) || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
425
    taosMemoryFreeClear(pVal->value.pData);
1,596,724,019✔
426
  }
427
}
428

429
/**
 * Release a table data context and everything it holds: the table meta, the
 * schema, the bound-column info, the column values (via destroyColVal) and the
 * submit data. A NULL context is ignored.
 */
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
  if (pTableCxt == NULL) {
    return;
  }

  taosMemoryFreeClear(pTableCxt->pMeta);
  tDestroyTSchema(pTableCxt->pSchema);
  qDestroyBoundColInfo(&pTableCxt->boundColsInfo);
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);

  // pData may be absent; the content is destroyed first, then the container
  if (NULL != pTableCxt->pData) {
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pTableCxt->pData);
  }

  taosMemoryFree(pTableCxt);
}
444

445
static void destroyColValSml(void* p) {
12,731,414✔
446
  SColVal* pVal = p;
12,731,414✔
447
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
12,731,414✔
448
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type) {
12,465,019✔
449
    taosMemoryFreeClear(pVal->value.pData);
271,393✔
450
  }
451
}
12,731,904✔
452

453
/**
 * Same teardown as insDestroyTableDataCxt(), except that the column values are
 * released with destroyColValSml. A NULL context is ignored.
 */
static void insDestroyTableDataCxtSml(STableDataCxt* pTableCxt) {
  if (pTableCxt == NULL) {
    return;
  }

  taosMemoryFreeClear(pTableCxt->pMeta);
  tDestroyTSchema(pTableCxt->pSchema);
  qDestroyBoundColInfo(&pTableCxt->boundColsInfo);
  taosArrayDestroyEx(pTableCxt->pValues, destroyColValSml);

  // content first, then the container
  if (NULL != pTableCxt->pData) {
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pTableCxt->pData);
  }

  taosMemoryFree(pTableCxt);
}
468

469
/**
 * Release a vgroup data context: destroys the content of its submit request,
 * frees the request, then frees the context itself. A NULL context is ignored.
 */
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
  if (pVgCxt != NULL) {
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pVgCxt->pData);
    taosMemoryFree(pVgCxt);
  }
}
479

480
/**
 * Destroy every vgroup data context referenced by the array (elements are
 * pointers), then destroy the array itself. A NULL array is ignored.
 */
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
  if (pVgCxtList == NULL) {
    return;
  }

  size_t total = taosArrayGetSize(pVgCxtList);
  int32_t idx = 0;
  while (idx < total) {
    void* pEntry = taosArrayGetP(pVgCxtList, idx);
    insDestroyVgroupDataCxt(pEntry);
    idx++;
  }

  taosArrayDestroy(pVgCxtList);
}
493

494
/**
 * Destroy every vgroup data context stored (as a pointer value) in the hash,
 * then clean up the hash itself. A NULL hash is ignored.
 */
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
  if (pVgCxtHash == NULL) {
    return;
  }

  for (void** pIter = taosHashIterate(pVgCxtHash, NULL); pIter != NULL; pIter = taosHashIterate(pVgCxtHash, pIter)) {
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)pIter);
  }

  taosHashCleanup(pVgCxtHash);
}
508

509
/**
 * Destroy every table data context stored (as a pointer value) in the hash,
 * then clean up the hash itself. A NULL hash is ignored.
 */
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
  if (pTableCxtHash == NULL) {
    return;
  }

  for (void** pIter = taosHashIterate(pTableCxtHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pTableCxtHash, pIter)) {
    insDestroyTableDataCxt(*(STableDataCxt**)pIter);
  }

  taosHashCleanup(pTableCxtHash);
}
523

524
/**
 * Variant of insDestroyTableDataCxtHashMap() that releases each context with
 * insDestroyTableDataCxtSml(), then cleans up the hash. A NULL hash is ignored.
 */
void insDestroyTableDataCxtHashMapSml(SHashObj* pTableCxtHash) {
  if (pTableCxtHash == NULL) {
    return;
  }

  for (void** pIter = taosHashIterate(pTableCxtHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pTableCxtHash, pIter)) {
    insDestroyTableDataCxtSml(*(STableDataCxt**)pIter);
  }

  taosHashCleanup(pTableCxtHash);
}
538

539
/**
 * Hand the submit data of a table context over to a vgroup context.
 *
 * The SSubmitTbData held by pTableCxt->pData is pushed into the vgroup's
 * aSubmitTbData array (created on first use); for blob tables the blob set
 * pointer is additionally pushed into aSubmitBlobData and cleared in the table
 * context so that it is not freed when the table context is destroyed.
 *
 * @param isRebuild when true, pTableCxt->pData is replaced through rebuildTableData()
 * @param clear     when isRebuild is false and this is true, pTableCxt->pData is freed and cleared
 * @return 0, terrno of a failed array operation, or the result of rebuildTableData()
 *
 * NOTE(review): aSubmitBlobData is only created together with aSubmitTbData; if
 * the first table of a vgroup has no blob column and a later one has, the blob
 * push below receives a NULL array - confirm whether that can happen.
 */
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
  // Lazily create the per-vgroup arrays the first time a table is added.
  if (pVgCxt->pData->aSubmitTbData == NULL) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
    if (pTableCxt->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
        return terrno;
      }
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (!pTableCxt->hasBlob) {
    pTableCxt->pData->pBlobSet = NULL;
  }

  if (taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData) == NULL) {
    return terrno;
  }

  if (pTableCxt->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
    if (taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet) == NULL) {
      return terrno;
    }
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  int32_t code = 0;
  if (isRebuild) {
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
  } else if (clear) {
    taosMemoryFreeClear(pTableCxt->pData);
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
  return code;
}
578

579
/**
 * Create a vgroup data context for `vgId` and register it in both the vgroup
 * hash (keyed by vgId) and the vgroup list.
 *
 * @param vgId        vgroup id the context is created for
 * @param pVgroupHash hash receiving the context pointer under the vgId key
 * @param pVgroupList array receiving the context pointer
 * @param pOutput     receives the new context on success; untouched on failure
 * @return TSDB_CODE_SUCCESS or the error code of the failing step; the context
 *         is released on every failure path
 */
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
  if (NULL == pVgCxt) {
    return terrno;
  }
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
  if (NULL == pVgCxt->pData) {
    // Capture the error before any cleanup call gets a chance to overwrite terrno,
    // matching the array-push failure path below.
    int32_t allocCode = terrno;
    insDestroyVgroupDataCxt(pVgCxt);
    return allocCode;
  }

  pVgCxt->vgId = vgId;
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
    code = terrno;
    // NOTE(review): pVgroupHash still holds the pointer stored above - confirm
    // that callers discard the hash on failure without dereferencing its entries.
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
  *pOutput = pVgCxt;
  return code;
}
605

606
int insColDataComp(const void* lp, const void* rp) {
34,057,548✔
607
  SColData* pLeft = (SColData*)lp;
34,057,548✔
608
  SColData* pRight = (SColData*)rp;
34,057,548✔
609
  if (pLeft->cid < pRight->cid) {
34,057,548✔
610
    return -1;
34,033,641✔
611
  } else if (pLeft->cid > pRight->cid) {
29,853✔
612
    return 1;
29,853✔
613
  }
614

615
  return 0;
×
616
}
617

618
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
81,108✔
619
                                 STableColsData* pTbData, SName* sname) {
620
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
81,108✔
621
    return TSDB_CODE_SUCCESS;
65,735✔
622
  }
623

624
  SVgroupInfo      vgInfo = {0};
15,373✔
625
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
29,945✔
626
                           .requestId = pBuildInfo->requestId,
15,373✔
627
                           .requestObjRefId = pBuildInfo->requestSelf,
15,373✔
628
                           .mgmtEps = pBuildInfo->mgmtEpSet};
629

630
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
15,373✔
631
  if (TSDB_CODE_SUCCESS != code) {
15,373✔
632
    return code;
×
633
  }
634

635
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
15,373✔
636
  if (TSDB_CODE_SUCCESS != code) {
15,373✔
637
    return code;
×
638
  }
639

640
  return TSDB_CODE_SUCCESS;
15,373✔
641
}
642

643
/**
 * Resolve uid, vgroup id and super-table uid for the table named in pTbData.
 *
 * When pTbData->getFromHash is set, the per-statement table hash is consulted
 * first. On a miss (or when the flag is not set) the table meta is fetched
 * from the catalog, the result is stored in the table hash, and the vgroup
 * info is added to pAllVgHash via insTryAddTableVgroupInfo().
 *
 * @param uid, vgId, suid outputs; written only when the ids could be resolved
 * @return 0 on success, otherwise the error code of the failing step
 *         (TSDB_CODE_PAR_TABLE_NOT_EXIST is passed through after a warning log)
 */
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
  int32_t      code = 0;
  STableVgUid* pCached = NULL;

  if (pTbData->getFromHash) {
    pCached = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
  }

  // Fast path: ids already known for this table name.
  if (NULL != pCached) {
    *uid = pCached->uid;
    *vgId = pCached->vgid;
    *suid = pCached->suid;
    return code;
  }

  SName sname;
  code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
                           .requestId = pBuildInfo->requestId,
                           .requestObjRefId = pBuildInfo->requestSelf,
                           .mgmtEps = pBuildInfo->mgmtEpSet};
  code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
    return code;
  }
  if (TSDB_CODE_SUCCESS != code) {
    parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
    return code;
  }

  *uid = pTableMeta->uid;
  *vgId = pTableMeta->vgId;
  *suid = pTableMeta->suid;

  // remember the ids for later lookups by table name
  STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
  code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
  if (TSDB_CODE_SUCCESS == code) {
    code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
  }

  taosMemoryFree(pTableMeta);
  return code;
}
695

696
/**
 * Build the final vgroup data blocks of a statement: forwards to
 * insBuildVgDataBlocks() with the data blocks of the query's root
 * SVnodeModifyOpStmt and returns its result.
 */
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
  SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
  return insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pModifyStmt->pDataBlocks, true);
}
706

707
/**
 * Add the rows of a table context to a vgroup context, merging them into an
 * existing entry when the same table name was already added.
 *
 * pTableNameHash maps a table name to the row array (SArray*) registered for it.
 * - Name already present: the rows of pTbCtx->pData->aRowP are appended to the
 *   registered array, which is then sorted and merged; afterwards the table
 *   context's own row array and create-table request are released.
 * - Name not present: pTbCtx->pData is pushed into aSubmitTbData (and the blob
 *   set into aSubmitBlobData for blob tables) and its row array is registered
 *   under `tbname`.
 *
 * @return TSDB_CODE_SUCCESS, terrno of a failed array operation, or the error
 *         code of the sort/merge/hash step
 */
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
                                            char* tbname) {
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
    if (pTbCtx->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
        return terrno;
      }
    }
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** rowP = NULL;

  rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));

  if (rowP != NULL && *rowP != NULL) {
    // Same table seen before: move the row pointers into the registered array.
    int32_t aRowPSize = taosArrayGetSize(pTbCtx->pData->aRowP);
    for (int32_t j = 0; j < aRowPSize; ++j) {
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
      if (pRow) {
        if (NULL == taosArrayPush(*rowP, &pRow)) {
          return terrno;
        }
      }
    }

    if (pTbCtx->hasBlob == 0) {
      code = tRowSort(*rowP);
      TAOS_CHECK_RETURN(code);

      code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
      TAOS_CHECK_RETURN(code);
    } else {
      // NOTE(review): this branch sorts/merges the incoming array rather than the
      // registered one (*rowP) that just received the rows - confirm this is intended.
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
      TAOS_CHECK_RETURN(code);

      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
      TAOS_CHECK_RETURN(code);
    }

    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);

    // Only the array is destroyed; the row pointers were copied into *rowP above.
    taosArrayDestroy(pTbCtx->pData->aRowP);
    // Do not leave a pointer to the destroyed array behind in the table context.
    pTbCtx->pData->aRowP = NULL;
    if (pTbCtx->pData->pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
      pTbCtx->pData->pCreateTbReq = NULL;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (pTbCtx->hasBlob == 0) {
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
    return terrno;
  }

  if (pTbCtx->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
      return terrno;
    }
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  // Register this table's row array so later rows for the same name are merged into it.
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);

  return TSDB_CODE_SUCCESS;
}
789

790
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
2,313,058✔
791
                                  SStbInterlaceInfo* pBuildInfo) {
792
  int32_t  code = TSDB_CODE_SUCCESS;
2,313,058✔
793
  uint64_t uid;
2,289,014✔
794
  int32_t  vgId;
2,289,891✔
795
  uint64_t suid;
2,290,009✔
796

797
  pTbCtx->pData->aRowP = pTbData->aCol;
2,314,171✔
798

799
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
2,314,407✔
800
  if (TSDB_CODE_SUCCESS != code) {
2,313,117✔
801
    return code;
×
802
  }
803

804
  pTbCtx->pMeta->vgId = vgId;
2,313,117✔
805
  pTbCtx->pMeta->uid = uid;
2,313,580✔
806
  pTbCtx->pData->uid = uid;
2,313,593✔
807

808
  if (!pTbCtx->ordered) {
2,312,882✔
809
    code = tRowSort(pTbCtx->pData->aRowP);
11✔
810
  }
811
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
2,312,991✔
812
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
11✔
813
  }
814

815
  if (TSDB_CODE_SUCCESS != code) {
2,312,999✔
816
    return code;
×
817
  }
818

819
  SVgroupDataCxt* pVgCxt = NULL;
2,312,999✔
820
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
2,312,705✔
821
  if (NULL == pp) {
2,312,591✔
822
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
393,729✔
823
    if (NULL == pp) {
393,841✔
824
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
393,841✔
825
    } else {
826
      pVgCxt = *(SVgroupDataCxt**)pp;
×
827
    }
828
  } else {
829
    pVgCxt = *(SVgroupDataCxt**)pp;
1,918,862✔
830
  }
831

832
  if (TSDB_CODE_SUCCESS == code) {
2,312,982✔
833
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
2,312,994✔
834
  }
835

836
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
2,312,058✔
837
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
838
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
839
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
840
    // insDestroyVgroupDataCxt(pVgCxt);
841
  }
842

843
  return code;
2,310,774✔
844
}
845

846
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,113,079✔
847
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
848
  int32_t  code = TSDB_CODE_SUCCESS;
1,113,079✔
849
  uint64_t uid;
382,656✔
850
  int32_t  vgId;
382,714✔
851
  uint64_t suid;
382,714✔
852

853
  pTbCtx->pData->aRowP = pTbData->aCol;
1,113,137✔
854
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
1,113,137✔
855

856
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,113,185✔
857
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
1,113,377✔
858
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
11,578✔
859
    vgId = (int32_t)ctbReq->uid;
11,578✔
860
    uid = 0;
11,578✔
861
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
11,578✔
862
    ctbReq->uid = 0;
11,578✔
863
    pTbCtx->pMeta->uid = 0;
11,578✔
864
    pTbCtx->pData->uid = 0;
11,578✔
865
    pTbCtx->pData->pCreateTbReq = ctbReq;
11,578✔
866
    code = TSDB_CODE_SUCCESS;
11,578✔
867
  } else {
868
    if (TSDB_CODE_SUCCESS != code) {
1,101,799✔
869
      return code;
198✔
870
    }
871
    if (pTbCtx->pData->suid != suid) {
1,101,601✔
872
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
873
    }
874

875
    pTbCtx->pMeta->vgId = vgId;
1,101,649✔
876
    pTbCtx->pMeta->uid = uid;
1,101,649✔
877
    pTbCtx->pData->uid = uid;
1,101,540✔
878
    pTbCtx->pData->pCreateTbReq = NULL;
1,101,601✔
879

880
    if (ctbReq != NULL) {
1,101,601✔
881
      tdDestroySVCreateTbReq(ctbReq);
882
      taosMemoryFree(ctbReq);
601,252✔
883
      ctbReq = NULL;
601,288✔
884
    }
885
  }
886

887
  if (pTbCtx->hasBlob == 0) {
1,113,276✔
888
    if (!pTbData->isOrdered) {
1,112,981✔
889
      code = tRowSort(pTbCtx->pData->aRowP);
594✔
890
    }
891
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
1,112,920✔
892
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
1,782✔
893
    }
894
  } else {
895
    if (!pTbData->isOrdered) {
295✔
896
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
897
    }
898
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
295✔
899
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
900
    }
901
  }
902

903
  if (TSDB_CODE_SUCCESS != code) {
1,112,974✔
904
    return code;
×
905
  }
906

907
  SVgroupDataCxt* pVgCxt = NULL;
1,112,974✔
908
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,113,083✔
909
  if (NULL == pp) {
1,113,178✔
910
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
610,932✔
911
    if (NULL == pp) {
610,909✔
912
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
610,909✔
913
    } else {
914
      pVgCxt = *(SVgroupDataCxt**)pp;
×
915
    }
916
  } else {
917
    pVgCxt = *(SVgroupDataCxt**)pp;
502,246✔
918
  }
919

920
  if (code == TSDB_CODE_SUCCESS) {
1,113,033✔
921
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
1,113,009✔
922
  }
923

924
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,113,056✔
925
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
926
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
927
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
928
    // insDestroyVgroupDataCxt(pVgCxt);
929
  }
930

931
  return code;
1,112,851✔
932
}
933

934
/*
935
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
936
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
937
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
938
  if (NULL == pVgroupHash || NULL == pVgroupList) {
939
    taosHashCleanup(pVgroupHash);
940
    taosArrayDestroy(pVgroupList);
941
    return TSDB_CODE_OUT_OF_MEMORY;
942
  }
943

944
  int32_t code = TSDB_CODE_SUCCESS;
945

946
  for (int32_t i = 0; i < tbNum; ++i) {
947
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
948
    pTableCxt->pMeta->vgId = pTableCols->vgId;
949
    pTableCxt->pMeta->uid = pTableCols->uid;
950
    pTableCxt->pData->uid = pTableCols->uid;
951
    pTableCxt->pData->aCol = pTableCols->aCol;
952

953
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
954
    if (pCol->nVal <= 0) {
955
      continue;
956
    }
957

958
    if (pTableCxt->pData->pCreateTbReq) {
959
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
960
    }
961

962
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
963

964
    tColDataSortMerge(pTableCxt->pData->aCol);
965

966
    if (TSDB_CODE_SUCCESS == code) {
967
      SVgroupDataCxt* pVgCxt = NULL;
968
      int32_t         vgId = pTableCxt->pMeta->vgId;
969
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
970
      if (NULL == pp) {
971
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
972
      } else {
973
        pVgCxt = *(SVgroupDataCxt**)pp;
974
      }
975
      if (TSDB_CODE_SUCCESS == code) {
976
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
977
      }
978
    }
979
  }
980

981
  taosHashCleanup(pVgroupHash);
982
  if (TSDB_CODE_SUCCESS == code) {
983
    *pVgDataBlocks = pVgroupList;
984
  } else {
985
    insDestroyVgroupDataCxtList(pVgroupList);
986
  }
987

988
  return code;
989
}
990
*/
991

992
// Returns 1 when the column's type satisfies IS_STR_DATA_BLOB, otherwise 0.
static int8_t colDataHasBlob(SColData* pCol) {
  return IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
}
998
/**
 * Sort and merge the data of every table context in pTableHash and group the results by vgroup.
 *
 * Whether the tables are in column format is decided once, from the flags of the first entry
 * returned by the hash iterator. Column-format tables are sorted with insColDataComp and merged
 * with tColDataSortMerge (or the blob variant); row-format tables go through tRowSort /
 * tRowMerge (or the blob variants) depending on their ordered / duplicateTs flags. Each table
 * is then added to the context of its vgroup via fillVgroupDataCxt.
 *
 * @param pTableHash     hash of STableDataCxt* entries to process
 * @param pVgDataBlocks  [out] receives the list of vgroup contexts on success; untouched on failure
 * @param isRebuild      forwarded to fillVgroupDataCxt
 * @return TSDB_CODE_SUCCESS, terrno when the working hash/list cannot be allocated, or the first
 *         error code returned by a callee (the partially built list is destroyed in that case)
 */
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVgroupHash || NULL == pVgroupList) {
    taosHashCleanup(pVgroupHash);
    taosArrayDestroy(pVgroupList);
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    colFormat = false;

  void* p = taosHashIterate(pTableHash, NULL);
  if (p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
  }

  while (TSDB_CODE_SUCCESS == code && NULL != p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    if (colFormat) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
      // Skip tables without data. A missing first column is treated the same way, because
      // pCol is dereferenced below and must not be NULL.
      if (NULL == pCol || pCol->nVal <= 0) {
        p = taosHashIterate(pTableHash, p);
        continue;
      }

      if (pTableCxt->pData->pCreateTbReq) {
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
      }

      // Read the type before sorting: the sort reorders aCol in place, so pCol may refer to a
      // different column afterwards.
      int8_t isBlob = IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
      if (isBlob == 0) {
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
      } else {
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
      }
    } else {
      // Tables without rows (e.g. a csv import without valid data) are intentionally not
      // skipped here.
      if (pTableCxt->hasBlob == 0) {
        if (!pTableCxt->ordered) {
          code = tRowSort(pTableCxt->pData->aRowP);
        }
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
        }
      } else {
        if (!pTableCxt->ordered) {
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
        }
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
        }
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      SVgroupDataCxt* pVgCxt = NULL;
      int32_t         vgId = pTableCxt->pMeta->vgId;
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
      if (NULL == pp) {
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
      } else {
        pVgCxt = *(SVgroupDataCxt**)pp;
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      p = taosHashIterate(pTableHash, p);
    }
  }

  // The hash is only a lookup aid; the vgroup contexts themselves are referenced by the list.
  taosHashCleanup(pVgroupHash);
  if (TSDB_CODE_SUCCESS == code) {
    *pVgDataBlocks = pVgroupList;
  } else {
    insDestroyVgroupDataCxtList(pVgroupList);
  }

  return code;
}
1088

1089
/**
 * Encode a submit request into a newly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length in network byte order,
 * version 1 in big-endian) followed by the output of tEncodeSubmitReq.
 *
 * @param vgId   vgroup id written into the header
 * @param pReq   request to encode
 * @param pData  [out] receives the buffer on success
 * @param pLen   [out] receives the total buffer length (header included) on success
 * @return TSDB_CODE_SUCCESS, terrno when the allocation fails, or the encoder's error code
 */
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint32_t msgLen = 0;
  void*    pMsgBuf = NULL;

  // First pass only measures the encoded size.
  tEncodeSize(tEncodeSubmitReq, pReq, msgLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  msgLen += sizeof(SSubmitReq2Msg);
  pMsgBuf = taosMemoryMalloc(msgLen);
  if (NULL == pMsgBuf) {
    return terrno;
  }

  ((SSubmitReq2Msg*)pMsgBuf)->header.vgId = htonl(vgId);
  ((SSubmitReq2Msg*)pMsgBuf)->header.contLen = htonl(msgLen);
  ((SSubmitReq2Msg*)pMsgBuf)->version = htobe64(1);

  // Second pass writes the payload right behind the header.
  SEncoder enc;
  tEncoderInit(&enc, POINTER_SHIFT(pMsgBuf, sizeof(SSubmitReq2Msg)), msgLen - sizeof(SSubmitReq2Msg));
  code = tEncodeSubmitReq(&enc, pReq);
  tEncoderClear(&enc);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  *pData = pMsgBuf;
  *pLen = msgLen;
  return code;

_failed:
  taosMemoryFree(pMsgBuf);
  return code;
}
1117

1118
static void destroyVgDataBlocks(void* p) {
×
1119
  if (p == NULL) return;
×
1120
  SVgDataBlocks* pVg = p;
×
1121
  taosMemoryFree(pVg->pData);
×
1122
  taosMemoryFree(pVg);
×
1123
}
1124

1125
/**
 * Re-attach blob sets to the table entries of a submit request.
 *
 * In raw mode nothing is done. When the request has a blob list (aSubmitBlobData), the i-th
 * blob set is moved into the i-th table entry and its list slot is cleared, so that it is not
 * freed in destroy. Without a blob list every entry's pBlobSet is set to NULL.
 *
 * @param p  submit request to update
 * @return always 0 (TSDB_CODE_SUCCESS in raw mode)
 */
int32_t insResetBlob(SSubmitReq2* p) {
  if (p->raw) {
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
  }

  const bool hasBlobList = (NULL != p->aSubmitBlobData);
  for (int32_t idx = 0; idx < taosArrayGetSize(p->aSubmitTbData); idx++) {
    SSubmitTbData* pTbData = taosArrayGet(p->aSubmitTbData, idx);
    if (!hasBlobList) {
      pTbData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
      continue;
    }

    SBlobSet** ppSlot = taosArrayGet(p->aSubmitBlobData, idx);
    SBlobSet*  pBlobSet = (NULL != ppSlot) ? *ppSlot : NULL;
    int32_t    numOfRows = taosArrayGetSize(pTbData->aRowP);
    int32_t    numOfBlobs = 0;
    if (numOfRows > 0 && pBlobSet) {
      numOfBlobs = taosArrayGetSize(pBlobSet->pSeqTable);
    }
    uTrace("blob %p row size %d, pData size %d", pBlobSet, numOfBlobs, numOfRows);

    pTbData->pBlobSet = pBlobSet;
    if (NULL != ppSlot) {
      *ppSlot = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
    }
  }

  return 0;
}
1154
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
627,837,495✔
1155
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
627,837,495✔
1156
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
627,881,058✔
1157
  if (NULL == pDataBlocks) {
627,865,510✔
1158
    return TSDB_CODE_OUT_OF_MEMORY;
×
1159
  }
1160

1161
  int32_t code = TSDB_CODE_SUCCESS;
627,865,510✔
1162
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
1,272,074,546✔
1163
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
644,185,808✔
1164
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
644,212,757✔
1165
      continue;
×
1166
    }
1167
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
644,161,002✔
1168
    if (NULL == dst) {
644,148,642✔
1169
      code = terrno;
×
1170
    }
1171

1172
    if (TSDB_CODE_SUCCESS == code) {
644,148,642✔
1173
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
644,164,462✔
1174
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
644,170,581✔
1175
    }
1176
    if (TSDB_CODE_SUCCESS == code) {
644,246,288✔
1177
      code = insResetBlob(src->pData);
644,246,288✔
1178
    }
1179

1180
    if (TSDB_CODE_SUCCESS == code) {
644,180,494✔
1181
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
644,180,494✔
1182
    }
1183
    if (TSDB_CODE_SUCCESS == code) {
644,111,756✔
1184
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
644,209,915✔
1185
    }
1186
    if (TSDB_CODE_SUCCESS != code) {
644,212,266✔
1187
      destroyVgDataBlocks(dst);
×
1188
    }
1189
  }
1190

1191
  if (append) {
627,888,738✔
1192
    if (NULL == *pVgDataBlocks) {
905,554✔
1193
      *pVgDataBlocks = pDataBlocks;
905,578✔
1194
    }
1195
    return code;
905,610✔
1196
  }
1197

1198
  if (TSDB_CODE_SUCCESS == code) {
626,983,184✔
1199
    *pVgDataBlocks = pDataBlocks;
626,912,285✔
1200
  } else {
1201
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
71,439✔
1202
  }
1203

1204
  return code;
626,948,541✔
1205
}
1206

1207
// Returns true when one of the first numFields entries of fields has the same name as pSchema.
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
  int idx = 0;
  while (idx < numFields && strcmp(pSchema->name, fields[idx].name) != 0) {
    ++idx;
  }
  return idx < numFields;
}
1216

1217
// Reports a checkSchema failure: the formatted message goes into errstr when the caller supplied
// one, otherwise it is logged through uError. Relies on the locals errstr / errstrLen.
#define CHECK_SCHEMA_REPORT(...)                \
  do {                                          \
    if (errstr != NULL) {                       \
      snprintf(errstr, errstrLen, __VA_ARGS__); \
    } else {                                    \
      char buf[512] = {0};                      \
      snprintf(buf, sizeof(buf), __VA_ARGS__);  \
      uError("checkSchema %s", buf);            \
    }                                           \
  } while (0)

/**
 * Check that one column descriptor of a raw data block matches the table schema.
 *
 * The descriptor at fields is read as a 1-byte type followed by a 4-byte length.
 *
 * Rules:
 *   - the type must equal the schema type;
 *   - decimal columns: precision and scale derived from the length must equal those of typeMod;
 *   - blob columns: the length must be below TSDB_MAX_BLOB_LEN;
 *   - other variable-length columns: the length must not exceed the schema's bytes;
 *   - fixed-length columns: the length must equal the schema's bytes.
 *
 * @param pColSchema     schema of the column
 * @param pColExtSchema  extended schema; only typeMod is read, and only for decimal columns
 * @param fields         pointer to the column descriptor inside the block
 * @param errstr         optional buffer receiving the error message; when NULL the message is logged
 * @param errstrLen      size of errstr
 * @return 0 on success, TSDB_CODE_INVALID_PARA on any mismatch
 */
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
  const int8_t dataType = *fields;
  // dataType comes straight from the block, so it is range-checked before indexing tDataTypes.
  const char* dataTypeName =
      (dataType >= 0 && dataType < TSDB_DATA_TYPE_MAX) ? tDataTypes[dataType].name : "unknown";

  if (dataType != pColSchema->type) {
    CHECK_SCHEMA_REPORT("column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
                        tDataTypes[pColSchema->type].name, dataTypeName);
    return TSDB_CODE_INVALID_PARA;
  }

  // The length follows the type byte and is therefore not 4-byte aligned; copy it out rather than
  // dereferencing a casted pointer.
  int32_t dataBytes = 0;
  memcpy(&dataBytes, fields + sizeof(int8_t), sizeof(dataBytes));

  if (IS_DECIMAL_TYPE(pColSchema->type)) {
    uint8_t precision = 0, scale = 0;
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
    uint8_t precisionData = 0, scaleData = 0;
    int32_t bytes = dataBytes;  // working copy handed to the extractor by pointer
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
    if (precision != precisionData || scale != scaleData) {
      CHECK_SCHEMA_REPORT(
          "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
          "precision:%d, scale:%d",
          pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, dataTypeName, precisionData,
          scaleData);
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
      if (dataBytes >= TSDB_MAX_BLOB_LEN) {
        // Give the caller the reason as well, like every other failure path does.
        if (errstr != NULL) {
          snprintf(errstr, errstrLen,
                   "column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, dataTypeName, dataBytes);
        }
        uError("column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, dataTypeName, dataBytes);
        return TSDB_CODE_INVALID_PARA;
      }
    } else if (dataBytes > pColSchema->bytes) {
      CHECK_SCHEMA_REPORT("column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
                          pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, dataTypeName,
                          dataBytes);
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  if (dataBytes != pColSchema->bytes) {
    CHECK_SCHEMA_REPORT(
        "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
        pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, dataTypeName, dataBytes);
    return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}

#undef CHECK_SCHEMA_REPORT
1305

1306
#define PRCESS_DATA(i, j)                                                                                          \
1307
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                         \
1308
  if (ret != 0) {                                                                                                  \
1309
    goto end;                                                                                                      \
1310
  }                                                                                                                \
1311
                                                                                                                   \
1312
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                          \
1313
    hasTs = true;                                                                                                  \
1314
  }                                                                                                                \
1315
                                                                                                                   \
1316
  int8_t* offset = pStart;                                                                                         \
1317
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                                        \
1318
    pStart += numOfRows * sizeof(int32_t);                                                                         \
1319
  } else {                                                                                                         \
1320
    pStart += BitmapLen(numOfRows);                                                                                \
1321
  }                                                                                                                \
1322
  char* pData = pStart;                                                                                            \
1323
                                                                                                                   \
1324
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                                        \
1325
  if (hasBlob) {                                                                                                   \
1326
    ret = tColDataAddValueByDataBlockWithBlob(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData, \
1327
                                              pBlobSet);                                                           \
1328
  } else {                                                                                                         \
1329
    ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);        \
1330
  }                                                                                                                \
1331
  if (ret != 0) {                                                                                                  \
1332
    goto end;                                                                                                      \
1333
  }                                                                                                                \
1334
  fields += sizeof(int8_t) + sizeof(int32_t);                                                                      \
1335
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                            \
1336
    pStart += htonl(colLength[i]);                                                                                 \
1337
  } else {                                                                                                         \
1338
    pStart += colLength[i];                                                                                        \
1339
  }                                                                                                                \
1340
  boundInfo->pColIndex[j] = -1;
1341

1342
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
60,492✔
1343
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1344
  int       ret = 0;
60,492✔
1345
  int8_t    hasBlob = 0;
60,492✔
1346
  SBlobSet* pBlobSet = NULL;
60,492✔
1347
  if (data == NULL) {
60,492✔
1348
    uError("rawBlockBindData, data is NULL");
×
1349
    return TSDB_CODE_APP_ERROR;
×
1350
  }
1351
  void* tmp =
1352
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
60,492✔
1353
  SVCreateTbReq* pCreateReqTmp = NULL;
60,492✔
1354
  if (tmp == NULL && pCreateTb != NULL) {
60,492✔
1355
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
5,725✔
1356
    if (ret != TSDB_CODE_SUCCESS) {
5,725✔
1357
      uError("cloneSVreateTbReq error");
×
1358
      goto end;
×
1359
    }
1360
  }
1361

1362
  STableDataCxt* pTableCxt = NULL;
60,492✔
1363
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
60,492✔
1364
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1365
  if (pCreateReqTmp != NULL) {
60,492✔
1366
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1367
    taosMemoryFree(pCreateReqTmp);
×
1368
  }
1369

1370
  hasBlob = pTableCxt->hasBlob;
60,492✔
1371
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
60,492✔
1372
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
×
1373
    if (pTableCxt->pData->pBlobSet == NULL) {
×
1374
      uError("create blob set failed");
×
1375
      ret = terrno;
×
1376
    }
1377
  }
1378

1379
  if (ret != TSDB_CODE_SUCCESS) {
60,492✔
1380
    uError("insGetTableDataCxt error");
×
1381
    goto end;
×
1382
  }
1383
  pBlobSet = pTableCxt->pData->pBlobSet;
60,492✔
1384

1385
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
60,492✔
1386
  if (tmp == NULL) {
60,492✔
1387
    ret = initTableColSubmitData(pTableCxt);
55,969✔
1388
    if (ret != TSDB_CODE_SUCCESS) {
55,969✔
1389
      uError("initTableColSubmitData error");
×
1390
      goto end;
×
1391
    }
1392
  }
1393

1394
  char* p = (char*)data;
60,492✔
1395
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1396
  // column length |
1397
  int32_t version = *(int32_t*)data;
60,492✔
1398
  p += sizeof(int32_t);
60,492✔
1399
  p += sizeof(int32_t);
60,492✔
1400

1401
  int32_t numOfRows = *(int32_t*)p;
60,492✔
1402
  p += sizeof(int32_t);
60,492✔
1403

1404
  int32_t numOfCols = *(int32_t*)p;
60,492✔
1405
  p += sizeof(int32_t);
60,492✔
1406

1407
  p += sizeof(int32_t);
60,492✔
1408
  p += sizeof(uint64_t);
60,492✔
1409

1410
  int8_t* fields = (int8_t*)p;
60,492✔
1411
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
60,492✔
1412
    uError("fields type error:%d", *fields);
×
1413
    ret = TSDB_CODE_INVALID_PARA;
×
1414
    goto end;
×
1415
  }
1416
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
60,492✔
1417

1418
  int32_t* colLength = (int32_t*)p;
60,492✔
1419
  p += sizeof(int32_t) * numOfCols;
60,492✔
1420

1421
  char* pStart = p;
60,492✔
1422

1423
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
60,492✔
1424
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
60,492✔
1425
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
60,492✔
1426

1427
  if (tFields != NULL && numFields != numOfCols) {
60,492✔
1428
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1429
    ret = TSDB_CODE_INVALID_PARA;
×
1430
    goto end;
×
1431
  }
1432

1433
  bool hasTs = false;
60,492✔
1434
  if (tFields == NULL) {
60,492✔
1435
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
2,058✔
1436
    for (int j = 0; j < len; j++) {
6,174✔
1437
      SSchema*    pColSchema = &pSchema[j];
4,802✔
1438
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
4,802✔
1439
      PRCESS_DATA(j, j)
4,802✔
1440
    }
1441
  } else {
1442
    for (int i = 0; i < numFields; i++) {
297,528✔
1443
      for (int j = 0; j < boundInfo->numOfBound; j++) {
1,769,082✔
1444
        SSchema*    pColSchema = &pSchema[j];
1,769,082✔
1445
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
1,769,082✔
1446
        char*       fieldName = NULL;
1,769,082✔
1447
        if (raw) {
1,769,082✔
1448
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
1,767,367✔
1449
        } else {
1450
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
1,715✔
1451
        }
1452
        if (strcmp(pColSchema->name, fieldName) == 0) {
1,769,082✔
1453
          PRCESS_DATA(i, j)
239,094✔
1454
          break;
239,094✔
1455
        }
1456
      }
1457
    }
1458
  }
1459

1460
  if (!hasTs) {
59,806✔
1461
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1462
    ret = TSDB_CODE_INVALID_PARA;
×
1463
    goto end;
×
1464
  }
1465

1466
  // process NULL data
1467
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
309,935✔
1468
    if (boundInfo->pColIndex[c] != -1) {
250,129✔
1469
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
7,262✔
1470
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
7,262✔
1471
      if (ret != 0) {
7,262✔
1472
        goto end;
×
1473
      }
1474
    } else {
1475
      boundInfo->pColIndex[c] = c;  // restore for next block
242,867✔
1476
    }
1477
  }
1478

1479
end:
60,492✔
1480
  return ret;
60,492✔
1481
}
1482

1483
/**
 * Queue an already-encoded (raw) table submit buffer on the submit request of its vgroup.
 *
 * The buffer is first passed to transformRawSSubmitTbData together with the table's suid, uid
 * and sversion. The vgroup context for pTableMeta->vgId is then looked up or created, its
 * aSubmitTbData array is created on first use (and the request flagged as raw), and the data
 * pointer itself is pushed onto that array.
 *
 * @param pVgroupHash  hash of vgroup contexts keyed by vgId
 * @param pVgroupList  list handed to createVgroupDataCxt when a new context is needed
 * @param pTableMeta   meta of the target table
 * @param data         raw submit buffer; its first int32 is logged as the length
 * @return 0 on success, otherwise the error code of the failing step (terrno for array failures)
 */
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
  int rc = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
  if (0 != rc) {
    return rc;
  }

  SVgroupDataCxt* pVgCxt = NULL;
  void**          ppFound = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
  if (NULL != ppFound) {
    pVgCxt = *(SVgroupDataCxt**)ppFound;
  } else {
    rc = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
    if (0 != rc) {
      return rc;
    }
  }

  SSubmitReq2* pReq = pVgCxt->pData;
  if (NULL == pReq->aSubmitTbData) {
    pReq->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
    pReq->raw = true;
    if (NULL == pReq->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &data)) {
    return terrno;
  }

  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc