• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4804

16 Oct 2025 10:33AM UTC coverage: 61.259% (+0.1%) from 61.147%
#4804

push

travis-ci

happyguoxy
Merge branch 'cover/3.0' of github.com:taosdata/TDengine into cover/3.0

156021 of 324369 branches covered (48.1%)

Branch coverage included in aggregate %.

79 of 100 new or added lines in 19 files covered. (79.0%)

3318 existing lines in 125 files now uncovered.

207798 of 269534 relevant lines covered (77.1%)

168909799.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.43
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
void qDestroyBoundColInfo(void* pInfo) {
11,820,669✔
31
  if (NULL == pInfo) {
11,820,669✔
32
    return;
8,097,725✔
33
  }
34

35
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
3,722,944✔
36

37
  taosMemoryFreeClear(pBoundInfo->pColIndex);
3,722,944!
38
}
39

40
static char* tableNameGetPosition(SToken* pToken, char target) {
1,019,502,905✔
41
  bool inEscape = false;
1,019,502,905✔
42
  bool inQuote = false;
1,019,502,905✔
43
  char quotaStr = 0;
1,019,502,905✔
44

45
  for (uint32_t i = 0; i < pToken->n; ++i) {
2,147,483,647✔
46
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
2,147,483,647!
47
      return pToken->z + i;
385,399,934✔
48
    }
49

50
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
2,147,483,647✔
51
      if (!inQuote) {
29,328,260!
52
        inEscape = !inEscape;
29,329,811✔
53
      }
54
    }
55

56
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
2,147,483,647✔
57
      if (!inEscape) {
11,589,617✔
58
        if (!inQuote) {
11,580,438✔
59
          quotaStr = *(pToken->z + i);
5,790,219✔
60
          inQuote = !inQuote;
5,790,219✔
61
        } else if (quotaStr == *(pToken->z + i)) {
5,790,219!
62
          inQuote = !inQuote;
5,790,219✔
63
        }
64
      }
65
    }
66
  }
67

68
  return NULL;
634,113,257✔
69
}
70

71
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
1,019,490,341✔
72
  const char* msg1 = "name too long";
1,019,490,341✔
73
  const char* msg2 = "invalid database name";
1,019,490,341✔
74
  const char* msg3 = "db is not specified";
1,019,490,341✔
75
  const char* msg4 = "invalid table name";
1,019,490,341✔
76

77
  int32_t code = TSDB_CODE_SUCCESS;
1,019,490,341✔
78
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
1,019,490,341✔
79

80
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
1,019,513,590✔
81
    int32_t dbLen = p - pTableName->z;
385,402,274✔
82
    if (dbLen <= 0) {
385,397,672!
83
      return buildInvalidOperationMsg(pMsgBuf, msg2);
×
84
    }
85
    char name[TSDB_DB_FNAME_LEN] = {0};
385,397,672✔
86
    strncpy(name, pTableName->z, dbLen);
385,398,386!
87
    int32_t actualDbLen = strdequote(name);
385,401,594✔
88

89
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
385,397,230✔
90
    if (code != TSDB_CODE_SUCCESS) {
385,399,195!
91
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
92
    }
93

94
    int32_t tbLen = pTableName->n - dbLen - 1;
385,399,195✔
95
    if (tbLen <= 0) {
385,399,195!
96
      return buildInvalidOperationMsg(pMsgBuf, msg4);
×
97
    }
98

99
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
385,399,195✔
100
    strncpy(tbname, p + 1, tbLen);
385,396,792!
101
    /*tbLen = */ (void)strdequote(tbname);
385,395,902✔
102

103
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
385,401,929✔
104
    if (code != 0) {
385,401,100!
105
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
106
    }
107
  } else {  // get current DB name first, and then set it into path
108
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
634,111,316✔
109
    strncpy(tbname, pTableName->z, pTableName->n);
634,107,710!
110
    int32_t tbLen = strdequote(tbname);
634,113,075✔
111
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
634,106,150!
UNCOV
112
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
113
    }
114
    if (tbLen == 0) {
634,107,532✔
115
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
1,540✔
116
    }
117

118
    char name[TSDB_TABLE_FNAME_LEN] = {0};
634,105,992✔
119
    strncpy(name, pTableName->z, pTableName->n);
634,111,113!
120
    (void)strdequote(name);
634,107,132✔
121

122
    if (dbName == NULL) {
634,107,113✔
123
      return buildInvalidOperationMsg(pMsgBuf, msg3);
1,656✔
124
    }
125
    if (name[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
634,105,661!
126

127
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
634,105,661!
128
    if (code != TSDB_CODE_SUCCESS) {
634,101,761!
129
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
×
130
      return code;
×
131
    }
132

133
    code = tNameFromString(pName, name, T_NAME_TABLE);
634,101,761✔
134
    if (code != 0) {
634,102,957!
135
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
×
136
    }
137
  }
138

139
  if (NULL != strchr(pName->tname, '.')) {
1,019,505,112!
140
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
468✔
141
  }
142

143
  return code;
1,019,501,869✔
144
}
145

146
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
2,081,114,520✔
147
  while (start < end) {
2,147,483,647✔
148
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
2,147,483,647!
149
      return start;
2,055,216,958✔
150
    }
151
    ++start;
2,147,483,647✔
152
  }
153
  return -1;
25,907,894✔
154
}
155

156
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
45,707,643✔
157
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
158
  pTbReq->type = TD_CHILD_TABLE;
45,707,643✔
159
  pTbReq->ctb.pTag = (uint8_t*)pTag;
45,709,437✔
160
  pTbReq->name = taosStrdup(tname);
45,707,886!
161
  if (!pTbReq->name) return terrno;
45,707,910!
162
  pTbReq->ctb.suid = suid;
45,707,910✔
163
  pTbReq->ctb.tagNum = tagNum;
45,708,839✔
164
  if (sname) {
45,707,013✔
165
    pTbReq->ctb.stbName = taosStrdup(sname);
43,159,502!
166
    if (!pTbReq->ctb.stbName) return terrno;
43,158,904!
167
  }
168
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
45,706,415✔
169
  if (!pTbReq->ctb.tagName) return terrno;
45,707,344!
170
  pTbReq->ttl = ttl;
45,708,241✔
171
  pTbReq->commentLen = -1;
45,708,241✔
172

173
  return TSDB_CODE_SUCCESS;
45,708,540✔
174
}
175

176
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
177
  for (int32_t i = 0; i < ncols; ++i) {
×
178
    pBoundCols[i] = i;
×
179
  }
180
}
×
181

182
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
958,111,366✔
183
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
958,111,366✔
184
  int32_t  code = 0;
958,113,344✔
185
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
2,147,483,647✔
186
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
2,147,483,647✔
187
    if (NULL == taosArrayPush(pValues, &val)) {
2,147,483,647!
188
      code = terrno;
×
189
      break;
×
190
    }
191
  }
192
  return code;
958,128,295✔
193
}
194

195
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
8,873,028✔
196

197
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
1,023,829,796✔
198
  pInfo->numOfCols = numOfBound;
1,023,829,796✔
199
  pInfo->numOfBound = numOfBound;
1,023,843,924✔
200
  pInfo->hasBoundCols = false;
1,023,836,012✔
201
  pInfo->mixTagsCols = false;
1,023,832,977✔
202
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
1,023,825,772!
203
  if (NULL == pInfo->pColIndex) {
1,023,837,243!
204
    return terrno;
×
205
  }
206
  for (int32_t i = 0; i < numOfBound; ++i) {
2,147,483,647✔
207
    pInfo->pColIndex[i] = i;
2,147,483,647✔
208
  }
209
  return TSDB_CODE_SUCCESS;
1,023,843,271✔
210
}
211

212
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
4,816✔
213
  pInfo->numOfBound = pInfo->numOfCols;
4,816✔
214
  pInfo->hasBoundCols = false;
4,816✔
215
  pInfo->mixTagsCols = false;
4,816✔
216
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
24,080✔
217
    pInfo->pColIndex[i] = i;
19,264✔
218
  }
219
}
4,816✔
220

221
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
2,147,483,647✔
222
  // once the data block is disordered, we do NOT keep last timestamp any more
223
  if (!pTableCxt->ordered) {
2,147,483,647✔
224
    return;
1,717,673,847✔
225
  }
226

227
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
2,147,483,647✔
228
    pTableCxt->ordered = false;
1,948,489✔
229
  }
230

231
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
2,147,483,647✔
232
    pTableCxt->duplicateTs = true;
1,774,919✔
233
  }
234

235
  // TODO: for variable length data type, we need to copy it out
236
  pTableCxt->lastKey = *rowKey;
2,147,483,647✔
237
  return;
2,147,483,647✔
238
}
239

240
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
2,147,483,647!
241

242
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
960,989,507✔
243
                                  bool colMode, bool ignoreColVals) {
244
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
960,989,507!
245
  if (NULL == pTableCxt) {
960,969,508!
246
    *pOutput = NULL;
×
247
    return terrno;
×
248
  }
249

250
  int32_t code = TSDB_CODE_SUCCESS;
960,969,508✔
251

252
  pTableCxt->lastKey = (SRowKey){0};
960,969,508✔
253
  pTableCxt->ordered = true;
960,971,918✔
254
  pTableCxt->duplicateTs = false;
960,987,944✔
255

256
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
960,983,349✔
257
  if (NULL == pTableCxt->pMeta) {
961,003,488!
258
    code = TSDB_CODE_OUT_OF_MEMORY;
×
259
  }
260
  if (TSDB_CODE_SUCCESS == code) {
961,005,028✔
261
    pTableCxt->pSchema =
961,005,250✔
262
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
960,997,691✔
263
    if (NULL == pTableCxt->pSchema) {
961,002,547!
264
      code = TSDB_CODE_OUT_OF_MEMORY;
×
265
    }
266
  }
267
  pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
961,017,130✔
268

269
  if (TSDB_CODE_SUCCESS == code) {
960,991,367!
270
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
960,998,035✔
271
  }
272
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
961,002,327✔
273
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
949,236,957✔
274
    if (NULL == pTableCxt->pValues) {
949,232,649!
275
      code = terrno;
×
276
    } else {
277
      code = initColValues(pTableMeta, pTableCxt->pValues);
949,235,912✔
278
    }
279
  }
280
  if (TSDB_CODE_SUCCESS == code) {
961,015,213✔
281
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
961,016,162!
282
    if (NULL == pTableCxt->pData) {
960,995,449!
283
      code = terrno;
×
284
    } else {
285
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
960,991,447✔
286
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
961,000,363✔
287
      pTableCxt->pData->suid = pTableMeta->suid;
960,999,185✔
288
      pTableCxt->pData->uid = pTableMeta->uid;
961,002,469✔
289
      pTableCxt->pData->sver = pTableMeta->sversion;
961,001,371✔
290
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
960,980,149✔
291
      int8_t flag = pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT;
960,996,072✔
292
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
960,992,136✔
293
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
960,991,564✔
294
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
3,753,335✔
295
        if (NULL == pTableCxt->pData->aCol) {
3,756,275!
296
          code = terrno;
×
297
        }
298
      } else {
299
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
957,223,377✔
300
        if (NULL == pTableCxt->pData->aRowP) {
957,237,294!
301
          code = terrno;
×
302
        }
303
      }
304
    }
305
  }
306
  if (TSDB_CODE_SUCCESS == code) {
960,998,633!
307
    *pOutput = pTableCxt;
960,998,633✔
308
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
961,000,064✔
309
                pTableMeta->vgId);
310
  } else {
311
    insDestroyTableDataCxt(pTableCxt);
×
312
  }
313

314
  return code;
961,002,694✔
315
}
316

317
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
6,241,286✔
318
  int32_t        code = TSDB_CODE_SUCCESS;
6,241,286✔
319
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
6,241,286!
320
  if (NULL == pTmp) {
6,240,187!
321
    code = terrno;
×
322
  } else {
323
    pTmp->flags = pSrc->flags;
6,240,187✔
324
    pTmp->suid = pSrc->suid;
6,240,817✔
325
    pTmp->uid = pSrc->uid;
6,239,943✔
326
    pTmp->sver = pSrc->sver;
6,240,532✔
327
    pTmp->pCreateTbReq = NULL;
6,239,345✔
328
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
6,240,233✔
329
      if (pSrc->pCreateTbReq) {
5,583,439✔
330
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
5,582,027✔
331
      } else {
332
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
184✔
333
      }
334
    }
335
    if (TSDB_CODE_SUCCESS == code) {
6,240,785!
336
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
6,240,785✔
337
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
3,677,969✔
338
        if (NULL == pTmp->aCol) {
3,677,969!
339
          code = terrno;
×
340
          taosMemoryFree(pTmp);
×
341
        }
342
      } else {
343
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
2,563,414✔
344
        if (NULL == pTmp->aRowP) {
2,563,091!
345
          code = terrno;
×
346
          taosMemoryFree(pTmp);
×
347
        }
348

349
        if (code != 0) {
2,563,091!
350
          taosArrayDestroy(pTmp->aRowP);
×
351
          taosMemoryFree(pTmp);
×
352
        }
353
      }
354

355
    } else {
356
      taosMemoryFree(pTmp);
×
357
    }
358
  }
359

360
  taosMemoryFree(pSrc);
6,241,060!
361
  if (TSDB_CODE_SUCCESS == code) {
6,241,359!
362
    *pDst = pTmp;
6,241,359✔
363
  }
364

365
  return code;
6,241,359✔
366
}
367

368
static void resetColValues(SArray* pValues) {
3,166,627✔
369
  int32_t num = taosArrayGetSize(pValues);
3,166,627✔
370
  for (int32_t i = 0; i < num; ++i) {
53,925,554✔
371
    SColVal* pVal = taosArrayGet(pValues, i);
50,758,927✔
372
    pVal->flag = CV_FLAG_NONE;
50,758,927✔
373
  }
374
}
3,166,627✔
375

376
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
2,147,483,647✔
377
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
378
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
2,147,483,647✔
379
  if (NULL != tmp) {
2,147,483,647✔
380
    *pTableCxt = *tmp;
1,758,425,380✔
381
    if (!ignoreColVals) {
1,758,425,380✔
382
      resetColValues((*pTableCxt)->pValues);
3,166,627✔
383
    }
384
    return TSDB_CODE_SUCCESS;
1,758,425,380✔
385
  }
386
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
961,000,802✔
387
  if (TSDB_CODE_SUCCESS == code) {
961,006,637!
388
    void* pData = *pTableCxt;  // deal scan coverity
961,008,537✔
389
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
961,009,212✔
390
  }
391

392
  if (TSDB_CODE_SUCCESS != code) {
961,010,299!
393
    insDestroyTableDataCxt(*pTableCxt);
×
394
  }
395
  return code;
961,012,139✔
396
}
397

398
static void destroyColVal(void* p) {
2,147,483,647✔
399
  SColVal* pVal = p;
2,147,483,647✔
400
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
2,147,483,647✔
401
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
402
    taosMemoryFreeClear(pVal->value.pData);
1,807,699,104!
403
  }
404
}
2,147,483,647✔
405

406
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
964,042,952✔
407
  if (NULL == pTableCxt) {
964,042,952!
408
    return;
×
409
  }
410

411
  taosMemoryFreeClear(pTableCxt->pMeta);
964,042,952!
412
  tDestroyTSchema(pTableCxt->pSchema);
964,038,007!
413
  insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
964,040,988✔
414
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
964,039,431✔
415
  if (pTableCxt->pData) {
964,042,854✔
416
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
21,887,269✔
417
    taosMemoryFree(pTableCxt->pData);
21,885,961!
418
  }
419
  taosMemoryFree(pTableCxt);
964,036,644!
420
}
421

422
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
901,110,806✔
423
  if (NULL == pVgCxt) {
901,110,806!
424
    return;
×
425
  }
426

427
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
901,110,806✔
428
  taosMemoryFree(pVgCxt->pData);
901,109,307!
429

430
  taosMemoryFree(pVgCxt);
901,108,548!
431
}
432

433
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
1,746,718,896✔
434
  if (NULL == pVgCxtList) {
1,746,718,896✔
435
    return;
880,513,424✔
436
  }
437

438
  size_t size = taosArrayGetSize(pVgCxtList);
866,205,472✔
439
  for (int32_t i = 0; i < size; i++) {
1,767,320,952✔
440
    void* p = taosArrayGetP(pVgCxtList, i);
901,114,574✔
441
    insDestroyVgroupDataCxt(p);
901,116,149✔
442
  }
443

444
  taosArrayDestroy(pVgCxtList);
866,206,378✔
445
}
446

447
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
448
  if (NULL == pVgCxtHash) {
×
449
    return;
×
450
  }
451

452
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
453
  while (p) {
×
454
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
455

456
    p = taosHashIterate(pVgCxtHash, p);
×
457
  }
458

459
  taosHashCleanup(pVgCxtHash);
×
460
}
461

462
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
882,511,044✔
463
  if (NULL == pTableCxtHash) {
882,511,044✔
464
    return;
3,724,786✔
465
  }
466

467
  void** p = taosHashIterate(pTableCxtHash, NULL);
878,786,258✔
468
  while (p) {
1,835,986,582✔
469
    insDestroyTableDataCxt(*(STableDataCxt**)p);
957,194,773✔
470

471
    p = taosHashIterate(pTableCxtHash, p);
957,190,490✔
472
  }
473

474
  taosHashCleanup(pTableCxtHash);
878,791,809✔
475
}
476

477
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
956,560,116✔
478
  int32_t code = 0;
956,560,116✔
479
  if (NULL == pVgCxt->pData->aSubmitTbData) {
956,560,116✔
480
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
896,946,492✔
481
    if (pVgCxt->pData->aSubmitTbData == NULL) {
896,951,815!
482
      return terrno;
×
483
    }
484
    if (pTableCxt->hasBlob) {
896,951,315✔
485
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
51,336✔
486
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
51,336!
487
        return terrno;
×
488
      }
489
    }
490
  }
491

492
  // push data to submit, rebuild empty data for next submit
493
  if (!pTableCxt->hasBlob) pTableCxt->pData->pBlobSet = NULL;
956,570,256✔
494

495
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
1,913,130,094!
496
    return terrno;
×
497
  }
498

499
  if (pTableCxt->hasBlob) {
956,565,062✔
500
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
51,336!
501
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
102,672!
502
      return terrno;
×
503
    }
504
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
51,336✔
505
  }
506

507
  if (isRebuild) {
956,560,249✔
508
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
6,240,702✔
509
  } else if (clear) {
950,319,547✔
510
    taosMemoryFreeClear(pTableCxt->pData);
942,132,649!
511
  }
512
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
956,565,718✔
513

514
  return code;
956,561,607✔
515
}
516

517
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
901,198,341✔
518
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
901,198,341!
519
  if (NULL == pVgCxt) {
901,204,618!
520
    return terrno;
×
521
  }
522
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
901,204,618!
523
  if (NULL == pVgCxt->pData) {
901,207,737!
524
    insDestroyVgroupDataCxt(pVgCxt);
×
525
    return terrno;
×
526
  }
527

528
  pVgCxt->vgId = vgId;
901,205,222✔
529
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
901,199,499✔
530
  if (TSDB_CODE_SUCCESS == code) {
901,208,472!
531
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
901,209,693!
532
      code = terrno;
×
533
      insDestroyVgroupDataCxt(pVgCxt);
×
534
      return code;
×
535
    }
536
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
537
    *pOutput = pVgCxt;
901,209,693✔
538
  } else {
539
    insDestroyVgroupDataCxt(pVgCxt);
×
540
  }
541
  return code;
901,206,501✔
542
}
543

544
int insColDataComp(const void* lp, const void* rp) {
23,436,245✔
545
  SColData* pLeft = (SColData*)lp;
23,436,245✔
546
  SColData* pRight = (SColData*)rp;
23,436,245✔
547
  if (pLeft->cid < pRight->cid) {
23,436,245✔
548
    return -1;
22,588,358✔
549
  } else if (pLeft->cid > pRight->cid) {
852,288!
550
    return 1;
852,288✔
551
  }
552

553
  return 0;
×
554
}
555

556
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
1,850,868✔
557
                                 STableColsData* pTbData, SName* sname) {
558
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
1,850,868!
559
    return TSDB_CODE_SUCCESS;
1,621,139✔
560
  }
561

562
  SVgroupInfo      vgInfo = {0};
229,833✔
563
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
457,103✔
564
                           .requestId = pBuildInfo->requestId,
229,833✔
565
                           .requestObjRefId = pBuildInfo->requestSelf,
229,833✔
566
                           .mgmtEps = pBuildInfo->mgmtEpSet};
567

568
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
229,833✔
569
  if (TSDB_CODE_SUCCESS != code) {
229,918!
570
    return code;
×
571
  }
572

573
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
229,918✔
574
  if (TSDB_CODE_SUCCESS != code) {
229,918!
575
    return code;
×
576
  }
577

578
  return TSDB_CODE_SUCCESS;
229,918✔
579
}
580

581
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
14,373,082✔
582
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
583
  STableVgUid* pTbInfo = NULL;
14,373,082✔
584
  int32_t      code = 0;
14,373,082✔
585

586
  if (pTbData->getFromHash) {
14,373,082✔
587
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
12,529,274!
588
  }
589

590
  if (NULL == pTbInfo) {
14,377,836✔
591
    SName sname;
842,246✔
592
    code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
1,852,757✔
593
    if (TSDB_CODE_SUCCESS != code) {
1,852,068!
594
      return code;
1,889✔
595
    }
596

597
    STableMeta*      pTableMeta = NULL;
1,852,068✔
598
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
2,693,038✔
599
                             .requestId = pBuildInfo->requestId,
1,851,415✔
600
                             .requestObjRefId = pBuildInfo->requestSelf,
1,852,068✔
601
                             .mgmtEps = pBuildInfo->mgmtEpSet};
602
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
1,851,415✔
603

604
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
1,852,568✔
605
      parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
1,700!
606
      return code;
1,889✔
607
    }
608

609
    if (TSDB_CODE_SUCCESS != code) {
1,850,868!
610
      parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
×
611
      return code;
×
612
    }
613

614
    *uid = pTableMeta->uid;
1,850,868✔
615
    *vgId = pTableMeta->vgId;
1,850,215✔
616
    *suid = pTableMeta->suid;
1,849,562✔
617

618
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
1,850,215✔
619
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
1,850,215!
620
    if (TSDB_CODE_SUCCESS == code) {
1,849,666✔
621
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
1,849,562✔
622
    }
623

624
    taosMemoryFree(pTableMeta);
1,851,161!
625
  } else {
626
    *uid = pTbInfo->uid;
12,525,079✔
627
    *vgId = pTbInfo->vgid;
12,525,079✔
628
    *suid = pTbInfo->suid;
12,524,437✔
629
  }
630

631
  return code;
14,374,222✔
632
}
633

634
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
635
  int32_t             code = TSDB_CODE_SUCCESS;
×
636
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
637

638
  if (TSDB_CODE_SUCCESS == code) {
×
639
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
640
  }
641

642
  return code;
×
643
}
644

645
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
6,187,143✔
646
                                            char* tbname) {
647
  if (NULL == pVgCxt->pData->aSubmitTbData) {
6,187,143✔
648
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
4,252,866✔
649
    if (NULL == pVgCxt->pData->aSubmitTbData) {
4,254,780!
650
      return terrno;
×
651
    }
652
    if (pTbCtx->hasBlob) {
4,254,780!
653
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
×
654
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
×
655
        return terrno;
×
656
      }
657
    }
658
  }
659

660
  int32_t  code = TSDB_CODE_SUCCESS;
6,192,852✔
661
  SArray** rowP = NULL;
6,192,852✔
662

663
  rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));
6,192,852!
664

665
  if (rowP != NULL && *rowP != NULL) {
6,192,210!
666
    for (int32_t j = 0; j < taosArrayGetSize(*rowP); ++j) {
1,058✔
667
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
736✔
668
      if (pRow) {
736✔
669
        if (NULL == taosArrayPush(*rowP, &pRow)) {
920!
670
          return terrno;
×
671
        }
672
      }
673

674
      if (pTbCtx->hasBlob == 0) {
736!
675
        code = tRowSort(*rowP);
736✔
676
        TAOS_CHECK_RETURN(code);
736!
677

678
        code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
736✔
679
        TAOS_CHECK_RETURN(code);
736!
680
      } else {
681
        code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
682
        TAOS_CHECK_RETURN(code);
×
683

684
        code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
685
        TAOS_CHECK_RETURN(code);
×
686
      }
687
    }
688

689
    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);
322!
690

691
    if (pTbCtx->pData->pCreateTbReq != NULL) {
322✔
692
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
276!
693
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
276!
694
      pTbCtx->pData->pCreateTbReq = NULL;
276✔
695
    }
696
    return TSDB_CODE_SUCCESS;
322✔
697
  }
698

699
  if (pTbCtx->hasBlob == 0) {
6,191,888!
700
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
6,191,888✔
701
  }
702

703
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
12,383,153!
704
    return terrno;
×
705
  }
706

707
  if (pTbCtx->hasBlob) {
6,191,888!
708
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
×
709
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
×
710
      return terrno;
×
711
    }
712
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
×
713
  }
714

715
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
6,191,888!
716

717
  if (code != TSDB_CODE_SUCCESS) {
6,190,000!
718
    return code;
×
719
  }
720

721
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);
6,190,000!
722

723
  return TSDB_CODE_SUCCESS;
6,191,246✔
724
}
725

726
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
8,187,098✔
727
                                  SStbInterlaceInfo* pBuildInfo) {
728
  int32_t  code = TSDB_CODE_SUCCESS;
8,187,098✔
729
  uint64_t uid;
3,651,665✔
730
  int32_t  vgId;
3,651,665✔
731
  uint64_t suid;
3,651,665✔
732

733
  pTbCtx->pData->aRowP = pTbData->aCol;
8,187,098✔
734

735
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
8,187,098✔
736
  if (TSDB_CODE_SUCCESS != code) {
8,185,642!
737
    return code;
×
738
  }
739

740
  pTbCtx->pMeta->vgId = vgId;
8,185,642✔
741
  pTbCtx->pMeta->uid = uid;
8,185,642✔
742
  pTbCtx->pData->uid = uid;
8,185,642✔
743

744
  if (!pTbCtx->ordered) {
8,185,642!
745
    code = tRowSort(pTbCtx->pData->aRowP);
603✔
746
  }
747
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
8,185,642!
748
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
603✔
749
  }
750

751
  if (TSDB_CODE_SUCCESS != code) {
8,185,642!
752
    return code;
×
753
  }
754

755
  SVgroupDataCxt* pVgCxt = NULL;
8,185,642✔
756
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
8,185,642✔
757
  if (NULL == pp) {
8,187,618✔
758
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
4,484,966✔
759
    if (NULL == pp) {
4,485,070!
760
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
4,485,070✔
761
    } else {
762
      pVgCxt = *(SVgroupDataCxt**)pp;
×
763
    }
764
  } else {
765
    pVgCxt = *(SVgroupDataCxt**)pp;
3,702,652✔
766
  }
767

768
  if (TSDB_CODE_SUCCESS == code) {
8,186,741✔
769
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
8,186,722✔
770
  }
771

772
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
8,186,635!
773
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
774
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
775
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
776
    // insDestroyVgroupDataCxt(pVgCxt);
777
  }
778

779
  return code;
8,185,907✔
780
}
781

782
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
6,187,831✔
783
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
784
  int32_t  code = TSDB_CODE_SUCCESS;
6,187,831✔
785
  uint64_t uid;
6,129,573✔
786
  int32_t  vgId;
6,130,215✔
787
  uint64_t suid;
6,130,215✔
788

789
  pTbCtx->pData->aRowP = pTbData->aCol;
6,188,473✔
790
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
6,191,656✔
791

792
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
6,191,656✔
793
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
6,189,742✔
794
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
1,866✔
795
    vgId = (int32_t)ctbReq->uid;
1,866✔
796
    uid = 0;
1,866✔
797
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
1,866✔
798
    ctbReq->uid = 0;
1,866✔
799
    pTbCtx->pMeta->uid = 0;
1,866✔
800
    pTbCtx->pData->uid = 0;
1,866✔
801
    pTbCtx->pData->pCreateTbReq = ctbReq;
1,866✔
802
    code = TSDB_CODE_SUCCESS;
1,866✔
803
  } else {
804
    if (TSDB_CODE_SUCCESS != code) {
6,187,876✔
805
      return code;
23✔
806
    }
807
    if (pTbCtx->pData->suid != suid) {
6,187,853✔
808
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
46✔
809
    }
810

811
    pTbCtx->pMeta->vgId = vgId;
6,188,419✔
812
    pTbCtx->pMeta->uid = uid;
6,187,153✔
813
    pTbCtx->pData->uid = uid;
6,188,419✔
814
    pTbCtx->pData->pCreateTbReq = NULL;
6,189,072✔
815

816
    if (ctbReq != NULL) {
6,188,430✔
817
      tdDestroySVCreateTbReq(ctbReq);
818
      taosMemoryFree(ctbReq);
1,403!
819
      ctbReq = NULL;
1,403✔
820
    }
821
  }
822

823
  if (pTbCtx->hasBlob == 0) {
6,190,296!
824
    if (!pTbData->isOrdered) {
6,191,568!
825
      code = tRowSort(pTbCtx->pData->aRowP);
46✔
826
    }
827
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
6,188,410!
828
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
46✔
829
    }
830
  } else {
831
    if (!pTbData->isOrdered) {
×
832
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
833
    }
834
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
×
835
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
836
    }
837
  }
838

839
  if (TSDB_CODE_SUCCESS != code) {
6,189,031!
840
    return code;
×
841
  }
842

843
  SVgroupDataCxt* pVgCxt = NULL;
6,189,031✔
844
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
6,190,315✔
845
  if (NULL == pp) {
6,191,568✔
846
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
4,254,138✔
847
    if (NULL == pp) {
4,254,780!
848
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
4,254,780✔
849
    } else {
850
      pVgCxt = *(SVgroupDataCxt**)pp;
×
851
    }
852
  } else {
853
    pVgCxt = *(SVgroupDataCxt**)pp;
1,937,430✔
854
  }
855

856
  if (code == TSDB_CODE_SUCCESS) {
6,190,938!
857
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
6,190,938✔
858
  }
859

860
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
6,189,057!
861
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
862
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
863
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
864
    // insDestroyVgroupDataCxt(pVgCxt);
865
  }
866

867
  return code;
6,188,434✔
868
}
869

870
/*
871
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
872
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
873
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
874
  if (NULL == pVgroupHash || NULL == pVgroupList) {
875
    taosHashCleanup(pVgroupHash);
876
    taosArrayDestroy(pVgroupList);
877
    return TSDB_CODE_OUT_OF_MEMORY;
878
  }
879

880
  int32_t code = TSDB_CODE_SUCCESS;
881

882
  for (int32_t i = 0; i < tbNum; ++i) {
883
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
884
    pTableCxt->pMeta->vgId = pTableCols->vgId;
885
    pTableCxt->pMeta->uid = pTableCols->uid;
886
    pTableCxt->pData->uid = pTableCols->uid;
887
    pTableCxt->pData->aCol = pTableCols->aCol;
888

889
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
890
    if (pCol->nVal <= 0) {
891
      continue;
892
    }
893

894
    if (pTableCxt->pData->pCreateTbReq) {
895
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
896
    }
897

898
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
899

900
    tColDataSortMerge(pTableCxt->pData->aCol);
901

902
    if (TSDB_CODE_SUCCESS == code) {
903
      SVgroupDataCxt* pVgCxt = NULL;
904
      int32_t         vgId = pTableCxt->pMeta->vgId;
905
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
906
      if (NULL == pp) {
907
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
908
      } else {
909
        pVgCxt = *(SVgroupDataCxt**)pp;
910
      }
911
      if (TSDB_CODE_SUCCESS == code) {
912
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
913
      }
914
    }
915
  }
916

917
  taosHashCleanup(pVgroupHash);
918
  if (TSDB_CODE_SUCCESS == code) {
919
    *pVgDataBlocks = pVgroupList;
920
  } else {
921
    insDestroyVgroupDataCxtList(pVgroupList);
922
  }
923

924
  return code;
925
}
926
*/
927

928
static int8_t colDataHasBlob(SColData* pCol) {
×
929
  if (IS_STR_DATA_BLOB(pCol->type)) {
×
930
    return 1;
×
931
  }
932
  return 0;
×
933
}
934
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
859,053,483✔
935
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
859,053,483✔
936
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
859,076,238✔
937
  if (NULL == pVgroupHash || NULL == pVgroupList) {
859,075,447!
UNCOV
938
    taosHashCleanup(pVgroupHash);
×
939
    taosArrayDestroy(pVgroupList);
×
940
    return terrno;
×
941
  }
942

943
  int32_t code = TSDB_CODE_SUCCESS;
859,075,765✔
944
  bool    colFormat = false;
859,075,765✔
945

946
  void* p = taosHashIterate(pTableHash, NULL);
859,075,765✔
947
  if (p) {
859,087,657!
948
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
859,087,657✔
949
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
859,088,526✔
950
  }
951

952
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
1,807,471,086✔
953
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
948,382,539✔
954
    if (colFormat) {
948,381,909✔
955
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
3,679,118✔
956
      if (pCol && pCol->nVal <= 0) {
3,678,819!
957
        p = taosHashIterate(pTableHash, p);
850✔
958
        continue;
850✔
959
      }
960

961
      if (pTableCxt->pData->pCreateTbReq) {
3,677,969✔
962
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
3,039,314✔
963
      }
964
      int8_t isBlob = IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
3,678,268!
965
      if (isBlob == 0) {
3,677,969!
966
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
3,677,969✔
967
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
3,677,969✔
968
      } else {
969
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
×
970
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
×
971
      }
972
    } else {
973
      // skip the table has no data to insert
974
      // eg: import a csv without valid data
975
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
976
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
977
      //   p = taosHashIterate(pTableHash, p);
978
      //   continue;
979
      // }
980
      if (pTableCxt->hasBlob == 0) {
944,702,791✔
981
        if (!pTableCxt->ordered) {
944,653,519✔
982
          code = tRowSort(pTableCxt->pData->aRowP);
1,946,257✔
983
        }
984
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
944,649,993!
985
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
1,987,411✔
986
        }
987
      } else {
988
        if (!pTableCxt->ordered) {
51,336!
989
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
2,232✔
990
        }
991
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
51,336!
992
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
2,232✔
993
        }
994
      }
995
    }
996

997
    if (TSDB_CODE_SUCCESS == code) {
948,380,030!
998
      SVgroupDataCxt* pVgCxt = NULL;
948,380,030✔
999
      int32_t         vgId = pTableCxt->pMeta->vgId;
948,381,135✔
1000
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
948,377,636✔
1001
      if (NULL == pp) {
948,377,018✔
1002
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
892,465,068✔
1003
      } else {
1004
        pVgCxt = *(SVgroupDataCxt**)pp;
55,911,950✔
1005
      }
1006
      if (TSDB_CODE_SUCCESS == code) {
948,376,998✔
1007
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
948,376,116✔
1008
      }
1009
    }
1010
    if (TSDB_CODE_SUCCESS == code) {
948,380,182!
1011
      p = taosHashIterate(pTableHash, p);
948,380,182✔
1012
    }
1013
  }
1014

1015
  taosHashCleanup(pVgroupHash);
859,088,547✔
1016
  if (TSDB_CODE_SUCCESS == code) {
859,079,950!
1017
    *pVgDataBlocks = pVgroupList;
859,081,945✔
1018
  } else {
1019
    insDestroyVgroupDataCxtList(pVgroupList);
×
1020
  }
1021

1022
  return code;
859,081,940✔
1023
}
1024

1025
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
901,187,746✔
1026
  int32_t  code = TSDB_CODE_SUCCESS;
901,187,746✔
1027
  uint32_t len = 0;
901,187,746✔
1028
  void*    pBuf = NULL;
901,187,746✔
1029
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
901,187,746!
1030
  if (TSDB_CODE_SUCCESS == code) {
901,191,003✔
1031
    SEncoder encoder;
892,057,176✔
1032
    len += sizeof(SSubmitReq2Msg);
901,177,306✔
1033
    pBuf = taosMemoryMalloc(len);
901,177,306!
1034
    if (NULL == pBuf) {
901,180,695!
1035
      return terrno;
×
1036
    }
1037
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
901,180,695✔
1038
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
901,198,386✔
1039
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
901,202,373✔
1040
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
901,202,569✔
1041
    code = tEncodeSubmitReq(&encoder, pReq);
901,203,135✔
1042
    tEncoderClear(&encoder);
901,208,195✔
1043
  }
1044

1045
  if (TSDB_CODE_SUCCESS == code) {
901,203,753!
1046
    *pData = pBuf;
901,203,753✔
1047
    *pLen = len;
901,203,520✔
1048
  } else {
1049
    taosMemoryFree(pBuf);
×
1050
  }
1051
  return code;
901,191,955✔
1052
}
1053

1054
static void destroyVgDataBlocks(void* p) {
×
1055
  if (p == NULL) return;
×
1056
  SVgDataBlocks* pVg = p;
×
1057
  taosMemoryFree(pVg->pData);
×
1058
  taosMemoryFree(pVg);
×
1059
}
1060

1061
int32_t insResetBlob(SSubmitReq2* p) {
901,189,025✔
1062
  int32_t code = 0;
901,189,025✔
1063
  if (p->raw) {
901,189,025!
1064
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
×
1065
  }
1066

1067
  if (p->aSubmitBlobData != NULL) {
901,198,065✔
1068
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
102,672✔
1069
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
51,336✔
1070
      SBlobSet**     ppBlob = taosArrayGet(p->aSubmitBlobData, i);
51,336✔
1071
      SBlobSet*      pBlob = ppBlob ? *ppBlob : NULL;
51,336!
1072
      int32_t        nrow = taosArrayGetSize(pSubmitTbData->aRowP);
51,336✔
1073
      int32_t        nblob = 0;
51,336✔
1074
      if (nrow > 0 && pBlob) {
51,336!
1075
        nblob = taosArrayGetSize(pBlob->pSeqTable);
51,336✔
1076
      }
1077
      uTrace("blob %p row size %d, pData size %d", pBlob, nblob, nrow);
51,336!
1078
      pSubmitTbData->pBlobSet = pBlob;
51,336✔
1079
      if (ppBlob != NULL) *ppBlob = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
51,336!
1080
    }
1081
  } else {
1082
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
1,863,834,465✔
1083
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
962,689,726✔
1084
      pSubmitTbData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
962,700,710✔
1085
    }
1086
  }
1087

1088
  return code;
901,205,845✔
1089
}
1090
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
866,294,971✔
1091
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
866,294,971✔
1092
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
866,298,628!
1093
  if (NULL == pDataBlocks) {
866,295,468!
1094
    return TSDB_CODE_OUT_OF_MEMORY;
×
1095
  }
1096

1097
  int32_t code = TSDB_CODE_SUCCESS;
866,295,468✔
1098
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
1,767,499,762✔
1099
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
901,198,845✔
1100
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
901,203,715!
1101
      continue;
×
1102
    }
1103
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
901,197,581!
1104
    if (NULL == dst) {
901,190,222!
1105
      code = terrno;
×
1106
    }
1107

1108
    if (TSDB_CODE_SUCCESS == code) {
901,190,222✔
1109
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
901,190,356✔
1110
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
901,193,301✔
1111
    }
1112
    if (TSDB_CODE_SUCCESS == code) {
901,208,901!
1113
      code = insResetBlob(src->pData);
901,208,901✔
1114
    }
1115

1116
    if (TSDB_CODE_SUCCESS == code) {
901,194,528!
1117
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
901,194,655✔
1118
    }
1119
    if (TSDB_CODE_SUCCESS == code) {
901,192,010!
1120
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
901,204,432!
1121
    }
1122
    if (TSDB_CODE_SUCCESS != code) {
901,199,891!
1123
      destroyVgDataBlocks(dst);
×
1124
    }
1125
  }
1126

1127
  if (append) {
866,300,917✔
1128
    if (NULL == *pVgDataBlocks) {
7,220,868!
1129
      *pVgDataBlocks = pDataBlocks;
7,220,868✔
1130
    }
1131
    return code;
7,220,868✔
1132
  }
1133

1134
  if (TSDB_CODE_SUCCESS == code) {
859,080,049✔
1135
    *pVgDataBlocks = pDataBlocks;
859,062,936✔
1136
  } else {
1137
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
17,113✔
1138
  }
1139

1140
  return code;
859,069,214✔
1141
}
1142

1143
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
1144
  for (int i = 0; i < numFields; i++) {
×
1145
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
1146
      return true;
×
1147
    }
1148
  }
1149

1150
  return false;
×
1151
}
1152

1153
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
166,060✔
1154
  if (*fields != pColSchema->type) {
166,060✔
1155
    if (errstr != NULL) {
295!
1156
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
1157
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
1158
    } else {
1159
      char buf[512] = {0};
295✔
1160
      snprintf(buf, sizeof(buf), "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
590!
1161
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
590✔
1162
      uError("checkSchema %s", buf);
295!
1163
    }
1164
    return TSDB_CODE_INVALID_PARA;
295✔
1165
  }
1166

1167
  if (IS_DECIMAL_TYPE(pColSchema->type)) {
165,765!
1168
    uint8_t precision = 0, scale = 0;
602✔
1169
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
602✔
1170
    uint8_t precisionData = 0, scaleData = 0;
602✔
1171
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
602✔
1172
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
602✔
1173
    if (precision != precisionData || scale != scaleData) {
602!
1174
      if (errstr != NULL) {
×
1175
        snprintf(errstr, errstrLen,
×
1176
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1177
                 "precision:%d, scale:%d",
1178
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1179
                 precisionData, scaleData);
1180
        return TSDB_CODE_INVALID_PARA;
×
1181
      } else {
1182
        char buf[512] = {0};
×
1183
        snprintf(buf, sizeof(buf),
×
1184
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1185
                 "precision:%d, scale:%d",
1186
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1187
                 precisionData, scaleData);
1188
        uError("checkSchema %s", buf);
×
1189
        return TSDB_CODE_INVALID_PARA;
×
1190
      }
1191
    }
1192
    return 0;
602✔
1193
  }
1194

1195
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
165,163!
1196
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
29,287✔
1197
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
29,287!
1198
      if (bytes >= TSDB_MAX_BLOB_LEN) {
×
1199
        uError("column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
×
1200
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name, bytes);
1201
        return TSDB_CODE_INVALID_PARA;
×
1202
      }
1203
    } else {
1204
      if (bytes > pColSchema->bytes) {
29,287✔
1205
        if (errstr != NULL) {
295!
1206
          snprintf(errstr, errstrLen,
×
1207
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1208
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1209
                   *(int32_t*)(fields + sizeof(int8_t)));
×
1210
        } else {
1211
          char buf[512] = {0};
295✔
1212
          snprintf(buf, sizeof(buf),
590✔
1213
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1214
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
885!
1215
                   *(int32_t*)(fields + sizeof(int8_t)));
295✔
1216
          uError("checkSchema %s", buf);
295!
1217
        }
1218
        return TSDB_CODE_INVALID_PARA;
295✔
1219
      }
1220
    }
1221
  }
1222

1223
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
164,868!
1224
    if (errstr != NULL) {
×
1225
      snprintf(errstr, errstrLen,
×
1226
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1227
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1228
               *(int32_t*)(fields + sizeof(int8_t)));
×
1229
    } else {
1230
      char buf[512] = {0};
×
1231
      snprintf(buf, sizeof(buf),
×
1232
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1233
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1234
               *(int32_t*)(fields + sizeof(int8_t)));
×
1235
      uError("checkSchema %s", buf);
×
1236
    }
1237
    return TSDB_CODE_INVALID_PARA;
×
1238
  }
1239
  return 0;
164,868✔
1240
}
1241

1242
#define PRCESS_DATA(i, j)                                                                                          \
1243
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                         \
1244
  if (ret != 0) {                                                                                                  \
1245
    goto end;                                                                                                      \
1246
  }                                                                                                                \
1247
                                                                                                                   \
1248
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                          \
1249
    hasTs = true;                                                                                                  \
1250
  }                                                                                                                \
1251
                                                                                                                   \
1252
  int8_t* offset = pStart;                                                                                         \
1253
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                                        \
1254
    pStart += numOfRows * sizeof(int32_t);                                                                         \
1255
  } else {                                                                                                         \
1256
    pStart += BitmapLen(numOfRows);                                                                                \
1257
  }                                                                                                                \
1258
  char* pData = pStart;                                                                                            \
1259
                                                                                                                   \
1260
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                                        \
1261
  if (hasBlob) {                                                                                                   \
1262
    ret = tColDataAddValueByDataBlockWithBlob(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData, \
1263
                                              pBlobSet);                                                           \
1264
  } else {                                                                                                         \
1265
    ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);        \
1266
  }                                                                                                                \
1267
  if (ret != 0) {                                                                                                  \
1268
    goto end;                                                                                                      \
1269
  }                                                                                                                \
1270
  fields += sizeof(int8_t) + sizeof(int32_t);                                                                      \
1271
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                            \
1272
    pStart += htonl(colLength[i]);                                                                                 \
1273
  } else {                                                                                                         \
1274
    pStart += colLength[i];                                                                                        \
1275
  }                                                                                                                \
1276
  boundInfo->pColIndex[j] = -1;
1277

1278
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
36,695✔
1279
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1280
  int       ret = 0;
36,695✔
1281
  int8_t    hasBlob = 0;
36,695✔
1282
  SBlobSet* pBlobSet = NULL;
36,695✔
1283
  if (data == NULL) {
36,695!
1284
    uError("rawBlockBindData, data is NULL");
×
1285
    return TSDB_CODE_APP_ERROR;
×
1286
  }
1287
  void* tmp =
1288
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
36,695✔
1289
  SVCreateTbReq* pCreateReqTmp = NULL;
36,695✔
1290
  if (tmp == NULL && pCreateTb != NULL) {
36,695✔
1291
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
3,472✔
1292
    if (ret != TSDB_CODE_SUCCESS) {
3,472!
1293
      uError("cloneSVreateTbReq error");
×
1294
      goto end;
×
1295
    }
1296
  }
1297

1298
  STableDataCxt* pTableCxt = NULL;
36,695✔
1299
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
36,695✔
1300
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1301
  if (pCreateReqTmp != NULL) {
36,695!
1302
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1303
    taosMemoryFree(pCreateReqTmp);
×
1304
  }
1305

1306
  hasBlob = pTableCxt->hasBlob;
36,695✔
1307
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
36,695!
1308
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
×
1309
    if (pTableCxt->pData->pBlobSet == NULL) {
×
1310
      uError("create blob set failed");
×
1311
      ret = terrno;
×
1312
    }
1313
  }
1314

1315
  if (ret != TSDB_CODE_SUCCESS) {
36,695!
1316
    uError("insGetTableDataCxt error");
×
1317
    goto end;
×
1318
  }
1319
  pBlobSet = pTableCxt->pData->pBlobSet;
36,695✔
1320

1321
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
36,695✔
1322
  if (tmp == NULL) {
36,695✔
1323
    ret = initTableColSubmitData(pTableCxt);
32,924✔
1324
    if (ret != TSDB_CODE_SUCCESS) {
32,924!
1325
      uError("initTableColSubmitData error");
×
1326
      goto end;
×
1327
    }
1328
  }
1329

1330
  char* p = (char*)data;
36,695✔
1331
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1332
  // column length |
1333
  int32_t version = *(int32_t*)data;
36,695✔
1334
  p += sizeof(int32_t);
36,695✔
1335
  p += sizeof(int32_t);
36,695✔
1336

1337
  int32_t numOfRows = *(int32_t*)p;
36,695✔
1338
  p += sizeof(int32_t);
36,695✔
1339

1340
  int32_t numOfCols = *(int32_t*)p;
36,695✔
1341
  p += sizeof(int32_t);
36,695✔
1342

1343
  p += sizeof(int32_t);
36,695✔
1344
  p += sizeof(uint64_t);
36,695✔
1345

1346
  int8_t* fields = p;
36,695✔
1347
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
36,695!
1348
    uError("fields type error:%d", *fields);
×
1349
    ret = TSDB_CODE_INVALID_PARA;
×
1350
    goto end;
×
1351
  }
1352
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
36,695✔
1353

1354
  int32_t* colLength = (int32_t*)p;
36,695✔
1355
  p += sizeof(int32_t) * numOfCols;
36,695✔
1356

1357
  char* pStart = p;
36,695✔
1358

1359
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
36,695✔
1360
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
36,695✔
1361
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
36,695✔
1362

1363
  if (tFields != NULL && numFields != numOfCols) {
36,695!
1364
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1365
    ret = TSDB_CODE_INVALID_PARA;
×
1366
    goto end;
×
1367
  }
1368

1369
  bool hasTs = false;
36,695✔
1370
  if (tFields == NULL) {
36,695✔
1371
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
1,770✔
1372
    for (int j = 0; j < len; j++) {
5,310✔
1373
      SSchema*    pColSchema = &pSchema[j];
4,130✔
1374
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
4,130✔
1375
      PRCESS_DATA(j, j)
4,130!
1376
    }
1377
  } else {
1378
    for (int i = 0; i < numFields; i++) {
185,810✔
1379
      for (int j = 0; j < boundInfo->numOfBound; j++) {
1,321,550!
1380
        SSchema*    pColSchema = &pSchema[j];
1,321,550✔
1381
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
1,321,550✔
1382
        char*       fieldName = NULL;
1,321,550✔
1383
        if (raw) {
1,321,550✔
1384
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
1,320,075✔
1385
        } else {
1386
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
1,475✔
1387
        }
1388
        if (strcmp(pColSchema->name, fieldName) == 0) {
1,321,550!
1389
          PRCESS_DATA(i, j)
150,885!
1390
          break;
150,885✔
1391
        }
1392
      }
1393
    }
1394
  }
1395

1396
  if (!hasTs) {
36,105!
1397
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1398
    ret = TSDB_CODE_INVALID_PARA;
×
1399
    goto end;
×
1400
  }
1401

1402
  // process NULL data
1403
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
194,307✔
1404
    if (boundInfo->pColIndex[c] != -1) {
158,202✔
1405
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
4,072✔
1406
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
4,072✔
1407
      if (ret != 0) {
4,072!
1408
        goto end;
×
1409
      }
1410
    } else {
1411
      boundInfo->pColIndex[c] = c;  // restore for next block
154,130✔
1412
    }
1413
  }
1414

1415
end:
36,695✔
1416
  return ret;
36,695✔
1417
}
1418

1419
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
×
1420
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
×
1421
  if (code != 0) {
×
1422
    return code;
×
1423
  }
1424
  SVgroupDataCxt* pVgCxt = NULL;
×
1425
  void**          pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
×
1426
  if (NULL == pp) {
×
1427
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
×
1428
    if (code != 0) {
×
1429
      return code;
×
1430
    }
1431
  } else {
1432
    pVgCxt = *(SVgroupDataCxt**)pp;
×
1433
  }
1434
  if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1435
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
×
1436
    pVgCxt->pData->raw = true;
×
1437
    if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1438
      return terrno;
×
1439
    }
1440
  }
1441

1442
  // push data to submit, rebuild empty data for next submit
1443
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
×
1444
    return terrno;
×
1445
  }
1446

1447
  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
×
1448

1449
  return 0;
×
1450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc