• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.06
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
void qDestroyBoundColInfo(void* pInfo) {
3,827,884✔
31
  if (NULL == pInfo) {
3,827,884✔
32
    return;
664,045✔
33
  }
34

35
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
3,163,839✔
36

37
  taosMemoryFreeClear(pBoundInfo->pColIndex);
3,163,839✔
38
}
39

40
/*
 * Find the first occurrence of `target` in a token that lies outside both an
 * escape section (toggled by TS_ESCAPE_CHAR) and a quoted section (opened by
 * ' or " and closed by the same character).
 *
 * Returns a pointer into pToken->z at the match, or NULL if none of the first
 * pToken->n characters qualifies.
 */
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool escaped = false;
  bool quoted = false;
  char openQuote = 0;

  for (uint32_t pos = 0; pos < pToken->n; ++pos) {
    char* cur = pToken->z + pos;
    char  ch = *cur;

    // the target is judged against the state left by the preceding characters
    if (ch == target && !escaped && !quoted) {
      return cur;
    }

    // an escape marker inside a quoted section is an ordinary character
    if (ch == TS_ESCAPE_CHAR && !quoted) {
      escaped = !escaped;
    }

    // a quote character inside an escape section is an ordinary character
    if ((ch == '\'' || ch == '"') && !escaped) {
      if (!quoted) {
        openQuote = ch;
        quoted = true;
      } else if (openQuote == ch) {
        // only the character that opened the section can close it
        quoted = false;
      }
    }
  }

  return NULL;
}
70

71
/*
 * Build an SName (account / database / table) from a table-name token.
 *
 * The token may be "db.table" or just "table". When a path delimiter is found
 * outside quotes/escapes, the database part comes from the token and `dbName`
 * is ignored; otherwise `dbName` supplies the database and must be non-empty.
 * Both parts are length-checked before and after strdequote().
 *
 * Returns TSDB_CODE_SUCCESS, or the value produced by generateSyntaxErrMsg /
 * generateSyntaxErrMsgExt after the message is written to pMsgBuf.
 *
 * Every failure returns immediately, so the final '.' check only ever
 * inspects a table name that was successfully stored in pName.
 */
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "table name is too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";
  const char* msg5 = "database name is too long";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    // before dbname dequote
    int32_t dbLen = (int32_t)(p - pTableName->z);
    if (dbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
    }
    if (dbLen >= TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
    }

    // the buffer is zero-filled and dbLen is strictly smaller than its size,
    // so the copy is always NUL-terminated
    char name[TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);

    // after dbname dequote
    if (actualDbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
    }
    if (actualDbLen >= TSDB_DB_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
    }

    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
    }

    // before tbname dequote
    int32_t tbLen = pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }
    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }

    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(tbname, p + 1, tbLen);
    int32_t actualTbLen = strdequote(tbname);

    // after tbname dequote
    if (actualTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    // before tbname dequote
    int32_t tbLen = pTableName->n;
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }

    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }

    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(tbname, pTableName->z, tbLen);
    int32_t actualTbLen = strdequote(tbname);

    // after tbname dequote
    if (actualTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }

    if (dbName == NULL || dbName[0] == '\0') {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED, msg3);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
    }

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      // stop here: pName->tname was not filled in, so it must not be scanned below
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
    }
  }

  if (NULL != strchr(pName->tname, '.')) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

  return code;
}
177

178
/*
 * Linear search for a column by name in pSchema[start, end).
 *
 * A schema entry matches when its name has exactly pColname->n characters and
 * those characters equal pColname->z.
 *
 * Returns the index of the first match, or -1 when none is found.
 */
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
  for (int16_t idx = start; idx < end; ++idx) {
    const char* colName = pSchema[idx].name;

    if (strlen(colName) != pColname->n) {
      continue;
    }
    if (strncmp(pColname->z, colName, pColname->n) == 0) {
      return idx;
    }
  }

  return -1;
}
187

188
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
11,103,780✔
189
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
190
  pTbReq->type = TD_CHILD_TABLE;
11,103,780✔
191
  pTbReq->ctb.pTag = (uint8_t*)pTag;
11,107,787✔
192
  pTbReq->name = taosStrdup(tname);
11,101,992✔
193
  if (!pTbReq->name) return terrno;
11,105,612✔
194
  pTbReq->ctb.suid = suid;
11,102,562✔
195
  pTbReq->ctb.tagNum = tagNum;
11,101,952✔
196
  if (sname) {
11,096,115✔
197
    pTbReq->ctb.stbName = taosStrdup(sname);
10,482,914✔
198
    if (!pTbReq->ctb.stbName) return terrno;
10,481,684✔
199
  }
200
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
11,096,715✔
201
  if (!pTbReq->ctb.tagName) return terrno;
11,105,969✔
202
  pTbReq->ttl = ttl;
11,102,634✔
203
  pTbReq->commentLen = -1;
11,102,024✔
204

205
  return TSDB_CODE_SUCCESS;
11,102,939✔
206
}
207

208
/* Fill pBoundCols[0 .. ncols-1] with the identity mapping (element i gets i). */
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
  int32_t col = 0;
  while (col < ncols) {
    pBoundCols[col] = col;
    ++col;
  }
}
×
213

214
/*
 * Append one COL_VAL_NONE entry per table column to pValues, using the column
 * id and type taken from the table's column schema.
 *
 * Returns 0 on success, or terrno at the first failed taosArrayPush (entries
 * pushed before the failure remain in pValues).
 */
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
  SSchema* pCols = getTableColumnSchema(pTableMeta);

  for (int32_t col = 0; col < pTableMeta->tableInfo.numOfColumns; ++col) {
    SColVal none = COL_VAL_NONE(pCols[col].colId, pCols[col].type);
    if (NULL == taosArrayPush(pValues, &none)) {
      return terrno;
    }
  }

  return 0;
}
226

227
/* Public entry point that forwards to initColValues() and returns its result. */
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
  int32_t code = initColValues(pTableMeta, aColValues);
  return code;
}
195,785✔
228

229
/*
 * Initialise a bound-column descriptor for `numOfBound` columns.
 *
 * Both counters are set to numOfBound, the two flags are cleared, and a fresh
 * index array is allocated and filled with the identity mapping.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation returns NULL (the
 * scalar fields have already been written in that case).
 */
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
  pInfo->numOfCols = numOfBound;
  pInfo->numOfBound = numOfBound;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
  if (pInfo->pColIndex == NULL) {
    return terrno;
  }

  int32_t col = 0;
  while (col < numOfBound) {
    pInfo->pColIndex[col] = col;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}
243

244
/*
 * Restore a bound-column descriptor to its "all columns, natural order" state:
 * numOfBound is reset to numOfCols, both flags are cleared, and the existing
 * index array is rewritten with the identity mapping.
 */
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
  pInfo->numOfBound = pInfo->numOfCols;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  int32_t col = 0;
  while (col < pInfo->numOfCols) {
    pInfo->pColIndex[col] = col;
    ++col;
  }
}
2,418✔
252

253
/*
 * Track whether the rows added to a table context arrive in key order.
 *
 * The new row key is compared once against the last recorded key:
 *   - smaller  -> the context is marked unordered;
 *   - equal    -> the context is marked as containing a duplicate key.
 * The new key then becomes the last recorded key.
 *
 * Once a context is unordered the function returns immediately and no longer
 * updates lastKey or duplicateTs.
 */
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
  // once the data block is disordered, we do NOT keep last timestamp any more
  if (!pTableCxt->ordered) {
    return;
  }

  // a single comparison is enough: "less than" and "equal" are mutually exclusive
  int32_t cmp = tRowKeyCompare(rowKey, &pTableCxt->lastKey);
  if (cmp < 0) {
    pTableCxt->ordered = false;
  } else if (cmp == 0) {
    pTableCxt->duplicateTs = true;
  }

  // TODO: for variable length data type, we need to copy it out
  pTableCxt->lastKey = *rowKey;
}
271

272
/*
 * Release the column-index array of a bound-column descriptor.
 * pInfo is dereferenced unconditionally, so it must not be NULL.
 */
void insDestroyBoundColInfo(SBoundColInfo* pInfo) {
  taosMemoryFreeClear(pInfo->pColIndex);
}
1,602,586,567✔
273

274
/*
 * Allocate and initialise a table data context for one target table.
 *
 * Steps, each performed only if the previous ones succeeded:
 *   1. duplicate the table meta;
 *   2. build the TSchema from the column schema and record whether it has a blob;
 *   3. initialise the bound-column info;
 *   4. unless ignoreColVals is set, create pValues with one NONE value per column;
 *   5. allocate the submit data, set its flags/ids and its row or column array.
 *
 * pCreateTbReq: optional in/out. When non-NULL and step 5's allocation
 *   succeeds, *pCreateTbReq is moved into the submit data and *pCreateTbReq
 *   is set to NULL, so the context owns the request from then on (including
 *   when a later step fails and the context is destroyed).
 * pOutput: receives the new context on success and NULL on every failure, so
 *   the caller never sees a stale or already-destroyed pointer.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; on error the partially built
 * context has already been destroyed.
 */
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
                                  bool colMode, bool ignoreColVals) {
  *pOutput = NULL;

  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
  if (NULL == pTableCxt) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pTableCxt->lastKey = (SRowKey){0};
  pTableCxt->ordered = true;
  pTableCxt->duplicateTs = false;

  pTableCxt->pMeta = tableMetaDup(pTableMeta);
  if (NULL == pTableCxt->pMeta) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pSchema =
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
    if (NULL == pTableCxt->pSchema) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      // only inspect a schema that was actually built
      pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
  }
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
    if (NULL == pTableCxt->pValues) {
      code = terrno;
    } else {
      code = initColValues(pTableMeta, pTableCxt->pValues);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pTableCxt->pData) {
      code = terrno;
    } else {
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
      pTableCxt->pData->suid = pTableMeta->suid;
      pTableCxt->pData->uid = pTableMeta->uid;
      pTableCxt->pData->sver = pTableMeta->sversion;
      // ownership of the create-table request moves into the submit data
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTableCxt->pData->aCol) {
          code = terrno;
        }
      } else {
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTableCxt->pData->aRowP) {
          code = terrno;
        }
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pTableCxt;
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
                pTableMeta->vgId);
  } else {
    insDestroyTableDataCxt(pTableCxt);
  }

  return code;
}
348

349
/*
 * Replace a submit-data header with a fresh, empty one for the next batch.
 *
 * A new SSubmitTbData is allocated and receives the flags and ids of pSrc.
 * If the auto-create flag is set, the create-table request is cloned; if the
 * flag is set but pSrc carries no request, the flag is cleared on the copy.
 * A new empty column array or row-pointer array is then created according to
 * the column-format flag.
 *
 * pSrc is always released with taosMemoryFree (only the struct itself; its
 * members are not destroyed here).
 * *pDst receives the new object on success and NULL on failure, so a caller
 * that passes the same slot for pSrc and pDst is never left pointing at the
 * freed pSrc.
 *
 * `hasBlob` is currently unused.
 *
 * Returns TSDB_CODE_SUCCESS or the error from the failing allocation/clone.
 */
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == pTmp) {
    code = terrno;
  } else {
    pTmp->flags = pSrc->flags;
    pTmp->suid = pSrc->suid;
    pTmp->uid = pSrc->uid;
    pTmp->sver = pSrc->sver;
    pTmp->pCreateTbReq = NULL;
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
      if (pSrc->pCreateTbReq) {
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
      } else {
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      // clone failed: as before, only the new header is released here
      taosMemoryFree(pTmp);
      pTmp = NULL;
    } else {
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTmp->aCol) {
          code = terrno;
        }
      } else {
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTmp->aRowP) {
          code = terrno;
        }
      }

      if (TSDB_CODE_SUCCESS != code) {
        // the array was never created; drop the successfully cloned request
        // and free the header exactly once
        if (NULL != pTmp->pCreateTbReq) {
          tdDestroySVCreateTbReq(pTmp->pCreateTbReq);
          taosMemoryFree(pTmp->pCreateTbReq);
        }
        taosMemoryFree(pTmp);
        pTmp = NULL;
      }
    }
  }

  taosMemoryFree(pSrc);
  *pDst = pTmp;  // NULL on failure: never leave the caller pointing at freed pSrc

  return code;
}
399

400
/* Set the flag of every SColVal stored in pValues back to CV_FLAG_NONE. */
static void resetColValues(SArray* pValues) {
  int32_t total = taosArrayGetSize(pValues);
  int32_t idx = 0;

  while (idx < total) {
    SColVal* pColVal = taosArrayGet(pValues, idx);
    pColVal->flag = CV_FLAG_NONE;
    ++idx;
  }
}
31,499,512✔
407

408
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
1,537,724,690✔
409
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
410
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
1,537,724,690✔
411
  if (NULL != tmp) {
1,537,740,652✔
412
    *pTableCxt = *tmp;
998,843,691✔
413
    if (!ignoreColVals) {
998,843,691✔
414
      resetColValues((*pTableCxt)->pValues);
31,499,512✔
415
    }
416
    return TSDB_CODE_SUCCESS;
998,843,691✔
417
  }
418
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
538,896,961✔
419
  if (TSDB_CODE_SUCCESS == code) {
538,897,669✔
420
    void* pData = *pTableCxt;  // deal scan coverity
538,898,360✔
421
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
538,899,012✔
422
  }
423

424
  if (TSDB_CODE_SUCCESS != code) {
538,900,270✔
425
    insDestroyTableDataCxt(*pTableCxt);
×
426
  }
427
  return code;
538,900,181✔
428
}
429

430
static void destroyColVal(void* p) {
2,147,483,647✔
431
  SColVal* pVal = p;
2,147,483,647✔
432
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
2,147,483,647✔
433
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
434
    taosMemoryFreeClear(pVal->value.pData);
841,784,649✔
435
  }
436
}
2,147,483,647✔
437

438
/*
 * Destroy a table data context and everything it references: the duplicated
 * meta, the TSchema, the bound-column index array, the column values (each
 * passed through destroyColVal), the submit data if present, and finally the
 * context object itself. NULL is accepted and ignored.
 */
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
  if (pTableCxt != NULL) {
    taosMemoryFreeClear(pTableCxt->pMeta);
    tDestroyTSchema(pTableCxt->pSchema);
    insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
    taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);

    if (pTableCxt->pData != NULL) {
      // release what the submit data references first, then the struct itself
      tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pTableCxt->pData);
    }

    taosMemoryFree(pTableCxt);
  }
}
453

454
/*
 * Destroy a vgroup data context: the submit request is passed to
 * tDestroySubmitReq, then the request struct and the context are freed.
 * NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
  if (pVgCxt != NULL) {
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pVgCxt->pData);
    taosMemoryFree(pVgCxt);
  }
}
464

465
/*
 * Destroy every vgroup data context referenced by the pointer array
 * pVgCxtList, then destroy the array itself. NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
  if (pVgCxtList != NULL) {
    size_t total = taosArrayGetSize(pVgCxtList);

    for (int32_t idx = 0; idx < total; idx++) {
      void* pItem = taosArrayGetP(pVgCxtList, idx);
      insDestroyVgroupDataCxt(pItem);
    }

    taosArrayDestroy(pVgCxtList);
  }
}
478

479
/*
 * Destroy every vgroup data context whose pointer is stored as a value in
 * pVgCxtHash, then clean up the hash table. NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
  if (pVgCxtHash != NULL) {
    for (void** pEntry = taosHashIterate(pVgCxtHash, NULL); pEntry != NULL;
         pEntry = taosHashIterate(pVgCxtHash, pEntry)) {
      insDestroyVgroupDataCxt(*(SVgroupDataCxt**)pEntry);
    }

    taosHashCleanup(pVgCxtHash);
  }
}
493

494
/*
 * Destroy every table data context whose pointer is stored as a value in
 * pTableCxtHash, then clean up the hash table. NULL is accepted and ignored.
 */
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
  if (pTableCxtHash != NULL) {
    for (void** pEntry = taosHashIterate(pTableCxtHash, NULL); pEntry != NULL;
         pEntry = taosHashIterate(pTableCxtHash, pEntry)) {
      insDestroyTableDataCxt(*(STableDataCxt**)pEntry);
    }

    taosHashCleanup(pTableCxtHash);
  }
}
508

509
/*
 * Append one table's submit data to a vgroup's submit request.
 *
 * The request's table array is created on first use; the blob array is
 * created at that same moment, and only if this table has a blob column.
 * The SSubmitTbData struct is copied by value into the table array. For blob
 * tables the blob-set pointer is additionally pushed to the blob array and
 * then cleared on the table side so it is not released twice.
 *
 * Afterwards:
 *   - isRebuild: the table's submit data is replaced through rebuildTableData;
 *   - else if clear: the table's submit-data struct is freed and set to NULL;
 *   - otherwise it is left in place.
 *
 * Returns 0 / the rebuild result on success, or terrno when an array cannot
 * be created or extended.
 */
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
  int32_t      result = 0;
  SSubmitReq2* pReq = pVgCxt->pData;

  if (pReq->aSubmitTbData == NULL) {
    pReq->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pReq->aSubmitTbData) {
      return terrno;
    }
    if (pTableCxt->hasBlob) {
      pReq->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (pReq->aSubmitBlobData == NULL) {
        return terrno;
      }
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (!pTableCxt->hasBlob) {
    pTableCxt->pData->pBlobSet = NULL;
  }

  if (NULL == taosArrayPush(pReq->aSubmitTbData, pTableCxt->pData)) {
    return terrno;
  }

  if (pTableCxt->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
    if (NULL == taosArrayPush(pReq->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
      return terrno;
    }
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  if (isRebuild) {
    result = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
  } else if (clear) {
    taosMemoryFreeClear(pTableCxt->pData);
  }
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);

  return result;
}
548

549
/*
 * Create a vgroup data context for `vgId`, register its pointer in
 * pVgroupHash (keyed by the vgroup id) and append it to pVgroupList.
 *
 * *pOutput is written only on success.
 *
 * Failure handling:
 *   - the error code is captured before any cleanup call so that cleanup
 *     cannot overwrite terrno;
 *   - if the submit request cannot be allocated, only the bare context is
 *     freed (there is no request to destroy yet);
 *   - if the list push fails after the hash insert succeeded, the hash entry
 *     is removed before the context is destroyed, so the hash never keeps a
 *     pointer to freed memory.
 *
 * Returns TSDB_CODE_SUCCESS or the error from the failing step.
 */
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
  if (NULL == pVgCxt) {
    return terrno;
  }
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
  if (NULL == pVgCxt->pData) {
    int32_t allocCode = terrno;  // read before the free below can touch it
    taosMemoryFree(pVgCxt);
    return allocCode;
  }

  pVgCxt->vgId = vgId;
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
    code = terrno;
    // NOTE(review): assumes pVgroupHash has no value-free callback registered
    // (it stores raw pointers) — confirm against where the hash is created.
    (void)taosHashRemove(pVgroupHash, &vgId, sizeof(vgId));
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
  *pOutput = pVgCxt;
  return code;
}
575

576
int insColDataComp(const void* lp, const void* rp) {
14,340,058✔
577
  SColData* pLeft = (SColData*)lp;
14,340,058✔
578
  SColData* pRight = (SColData*)rp;
14,340,058✔
579
  if (pLeft->cid < pRight->cid) {
14,340,058✔
580
    return -1;
14,295,088✔
581
  } else if (pLeft->cid > pRight->cid) {
43,581✔
582
    return 1;
43,470✔
583
  }
584

585
  return 0;
111✔
586
}
587

588
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
62,004✔
589
                                 STableColsData* pTbData, SName* sname) {
590
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
62,004✔
591
    return TSDB_CODE_SUCCESS;
51,315✔
592
  }
593

594
  SVgroupInfo      vgInfo = {0};
10,690✔
595
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
21,192✔
596
                           .requestId = pBuildInfo->requestId,
10,690✔
597
                           .requestObjRefId = pBuildInfo->requestSelf,
10,690✔
598
                           .mgmtEps = pBuildInfo->mgmtEpSet};
599

600
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
10,690✔
601
  if (TSDB_CODE_SUCCESS != code) {
10,690✔
602
    return code;
×
603
  }
604

605
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
10,690✔
606
  if (TSDB_CODE_SUCCESS != code) {
10,690✔
607
    return code;
×
608
  }
609

610
  return TSDB_CODE_SUCCESS;
10,690✔
611
}
612

613
/*
 * Resolve uid, vgroup id and super-table uid for the table named in pTbData.
 *
 * When pTbData->getFromHash is set, the per-statement cache
 * (pBuildInfo->pTableHash, keyed by table name) is consulted first. On a cache
 * miss the table meta is fetched from the catalog, the three ids are written
 * to the outputs, the triple is cached, and the table's vgroup info is added
 * to pAllVgHash.
 *
 * Returns 0 on a cache hit; otherwise the result of the name construction,
 * catalog lookup, cache insert or vgroup registration, whichever fails first
 * (or succeeds last). The outputs are not written when the name cannot be
 * built or the catalog lookup fails.
 */
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
  STableVgUid* pCached = NULL;
  int32_t      code = 0;

  if (pTbData->getFromHash) {
    pCached = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
  }

  if (NULL != pCached) {
    *uid = pCached->uid;
    *vgId = pCached->vgid;
    *suid = pCached->suid;
    return code;
  }

  SName sname;
  code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
                           .requestId = pBuildInfo->requestId,
                           .requestObjRefId = pBuildInfo->requestSelf,
                           .mgmtEps = pBuildInfo->mgmtEpSet};
  code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
    return code;
  }

  if (TSDB_CODE_SUCCESS != code) {
    parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
    return code;
  }

  *uid = pTableMeta->uid;
  *vgId = pTableMeta->vgId;
  *suid = pTableMeta->suid;

  // remember the ids so later binds of the same table skip the catalog
  STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
  code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
  if (TSDB_CODE_SUCCESS == code) {
    code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
  }

  taosMemoryFree(pTableMeta);
  return code;
}
665

666
/*
 * Build the final vgroup data blocks for a statement: pQuery->pRoot is
 * treated as an SVnodeModifyOpStmt and its pDataBlocks is handed to
 * insBuildVgDataBlocks as the output slot (last argument `true`).
 *
 * Returns the result of insBuildVgDataBlocks.
 */
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
  SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;

  return insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pModifyStmt->pDataBlocks, true);
}
676

677
/*
 * Add a table's rows to a vgroup submit request, merging with rows already
 * registered under the same table name.
 *
 * pTableNameHash maps a table name to the row array (SArray*) that was first
 * submitted for it.
 *   - Name already present: rows of pTbCtx are appended to that array and the
 *     array is sorted/merged; the context's create-table request, if any, is
 *     destroyed; nothing new is pushed to the submit request.
 *   - Name not present: the table's submit data is pushed to the request (plus
 *     its blob set for blob tables) and its row array is registered under the
 *     name.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation/push failure, or the error
 * propagated through TAOS_CHECK_RETURN from the sort/merge helpers.
 */
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
                                            char* tbname) {
  if (pVgCxt->pData->aSubmitTbData == NULL) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (pVgCxt->pData->aSubmitTbData == NULL) {
      return terrno;
    }
    if (pTbCtx->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
        return terrno;
      }
    }
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppMerged = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));

  if (ppMerged != NULL && *ppMerged != NULL) {
    // NOTE(review): the loop bound is the size of the destination array, which
    // grows as rows are pushed, while the index is applied to the source array;
    // sort/merge also run once per iteration. Kept exactly as found — confirm
    // whether the bound was meant to be the source array's size.
    for (int32_t j = 0; j < taosArrayGetSize(*ppMerged); ++j) {
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
      if (pRow != NULL && NULL == taosArrayPush(*ppMerged, &pRow)) {
        return terrno;
      }

      if (pTbCtx->hasBlob != 0) {
        code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
        TAOS_CHECK_RETURN(code);

        code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
        TAOS_CHECK_RETURN(code);
      } else {
        code = tRowSort(*ppMerged);
        TAOS_CHECK_RETURN(code);

        code = tRowMerge(*ppMerged, pTbCtx->pSchema, 0);
        TAOS_CHECK_RETURN(code);
      }
    }

    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);

    if (NULL != pTbCtx->pData->pCreateTbReq) {
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
      pTbCtx->pData->pCreateTbReq = NULL;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (pTbCtx->hasBlob == 0) {
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
    return terrno;
  }

  if (pTbCtx->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
      return terrno;
    }
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  // remember this table's row array so a later call for the same name merges into it
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);

  return TSDB_CODE_SUCCESS;
}
757

758
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,691,578✔
759
                                  SStbInterlaceInfo* pBuildInfo) {
760
  int32_t  code = TSDB_CODE_SUCCESS;
1,691,578✔
761
  uint64_t uid;
1,679,916✔
762
  int32_t  vgId;
1,680,131✔
763
  uint64_t suid;
1,680,174✔
764

765
  pTbCtx->pData->aRowP = pTbData->aCol;
1,691,922✔
766

767
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,692,481✔
768
  if (TSDB_CODE_SUCCESS != code) {
1,691,953✔
769
    return code;
×
770
  }
771

772
  pTbCtx->pMeta->vgId = vgId;
1,691,953✔
773
  pTbCtx->pMeta->uid = uid;
1,691,912✔
774
  pTbCtx->pData->uid = uid;
1,692,041✔
775

776
  if (!pTbCtx->ordered) {
1,692,127✔
777
    code = tRowSort(pTbCtx->pData->aRowP);
10✔
778
  }
779
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
1,692,084✔
780
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
53✔
781
  }
782

783
  if (TSDB_CODE_SUCCESS != code) {
1,691,729✔
784
    return code;
×
785
  }
786

787
  SVgroupDataCxt* pVgCxt = NULL;
1,691,729✔
788
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,691,815✔
789
  if (NULL == pp) {
1,692,125✔
790
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
296,499✔
791
    if (NULL == pp) {
296,455✔
792
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
296,455✔
793
    } else {
794
      pVgCxt = *(SVgroupDataCxt**)pp;
×
795
    }
796
  } else {
797
    pVgCxt = *(SVgroupDataCxt**)pp;
1,395,626✔
798
  }
799

800
  if (TSDB_CODE_SUCCESS == code) {
1,692,178✔
801
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
1,692,178✔
802
  }
803

804
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,691,357✔
805
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
806
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
807
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
808
    // insDestroyVgroupDataCxt(pVgCxt);
809
  }
810

811
  return code;
1,691,489✔
812
}
813

814
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
867,339✔
815
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
816
  int32_t  code = TSDB_CODE_SUCCESS;
867,339✔
817
  uint64_t uid;
278,501✔
818
  int32_t  vgId;
278,501✔
819
  uint64_t suid;
278,501✔
820

821
  pTbCtx->pData->aRowP = pTbData->aCol;
867,339✔
822
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
867,411✔
823

824
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
867,597✔
825
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
867,559✔
826
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
390✔
827
    vgId = (int32_t)ctbReq->uid;
390✔
828
    uid = 0;
390✔
829
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
390✔
830
    ctbReq->uid = 0;
390✔
831
    pTbCtx->pMeta->uid = 0;
390✔
832
    pTbCtx->pData->uid = 0;
390✔
833
    pTbCtx->pData->pCreateTbReq = ctbReq;
390✔
834
    code = TSDB_CODE_SUCCESS;
390✔
835
  } else {
836
    if (TSDB_CODE_SUCCESS != code) {
867,169✔
837
      return code;
×
838
    }
839
    if (pTbCtx->pData->suid != suid) {
867,169✔
840
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
841
    }
842

843
    pTbCtx->pMeta->vgId = vgId;
867,159✔
844
    pTbCtx->pMeta->uid = uid;
867,205✔
845
    pTbCtx->pData->uid = uid;
867,205✔
846
    pTbCtx->pData->pCreateTbReq = NULL;
867,200✔
847

848
    if (ctbReq != NULL) {
867,241✔
849
      tdDestroySVCreateTbReq(ctbReq);
850
      taosMemoryFree(ctbReq);
499,670✔
851
      ctbReq = NULL;
499,650✔
852
    }
853
  }
854

855
  if (pTbCtx->hasBlob == 0) {
867,681✔
856
    if (!pTbData->isOrdered) {
867,507✔
857
      code = tRowSort(pTbCtx->pData->aRowP);
×
858
    }
859
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
867,475✔
860
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
×
861
    }
862
  } else {
863
    if (!pTbData->isOrdered) {
20✔
864
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
865
    }
866
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
20✔
867
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
868
    }
869
  }
870

871
  if (TSDB_CODE_SUCCESS != code) {
867,451✔
872
    return code;
×
873
  }
874

875
  SVgroupDataCxt* pVgCxt = NULL;
867,451✔
876
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
867,493✔
877
  if (NULL == pp) {
867,361✔
878
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
473,491✔
879
    if (NULL == pp) {
473,543✔
880
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
473,543✔
881
    } else {
882
      pVgCxt = *(SVgroupDataCxt**)pp;
×
883
    }
884
  } else {
885
    pVgCxt = *(SVgroupDataCxt**)pp;
393,870✔
886
  }
887

888
  if (code == TSDB_CODE_SUCCESS) {
867,539✔
889
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
867,529✔
890
  }
891

892
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
867,533✔
893
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
894
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
895
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
896
    // insDestroyVgroupDataCxt(pVgCxt);
897
  }
898

899
  return code;
867,407✔
900
}
901

902
/*
903
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
904
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
905
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
906
  if (NULL == pVgroupHash || NULL == pVgroupList) {
907
    taosHashCleanup(pVgroupHash);
908
    taosArrayDestroy(pVgroupList);
909
    return TSDB_CODE_OUT_OF_MEMORY;
910
  }
911

912
  int32_t code = TSDB_CODE_SUCCESS;
913

914
  for (int32_t i = 0; i < tbNum; ++i) {
915
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
916
    pTableCxt->pMeta->vgId = pTableCols->vgId;
917
    pTableCxt->pMeta->uid = pTableCols->uid;
918
    pTableCxt->pData->uid = pTableCols->uid;
919
    pTableCxt->pData->aCol = pTableCols->aCol;
920

921
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
922
    if (pCol->nVal <= 0) {
923
      continue;
924
    }
925

926
    if (pTableCxt->pData->pCreateTbReq) {
927
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
928
    }
929

930
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
931

932
    tColDataSortMerge(pTableCxt->pData->aCol);
933

934
    if (TSDB_CODE_SUCCESS == code) {
935
      SVgroupDataCxt* pVgCxt = NULL;
936
      int32_t         vgId = pTableCxt->pMeta->vgId;
937
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
938
      if (NULL == pp) {
939
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
940
      } else {
941
        pVgCxt = *(SVgroupDataCxt**)pp;
942
      }
943
      if (TSDB_CODE_SUCCESS == code) {
944
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
945
      }
946
    }
947
  }
948

949
  taosHashCleanup(pVgroupHash);
950
  if (TSDB_CODE_SUCCESS == code) {
951
    *pVgDataBlocks = pVgroupList;
952
  } else {
953
    insDestroyVgroupDataCxtList(pVgroupList);
954
  }
955

956
  return code;
957
}
958
*/
959

960
// Returns 1 when the column's type satisfies IS_STR_DATA_BLOB, otherwise 0.
static int8_t colDataHasBlob(SColData* pCol) {
  return IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
}
966
// Groups every table data context in pTableHash by vgroup.
//
// Each table's data is first sorted and merged (column format: sort the columns, then tColDataSortMerge; row
// format: sort/merge rows only when unordered or when duplicate timestamps exist), then added to the vgroup
// context it belongs to. On success *pVgDataBlocks receives the list of vgroup contexts; on failure the list is
// destroyed and *pVgDataBlocks is left untouched.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVgroupHash || NULL == pVgroupList) {
    // Capture the error before the cleanup calls get a chance to touch terrno.
    int32_t err = terrno;
    taosHashCleanup(pVgroupHash);
    taosArrayDestroy(pVgroupList);
    return err;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    colFormat = false;

  // The data format is taken from the first table and applied to every table in the hash.
  void* p = taosHashIterate(pTableHash, NULL);
  if (p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
  }

  while (TSDB_CODE_SUCCESS == code && NULL != p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    if (colFormat) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
      // Skip tables with nothing to insert. A missing first column is treated the same way, because pCol is
      // dereferenced below.
      if (NULL == pCol || pCol->nVal <= 0) {
        p = taosHashIterate(pTableHash, p);
        continue;
      }

      if (pTableCxt->pData->pCreateTbReq) {
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
      }

      // Read the type before sorting: the sort may move a different column into slot 0.
      // NOTE(review): only the first column's type decides the blob path - confirm this is intended.
      int8_t isBlob = IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
      if (isBlob == 0) {
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
      } else {
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
      }
    } else {
      // skip the table has no data to insert
      // eg: import a csv without valid data
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
      //   p = taosHashIterate(pTableHash, p);
      //   continue;
      // }
      bool needMerge = (!pTableCxt->ordered || pTableCxt->duplicateTs);
      if (pTableCxt->hasBlob == 0) {
        if (!pTableCxt->ordered) {
          code = tRowSort(pTableCxt->pData->aRowP);
        }
        if (code == TSDB_CODE_SUCCESS && needMerge) {
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
        }
      } else {
        if (!pTableCxt->ordered) {
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
        }
        if (code == TSDB_CODE_SUCCESS && needMerge) {
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
        }
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      SVgroupDataCxt* pVgCxt = NULL;
      int32_t         vgId = pTableCxt->pMeta->vgId;
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
      if (NULL == pp) {
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
      } else {
        pVgCxt = *(SVgroupDataCxt**)pp;
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      p = taosHashIterate(pTableHash, p);
    }
  }

  // p is only non-NULL here when the loop stopped on an error; release the unfinished iteration.
  if (NULL != p) {
    taosHashCancelIterate(pTableHash, p);
  }

  taosHashCleanup(pVgroupHash);
  if (TSDB_CODE_SUCCESS == code) {
    *pVgDataBlocks = pVgroupList;
  } else {
    insDestroyVgroupDataCxtList(pVgroupList);
  }

  return code;
}
1056

1057
// Serializes pReq into a newly allocated message buffer: an SSubmitReq2Msg header followed by the encoded body.
// On success *pData and *pLen receive the buffer and its total length; on failure they are not written and the
// buffer, if any, is released.
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint32_t bodyLen = 0;

  // First pass: compute the size of the encoded request.
  tEncodeSize(tEncodeSubmitReq, pReq, bodyLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  uint32_t        totalLen = bodyLen + sizeof(SSubmitReq2Msg);
  SSubmitReq2Msg* pMsg = taosMemoryMalloc(totalLen);
  if (NULL == pMsg) {
    return terrno;
  }

  // Header fields are written in network byte order.
  pMsg->header.vgId = htonl(vgId);
  pMsg->header.contLen = htonl(totalLen);
  pMsg->version = htobe64(1);

  // Second pass: encode the request right after the header.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pMsg, sizeof(SSubmitReq2Msg)), totalLen - sizeof(SSubmitReq2Msg));
  code = tEncodeSubmitReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pMsg);
    return code;
  }

  *pData = pMsg;
  *pLen = totalLen;
  return code;
}
1085

1086
static void destroyVgDataBlocks(void* p) {
×
1087
  if (p == NULL) return;
×
1088
  SVgDataBlocks* pVg = p;
×
1089
  taosMemoryFree(pVg->pData);
×
1090
  taosMemoryFree(pVg);
×
1091
}
1092

1093
// Re-attaches blob sets to the table entries of a submit request.
//
// Raw requests are left alone. When the request has an aSubmitBlobData array, entry i of that array is moved
// into table entry i and the array slot is cleared; otherwise every table entry's pBlobSet is set to NULL.
// Always returns TSDB_CODE_SUCCESS.
int32_t insResetBlob(SSubmitReq2* p) {
  if (p->raw) {
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
  }

  int32_t numOfTb = taosArrayGetSize(p->aSubmitTbData);

  if (NULL == p->aSubmitBlobData) {
    for (int32_t idx = 0; idx < numOfTb; idx++) {
      SSubmitTbData* pTb = taosArrayGet(p->aSubmitTbData, idx);
      pTb->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
    }
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < numOfTb; idx++) {
    SSubmitTbData* pTb = taosArrayGet(p->aSubmitTbData, idx);
    SBlobSet**     ppSlot = taosArrayGet(p->aSubmitBlobData, idx);
    SBlobSet*      pSet = (NULL != ppSlot) ? *ppSlot : NULL;
    int32_t        numOfRows = taosArrayGetSize(pTb->aRowP);
    int32_t        numOfBlobs = 0;
    if (numOfRows > 0 && pSet) {
      numOfBlobs = taosArrayGetSize(pSet->pSeqTable);
    }
    uTrace("blob %p row size %d, pData size %d", pSet, numOfBlobs, numOfRows);

    pTb->pBlobSet = pSet;
    if (NULL != ppSlot) {
      *ppSlot = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
    }
  }

  return TSDB_CODE_SUCCESS;
}
1122
// Encodes every non-empty vgroup data context in pVgDataCxtList into an SVgDataBlocks message.
//
// With append set and *pVgDataBlocks non-NULL the new blocks are pushed onto the existing array; otherwise a
// new array is created. In append mode a newly created array is handed to the caller even on failure; in
// non-append mode the array is destroyed on failure and *pVgDataBlocks is not written.
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
  SArray* pBlockList = NULL;
  if (append && *pVgDataBlocks) {
    pBlockList = *pVgDataBlocks;
  } else {
    pBlockList = taosArrayInit(numOfVg, POINTER_BYTES);
  }
  if (NULL == pBlockList) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t idx = 0; idx < numOfVg; ++idx) {
    SVgroupDataCxt* pSrc = taosArrayGetP(pVgDataCxtList, idx);
    if (taosArrayGetSize(pSrc->pData->aSubmitTbData) <= 0) {
      continue;  // nothing to send to this vgroup
    }

    SVgDataBlocks* pDst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == pDst) {
      code = terrno;
      break;
    }

    pDst->numOfTables = taosArrayGetSize(pSrc->pData->aSubmitTbData);
    code = taosHashGetDup(pVgroupsHashObj, (const char*)&pSrc->vgId, sizeof(pSrc->vgId), &pDst->vg);
    if (TSDB_CODE_SUCCESS == code) {
      code = insResetBlob(pSrc->pData);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = buildSubmitReq(pSrc->vgId, pSrc->pData, &pDst->pData, &pDst->size);
    }
    if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(pBlockList, &pDst)) {
      code = terrno;
    }

    if (TSDB_CODE_SUCCESS != code) {
      // The block never made it into the list, so release it here and stop.
      destroyVgDataBlocks(pDst);
      break;
    }
  }

  if (append) {
    if (NULL == *pVgDataBlocks) {
      *pVgDataBlocks = pBlockList;
    }
    return code;
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroyP(pBlockList, destroyVgDataBlocks);
    return code;
  }

  *pVgDataBlocks = pBlockList;
  return code;
}
1174

1175
// Returns true when one of the first numFields entries of fields has the same name as pSchema.
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
  int idx = 0;
  while (idx < numFields) {
    if (0 == strcmp(pSchema->name, fields[idx].name)) {
      return true;
    }
    ++idx;
  }

  return false;
}
1184

1185
// Checks that one column descriptor from a raw data block is compatible with the table column schema.
//
// fields points at the descriptor: an int8_t type immediately followed by an int32_t byte length. The rules are:
//   - the type must be a valid data type and equal to the schema type;
//   - decimal columns must have the same precision and scale;
//   - blob columns must be shorter than TSDB_MAX_BLOB_LEN, other variable-length columns must not exceed the
//     schema length;
//   - all remaining columns must have exactly the schema length.
//
// On mismatch the reason is written to errstr (at most errstrLen bytes) when errstr is non-NULL, otherwise it is
// logged, and TSDB_CODE_INVALID_PARA is returned. Returns 0 when the descriptor is compatible.
// NOTE(review): the length is read through an int32_t* at an odd offset, as elsewhere in this file; this assumes
// the target tolerates unaligned loads.
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
  char    logBuf[512];
  char*   msg = (NULL != errstr) ? errstr : logBuf;
  int32_t msgLen = (NULL != errstr) ? errstrLen : (int32_t)sizeof(logBuf);
  int8_t  dataType = *fields;

  // The type comes straight from the data block; validate it before using it as an index into tDataTypes.
  if (dataType < 0 || dataType >= TSDB_DATA_TYPE_MAX) {
    snprintf(msg, msgLen, "column type invalid, name:%s, schema type:%s, data type:%d", pColSchema->name,
             tDataTypes[pColSchema->type].name, dataType);
    goto _mismatch;
  }

  if (dataType != pColSchema->type) {
    snprintf(msg, msgLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
             tDataTypes[pColSchema->type].name, tDataTypes[dataType].name);
    goto _mismatch;
  }

  int32_t dataBytes = *(int32_t*)(fields + sizeof(int8_t));

  if (IS_DECIMAL_TYPE(pColSchema->type)) {
    uint8_t precision = 0, scale = 0;
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);

    uint8_t precisionData = 0, scaleData = 0;
    int32_t decBytes = dataBytes;  // passed by pointer, so work on a copy
    extractDecimalTypeInfoFromBytes(&decBytes, &precisionData, &scaleData);

    if (precision != precisionData || scale != scaleData) {
      snprintf(msg, msgLen,
               "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
               "precision:%d, scale:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[dataType].name,
               precisionData, scaleData);
      goto _mismatch;
    }
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
      if (dataBytes >= TSDB_MAX_BLOB_LEN) {
        snprintf(msg, msgLen,
                 "column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
                 pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
                 dataBytes);
        goto _mismatch;
      }
    } else if (dataBytes > pColSchema->bytes) {
      snprintf(msg, msgLen, "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
               dataBytes);
      goto _mismatch;
    }
  } else if (dataBytes != pColSchema->bytes) {
    snprintf(msg, msgLen,
             "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
             pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
             dataBytes);
    goto _mismatch;
  }

  return 0;

_mismatch:
  // Without a caller-supplied buffer the message went to logBuf; log it so it is not lost.
  if (NULL == errstr) {
    uError("checkSchema %s", logBuf);
  }
  return TSDB_CODE_INVALID_PARA;
}
1273

1274
// Binds column i of the raw data block to bound column j of the table.
//
// The macro expands inside rawBlockBindData and uses that function's locals (ret, pColSchema, pColExtSchema,
// fields, errstr, errstrLen, hasTs, pStart, numOfRows, pTableCxt, hasBlob, pBlobSet, needChangeLength, version,
// colLength, boundInfo) and its `end` label. It validates the column against the schema, appends the column's
// values to the matching SColData, advances `fields` and `pStart` past the column, and marks the bound column as
// supplied by setting its pColIndex entry to -1.
#define PRCESS_DATA(i, j)                                                                                \
  {                                                                                                      \
    ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                             \
    if (ret != 0) {                                                                                      \
      goto end;                                                                                          \
    }                                                                                                    \
                                                                                                         \
    if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                              \
      hasTs = true;                                                                                      \
    }                                                                                                    \
                                                                                                         \
    /* the column starts with an offset array (var types) or a null bitmap, followed by the values */   \
    int8_t* pColHead = (int8_t*)pStart;                                                                  \
    if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                            \
      pStart += numOfRows * sizeof(int32_t);                                                             \
    } else {                                                                                             \
      pStart += BitmapLen(numOfRows);                                                                    \
    }                                                                                                    \
    char* pColValues = pStart;                                                                           \
                                                                                                         \
    SColData* pDstCol = taosArrayGet(pTableCxt->pData->aCol, j);                                         \
    if (hasBlob) {                                                                                       \
      ret = tColDataAddValueByDataBlockWithBlob(pDstCol, pColSchema->type, pColSchema->bytes, numOfRows, \
                                                pColHead, pColValues, pBlobSet);                         \
    } else {                                                                                             \
      ret = tColDataAddValueByDataBlock(pDstCol, pColSchema->type, pColSchema->bytes, numOfRows,         \
                                        pColHead, pColValues);                                           \
    }                                                                                                    \
    if (ret != 0) {                                                                                      \
      goto end;                                                                                          \
    }                                                                                                    \
                                                                                                         \
    fields += sizeof(int8_t) + sizeof(int32_t);                                                          \
    if (needChangeLength && version == BLOCK_VERSION_1) {                                                \
      pStart += htonl(colLength[i]);                                                                     \
    } else {                                                                                             \
      pStart += colLength[i];                                                                            \
    }                                                                                                    \
    boundInfo->pColIndex[j] = -1;                                                                        \
  }
1309

1310
// Binds one raw data block to the column-format submit data of the table described by pTableMeta.
//
// The block header is parsed in place: version, total length, row count, column count, flag segment, group id,
// then one (int8_t type, int32_t bytes) descriptor per column and one int32_t length per column, followed by the
// column data. Columns are matched to the table's bound columns by position when tFields is NULL, otherwise by
// name (tFields is an SSchemaWrapper when raw is set, else a TAOS_FIELD array). Bound columns that the block
// does not supply are filled with NULL values.
//
// Returns 0 on success or an error code; when errstr is non-NULL some failures also write a message into it.
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
  int       ret = 0;
  int8_t    hasBlob = 0;
  SBlobSet* pBlobSet = NULL;
  if (data == NULL) {
    uError("rawBlockBindData, data is NULL");
    return TSDB_CODE_APP_ERROR;
  }
  void* tmp =
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
  SVCreateTbReq* pCreateReqTmp = NULL;
  if (tmp == NULL && pCreateTb != NULL) {
    // First block for this table: give the new table context its own copy of the create request.
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("cloneSVreateTbReq error");
      goto end;
    }
  }

  STableDataCxt* pTableCxt = NULL;
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
  if (pCreateReqTmp != NULL) {
    // The clone was not taken over, so release it.
    tdDestroySVCreateTbReq(pCreateReqTmp);
    taosMemoryFree(pCreateReqTmp);
  }
  // Check the result before touching pTableCxt: it is still NULL when the lookup failed.
  if (ret != TSDB_CODE_SUCCESS) {
    uError("insGetTableDataCxt error");
    goto end;
  }

  hasBlob = pTableCxt->hasBlob;
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
    if (ret == TSDB_CODE_SUCCESS && pTableCxt->pData->pBlobSet == NULL) {
      ret = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("create blob set failed");
      goto end;
    }
  }
  pBlobSet = pTableCxt->pData->pBlobSet;

  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
  if (tmp == NULL) {
    ret = initTableColSubmitData(pTableCxt);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("initTableColSubmitData error");
      goto end;
    }
  }

  char* p = (char*)data;
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  int32_t version = *(int32_t*)data;
  p += sizeof(int32_t);
  p += sizeof(int32_t);

  int32_t numOfRows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t numOfCols = *(int32_t*)p;
  p += sizeof(int32_t);

  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // The counts drive all pointer arithmetic below, so reject values that cannot describe a block.
  if (numOfCols <= 0 || numOfRows < 0) {
    uError("rawBlockBindData, invalid block header, rows:%d, cols:%d", numOfRows, numOfCols);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int8_t* fields = (int8_t*)p;
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
    uError("fields type error:%d", *fields);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;

  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;

  if (tFields != NULL && numFields != numOfCols) {
    if (errstr != NULL) {
      snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
    }
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  bool hasTs = false;
  if (tFields == NULL) {
    // No field list: block column j maps to bound column j.
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
    for (int j = 0; j < len; j++) {
      // Every descriptor comes from the block, so validate each type, not just the first one.
      if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
        uError("fields type error:%d", *fields);
        ret = TSDB_CODE_INVALID_PARA;
        goto end;
      }
      SSchema*    pColSchema = &pSchema[j];
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
      PRCESS_DATA(j, j)
    }
  } else {
    for (int i = 0; i < numFields; i++) {
      if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
        uError("fields type error:%d", *fields);
        ret = TSDB_CODE_INVALID_PARA;
        goto end;
      }

      char* fieldName = NULL;
      if (raw) {
        fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
      } else {
        fieldName = ((TAOS_FIELD*)tFields)[i].name;
      }

      bool matched = false;
      for (int j = 0; j < boundInfo->numOfBound; j++) {
        SSchema*    pColSchema = &pSchema[j];
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
        if (strcmp(pColSchema->name, fieldName) == 0) {
          PRCESS_DATA(i, j)
          matched = true;
          break;
        }
      }

      if (!matched) {
        // The field has no bound column. Step over its descriptor and data so the following fields are still
        // read from the right position (PRCESS_DATA does this only for matched fields).
        if (IS_VAR_DATA_TYPE(*fields)) {
          pStart += numOfRows * sizeof(int32_t);
        } else {
          pStart += BitmapLen(numOfRows);
        }
        if (needChangeLength && version == BLOCK_VERSION_1) {
          pStart += htonl(colLength[i]);
        } else {
          pStart += colLength[i];
        }
        fields += sizeof(int8_t) + sizeof(int32_t);
      }
    }
  }

  if (!hasTs) {
    if (errstr != NULL) {
      snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
    }
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // process NULL data
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
    if (boundInfo->pColIndex[c] != -1) {
      // Not supplied by the block: fill the column with NULLs for every row.
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
      if (ret != 0) {
        goto end;
      }
    } else {
      boundInfo->pColIndex[c] = c;  // restore for next block
    }
  }

end:
  return ret;
}
1450

1451
// Queues an already-encoded table submit payload on the vgroup that owns the table.
//
// The payload is first passed to transformRawSSubmitTbData with the table's suid, uid and schema version. The
// vgroup context is then looked up (and created when absent), and the payload pointer is appended to its submit
// list. Returns 0 on success or the error of the failing step.
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
  if (code != 0) {
    return code;
  }

  SVgroupDataCxt* pVgCxt = NULL;
  void**          ppFound = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
  if (NULL != ppFound) {
    pVgCxt = *(SVgroupDataCxt**)ppFound;
  } else {
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
    if (code != 0) {
      return code;
    }
  }

  // Create the submit list on first use and mark the request as raw.
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
    pVgCxt->pData->raw = true;
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
    return terrno;
  }

  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc