• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.07
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "tdatablock.h"
24
#include "tmisce.h"
25

26
void qDestroyBoundColInfo(void* pInfo) {
4✔
27
  if (NULL == pInfo) {
4✔
28
    return;
3✔
29
  }
30

31
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
1✔
32

33
  taosMemoryFreeClear(pBoundInfo->pColIndex);
1!
34
}
35

36
static char* tableNameGetPosition(SToken* pToken, char target) {
9,505,878✔
37
  bool inEscape = false;
9,505,878✔
38
  bool inQuote = false;
9,505,878✔
39
  char quotaStr = 0;
9,505,878✔
40

41
  for (uint32_t i = 0; i < pToken->n; ++i) {
106,626,660✔
42
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
105,643,154!
43
      return pToken->z + i;
8,522,372✔
44
    }
45

46
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
97,120,782✔
47
      if (!inQuote) {
202!
48
        inEscape = !inEscape;
202✔
49
      }
50
    }
51

52
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
97,120,782!
53
      if (!inEscape) {
34,172✔
54
        if (!inQuote) {
24,000✔
55
          quotaStr = *(pToken->z + i);
12,000✔
56
          inQuote = !inQuote;
12,000✔
57
        } else if (quotaStr == *(pToken->z + i)) {
12,000!
58
          inQuote = !inQuote;
12,000✔
59
        }
60
      }
61
    }
62
  }
63

64
  return NULL;
983,506✔
65
}
66

67
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
9,505,143✔
68
  const char* msg1 = "name too long";
9,505,143✔
69
  const char* msg2 = "invalid database name";
9,505,143✔
70
  const char* msg3 = "db is not specified";
9,505,143✔
71
  const char* msg4 = "invalid table name";
9,505,143✔
72

73
  int32_t code = TSDB_CODE_SUCCESS;
9,505,143✔
74
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
9,505,143✔
75

76
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
9,521,138✔
77
    int32_t dbLen = p - pTableName->z;
8,521,235✔
78
    if (dbLen <= 0) {
8,521,235!
79
      return buildInvalidOperationMsg(pMsgBuf, msg2);
×
80
    }
81
    char name[TSDB_DB_FNAME_LEN] = {0};
8,521,235✔
82
    strncpy(name, pTableName->z, dbLen);
8,521,235✔
83
    int32_t actualDbLen = strdequote(name);
8,521,235✔
84

85
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
8,518,408✔
86
    if (code != TSDB_CODE_SUCCESS) {
8,511,225!
87
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
88
    }
89

90
    int32_t tbLen = pTableName->n - dbLen - 1;
8,511,225✔
91
    if (tbLen <= 0) {
8,511,225!
92
      return buildInvalidOperationMsg(pMsgBuf, msg4);
×
93
    }
94

95
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
8,511,225✔
96
    strncpy(tbname, p + 1, tbLen);
8,511,225✔
97
    /*tbLen = */ (void)strdequote(tbname);
8,511,225✔
98

99
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
8,525,519✔
100
    if (code != 0) {
8,515,359!
101
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
102
    }
103
  } else {  // get current DB name first, and then set it into path
104
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
999,903!
105
      return buildInvalidOperationMsg(pMsgBuf, msg1);
2✔
106
    }
107
    if (pTableName->n == 0) {
999,903!
108
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
×
109
    }
110

111
    char name[TSDB_TABLE_FNAME_LEN] = {0};
999,903✔
112
    strncpy(name, pTableName->z, pTableName->n);
999,903✔
113
    (void)strdequote(name);
999,903✔
114

115
    if (dbName == NULL) {
1,000,063✔
116
      return buildInvalidOperationMsg(pMsgBuf, msg3);
9✔
117
    }
118
    if (name[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
1,000,054!
119

120
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
1,000,054✔
121
    if (code != TSDB_CODE_SUCCESS) {
1,000,036!
122
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
×
123
      return code;
×
124
    }
125

126
    code = tNameFromString(pName, name, T_NAME_TABLE);
1,000,036✔
127
    if (code != 0) {
1,000,037!
128
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
×
129
    }
130
  }
131

132
  if (NULL != strchr(pName->tname, '.')) {
9,515,396!
133
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
×
134
  }
135

136
  return code;
9,514,775✔
137
}
138

139
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
8,714,174✔
140
  while (start < end) {
8,803,458✔
141
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
8,792,453✔
142
      return start;
8,703,169✔
143
    }
144
    ++start;
89,284✔
145
  }
146
  return -1;
11,005✔
147
}
148

149
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
48,405✔
150
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
151
  pTbReq->type = TD_CHILD_TABLE;
48,405✔
152
  pTbReq->ctb.pTag = (uint8_t*)pTag;
48,405✔
153
  pTbReq->name = taosStrdup(tname);
48,405!
154
  if (!pTbReq->name) return terrno;
48,405!
155
  pTbReq->ctb.suid = suid;
48,405✔
156
  pTbReq->ctb.tagNum = tagNum;
48,405✔
157
  if (sname) {
48,405✔
158
    pTbReq->ctb.stbName = taosStrdup(sname);
47,871!
159
    if (!pTbReq->ctb.stbName) return terrno;
47,871!
160
  }
161
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
48,405✔
162
  if (!pTbReq->ctb.tagName) return terrno;
48,405!
163
  pTbReq->ttl = ttl;
48,405✔
164
  pTbReq->commentLen = -1;
48,405✔
165

166
  return TSDB_CODE_SUCCESS;
48,405✔
167
}
168

169
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
170
  for (int32_t i = 0; i < ncols; ++i) {
×
171
    pBoundCols[i] = i;
×
172
  }
173
}
×
174

175
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
9,467,074✔
176
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
9,467,074✔
177
  int32_t  code = 0;
9,469,203✔
178
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
61,134,759✔
179
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
51,635,795✔
180
    if (NULL == taosArrayPush(pValues, &val)) {
51,665,556!
181
      code = terrno;
×
182
      break;
×
183
    }
184
  }
185
  return code;
9,498,964✔
186
}
187

188
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
646✔
189

190
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
9,516,874✔
191
  pInfo->numOfCols = numOfBound;
9,516,874✔
192
  pInfo->numOfBound = numOfBound;
9,516,874✔
193
  pInfo->hasBoundCols = false;
9,516,874✔
194
  pInfo->mixTagsCols = false;
9,516,874✔
195
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
9,516,874!
196
  if (NULL == pInfo->pColIndex) {
9,530,477!
197
    return terrno;
×
198
  }
199
  for (int32_t i = 0; i < numOfBound; ++i) {
61,480,422✔
200
    pInfo->pColIndex[i] = i;
51,949,945✔
201
  }
202
  return TSDB_CODE_SUCCESS;
9,530,477✔
203
}
204

205
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
20✔
206
  pInfo->numOfBound = pInfo->numOfCols;
20✔
207
  pInfo->hasBoundCols = false;
20✔
208
  pInfo->mixTagsCols = false;
20✔
209
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
100✔
210
    pInfo->pColIndex[i] = i;
80✔
211
  }
212
}
20✔
213

214
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
271,697,114✔
215
  // once the data block is disordered, we do NOT keep last timestamp any more
216
  if (!pTableCxt->ordered) {
271,697,114✔
217
    return;
12,785,726✔
218
  }
219

220
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
258,911,388✔
221
    pTableCxt->ordered = false;
9,255✔
222
  }
223

224
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
259,084,680✔
225
    pTableCxt->duplicateTs = true;
82✔
226
  }
227

228
  // TODO: for variable length data type, we need to copy it out
229
  pTableCxt->lastKey = *rowKey;
258,837,008✔
230
  return;
258,837,008✔
231
}
232

233
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
28,061,797!
234

235
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
9,433,975✔
236
                                  bool colMode, bool ignoreColVals) {
237
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
9,433,975!
238
  if (NULL == pTableCxt) {
9,475,408!
239
    *pOutput = NULL;
×
240
    return terrno;
×
241
  }
242

243
  int32_t code = TSDB_CODE_SUCCESS;
9,475,408✔
244

245
  pTableCxt->lastKey = (SRowKey){0};
9,475,408✔
246
  pTableCxt->ordered = true;
9,475,408✔
247
  pTableCxt->duplicateTs = false;
9,475,408✔
248

249
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
9,475,408✔
250
  if (NULL == pTableCxt->pMeta) {
9,473,089!
251
    code = TSDB_CODE_OUT_OF_MEMORY;
×
252
  }
253
  if (TSDB_CODE_SUCCESS == code) {
9,473,089!
254
    pTableCxt->pSchema =
9,486,889✔
255
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
9,474,573✔
256
    if (NULL == pTableCxt->pSchema) {
9,486,889!
257
      code = TSDB_CODE_OUT_OF_MEMORY;
×
258
    }
259
  }
260
  if (TSDB_CODE_SUCCESS == code) {
9,485,405✔
261
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
9,478,572✔
262
  }
263
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
9,492,554✔
264
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
9,480,770✔
265
    if (NULL == pTableCxt->pValues) {
9,467,550!
266
      code = terrno;
×
267
    } else {
268
      code = initColValues(pTableMeta, pTableCxt->pValues);
9,467,550✔
269
    }
270
  }
271
  if (TSDB_CODE_SUCCESS == code) {
9,471,613✔
272
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
9,470,730!
273
    if (NULL == pTableCxt->pData) {
9,486,865!
274
      code = terrno;
×
275
    } else {
276
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
9,486,865!
277
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
9,486,865✔
278
      pTableCxt->pData->suid = pTableMeta->suid;
9,486,865✔
279
      pTableCxt->pData->uid = pTableMeta->uid;
9,486,865✔
280
      pTableCxt->pData->sver = pTableMeta->sversion;
9,486,865✔
281
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
9,486,865✔
282
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
9,486,865✔
283
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
9,486,865✔
284
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
127✔
285
        if (NULL == pTableCxt->pData->aCol) {
127!
286
          code = terrno;
×
287
        }
288
      } else {
289
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
9,486,738✔
290
        if (NULL == pTableCxt->pData->aRowP) {
9,473,845!
291
          code = terrno;
×
292
        }
293
      }
294
    }
295
  }
296
  if (TSDB_CODE_SUCCESS == code) {
9,474,855!
297
    *pOutput = pTableCxt;
9,474,855✔
298
    qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId);
9,474,855✔
299
  } else {
300
    insDestroyTableDataCxt(pTableCxt);
×
301
  }
302

303
  return code;
9,475,697✔
304
}
305

306
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
642✔
307
  int32_t        code = TSDB_CODE_SUCCESS;
642✔
308
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
642!
309
  if (NULL == pTmp) {
642!
310
    code = terrno;
×
311
  } else {
312
    pTmp->flags = pSrc->flags;
642✔
313
    pTmp->suid = pSrc->suid;
642✔
314
    pTmp->uid = pSrc->uid;
642✔
315
    pTmp->sver = pSrc->sver;
642✔
316
    pTmp->pCreateTbReq = NULL;
642✔
317
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
642✔
318
      if (pSrc->pCreateTbReq) {
538!
319
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
538✔
320
      } else {
321
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
×
322
      }
323
    }
324
    if (TSDB_CODE_SUCCESS == code) {
642!
325
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
642✔
326
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
126✔
327
        if (NULL == pTmp->aCol) {
126!
328
          code = terrno;
×
329
          taosMemoryFree(pTmp);
×
330
        }
331
      } else {
332
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
516✔
333
        if (NULL == pTmp->aRowP) {
516!
334
          code = terrno;
×
335
          taosMemoryFree(pTmp);
×
336
        }
337
      }
338
    } else {
339
      taosMemoryFree(pTmp);
×
340
    }
341
  }
342

343
  taosMemoryFree(pSrc);
642!
344
  if (TSDB_CODE_SUCCESS == code) {
642!
345
    *pDst = pTmp;
642✔
346
  }
347

348
  return code;
642✔
349
}
350

351
static void resetColValues(SArray* pValues) {
109✔
352
  int32_t num = taosArrayGetSize(pValues);
109✔
353
  for (int32_t i = 0; i < num; ++i) {
567✔
354
    SColVal* pVal = taosArrayGet(pValues, i);
458✔
355
    pVal->flag = CV_FLAG_NONE;
458✔
356
  }
357
}
109✔
358

359
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
9,468,006✔
360
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
361
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
9,468,006✔
362
  if (NULL != tmp) {
9,439,933✔
363
    *pTableCxt = *tmp;
3,173✔
364
    if (!ignoreColVals) {
3,173✔
365
      resetColValues((*pTableCxt)->pValues);
109✔
366
    }
367
    return TSDB_CODE_SUCCESS;
3,173✔
368
  }
369
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
9,436,760✔
370
  if (TSDB_CODE_SUCCESS == code) {
9,475,100!
371
    void* pData = *pTableCxt;  // deal scan coverity
9,475,356✔
372
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
9,475,356✔
373
  }
374

375
  if (TSDB_CODE_SUCCESS != code) {
9,483,229!
376
    insDestroyTableDataCxt(*pTableCxt);
×
377
  }
378
  return code;
9,482,217✔
379
}
380

381
static void destroyColVal(void* p) {
51,743,156✔
382
  SColVal* pVal = p;
51,743,156✔
383
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
51,743,156!
384
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type) {
48,198,075✔
385
    taosMemoryFreeClear(pVal->value.pData);
3,547,165!
386
  }
387
}
51,743,156✔
388

389
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
9,490,501✔
390
  if (NULL == pTableCxt) {
9,490,501!
391
    return;
×
392
  }
393

394
  taosMemoryFreeClear(pTableCxt->pMeta);
9,490,501!
395
  tDestroyTSchema(pTableCxt->pSchema);
9,489,639!
396
  insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
9,490,606✔
397
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
9,489,567✔
398
  if (pTableCxt->pData) {
9,491,457✔
399
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
1,284✔
400
    taosMemoryFree(pTableCxt->pData);
1,284!
401
  }
402
  taosMemoryFree(pTableCxt);
9,491,457!
403
}
404

405
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
9,433,537✔
406
  if (NULL == pVgCxt) {
9,433,537!
407
    return;
×
408
  }
409

410
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
9,433,537✔
411
  taosMemoryFree(pVgCxt->pData);
9,440,931!
412
  taosMemoryFree(pVgCxt);
9,441,124!
413
}
414

415
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
18,749,942✔
416
  if (NULL == pVgCxtList) {
18,749,942✔
417
    return;
9,371,854✔
418
  }
419

420
  size_t size = taosArrayGetSize(pVgCxtList);
9,378,088✔
421
  for (int32_t i = 0; i < size; i++) {
18,845,214✔
422
    void* p = taosArrayGetP(pVgCxtList, i);
9,437,130✔
423
    insDestroyVgroupDataCxt(p);
9,433,856✔
424
  }
425

426
  taosArrayDestroy(pVgCxtList);
9,408,084✔
427
}
428

429
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
430
  if (NULL == pVgCxtHash) {
×
431
    return;
×
432
  }
433

434
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
435
  while (p) {
×
436
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
437

438
    p = taosHashIterate(pVgCxtHash, p);
×
439
  }
440

441
  taosHashCleanup(pVgCxtHash);
×
442
}
443

444
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
9,407,186✔
445
  if (NULL == pTableCxtHash) {
9,407,186✔
446
    return;
1✔
447
  }
448

449
  void** p = taosHashIterate(pTableCxtHash, NULL);
9,407,185✔
450
  while (p) {
18,896,428✔
451
    insDestroyTableDataCxt(*(STableDataCxt**)p);
9,491,350✔
452

453
    p = taosHashIterate(pTableCxtHash, p);
9,490,701✔
454
  }
455

456
  taosHashCleanup(pTableCxtHash);
9,405,078✔
457
}
458

459
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
9,465,772✔
460
  if (NULL == pVgCxt->pData->aSubmitTbData) {
9,465,772✔
461
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
9,417,928✔
462
    if (NULL == pVgCxt->pData->aSubmitTbData) {
9,419,821!
463
      return terrno;
×
464
    }
465
  }
466

467
  // push data to submit, rebuild empty data for next submit
468
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
18,939,174!
469
    return terrno;
×
470
  }
471
  int32_t code = 0;
9,471,509✔
472
  if (isRebuild) {
9,471,509✔
473
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
642✔
474
  } else if (clear) {
9,470,867!
475
    taosMemoryFreeClear(pTableCxt->pData);
9,471,959!
476
  }
477

478
  qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
9,458,393✔
479

480
  return code;
9,463,960✔
481
}
482

483
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList,
9,384,696✔
484
                                   SVgroupDataCxt** pOutput) {
485
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
9,384,696!
486
  if (NULL == pVgCxt) {
9,433,980!
487
    return terrno;
×
488
  }
489
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
9,433,980!
490
  if (NULL == pVgCxt->pData) {
9,438,678!
491
    insDestroyVgroupDataCxt(pVgCxt);
×
492
    return terrno;
×
493
  }
494

495
  pVgCxt->vgId = pTableCxt->pMeta->vgId;
9,438,678✔
496
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
9,438,678✔
497
  if (TSDB_CODE_SUCCESS == code) {
9,429,637!
498
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
9,419,253!
499
      code = terrno;
×
500
      insDestroyVgroupDataCxt(pVgCxt);
×
UNCOV
501
      return code;
×
502
    }
503
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
504
    *pOutput = pVgCxt;
9,419,253✔
505
  } else {
506
    insDestroyVgroupDataCxt(pVgCxt);
×
507
  }
508
  return code;
9,420,861✔
509
}
510

511
int insColDataComp(const void* lp, const void* rp) {
662✔
512
  SColData* pLeft = (SColData*)lp;
662✔
513
  SColData* pRight = (SColData*)rp;
662✔
514
  if (pLeft->cid < pRight->cid) {
662!
515
    return -1;
662✔
516
  } else if (pLeft->cid > pRight->cid) {
×
517
    return 1;
×
518
  }
519

520
  return 0;
×
521
}
522

UNCOV
523
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
×
524
                                 STableColsData* pTbData, SName* sname) {
UNCOV
525
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
×
UNCOV
526
    return TSDB_CODE_SUCCESS;
×
527
  }
528

UNCOV
529
  SVgroupInfo      vgInfo = {0};
×
UNCOV
530
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
×
UNCOV
531
                           .requestId = pBuildInfo->requestId,
×
UNCOV
532
                           .requestObjRefId = pBuildInfo->requestSelf,
×
533
                           .mgmtEps = pBuildInfo->mgmtEpSet};
534

UNCOV
535
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
×
UNCOV
536
  if (TSDB_CODE_SUCCESS != code) {
×
537
    return code;
×
538
  }
539

UNCOV
540
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
×
UNCOV
541
  if (TSDB_CODE_SUCCESS != code) {
×
542
    return code;
×
543
  }
544

UNCOV
545
  return TSDB_CODE_SUCCESS;
×
546
}
547

UNCOV
548
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
×
549
                             uint64_t* uid, int32_t* vgId) {
UNCOV
550
  STableVgUid* pTbInfo = NULL;
×
UNCOV
551
  int32_t      code = 0;
×
552

UNCOV
553
  if (pTbData->getFromHash) {
×
UNCOV
554
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
×
555
  }
556

UNCOV
557
  if (NULL == pTbInfo) {
×
558
    SName sname;
UNCOV
559
    code = qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
×
UNCOV
560
    if (TSDB_CODE_SUCCESS != code) {
×
561
      return code;
×
562
    }
563

UNCOV
564
    STableMeta*      pTableMeta = NULL;
×
UNCOV
565
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
×
UNCOV
566
                             .requestId = pBuildInfo->requestId,
×
UNCOV
567
                             .requestObjRefId = pBuildInfo->requestSelf,
×
568
                             .mgmtEps = pBuildInfo->mgmtEpSet};
UNCOV
569
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
×
570

UNCOV
571
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
×
572
      parserDebug("tb %s.%s not exist", sname.dbname, sname.tname);
×
573
      return code;
×
574
    }
575

UNCOV
576
    if (TSDB_CODE_SUCCESS != code) {
×
577
      return code;
×
578
    }
579

UNCOV
580
    *uid = pTableMeta->uid;
×
UNCOV
581
    *vgId = pTableMeta->vgId;
×
582

UNCOV
583
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
×
UNCOV
584
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
×
UNCOV
585
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
586
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
×
587
    }
588

UNCOV
589
    taosMemoryFree(pTableMeta);
×
590
  } else {
UNCOV
591
    *uid = pTbInfo->uid;
×
UNCOV
592
    *vgId = pTbInfo->vgid;
×
593
  }
594

UNCOV
595
  return code;
×
596
}
597

598
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
599
  int32_t             code = TSDB_CODE_SUCCESS;
×
600
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
601

602
  if (TSDB_CODE_SUCCESS == code) {
×
603
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
604
  }
605

606
  return code;
×
607
}
608

UNCOV
609
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
×
610
                                  SStbInterlaceInfo* pBuildInfo) {
UNCOV
611
  int32_t  code = TSDB_CODE_SUCCESS;
×
612
  uint64_t uid;
613
  int32_t  vgId;
614

UNCOV
615
  pTbCtx->pData->aRowP = pTbData->aCol;
×
616

UNCOV
617
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
×
UNCOV
618
  if (TSDB_CODE_SUCCESS != code) {
×
619
    return code;
×
620
  }
621

UNCOV
622
  pTbCtx->pMeta->vgId = vgId;
×
UNCOV
623
  pTbCtx->pMeta->uid = uid;
×
UNCOV
624
  pTbCtx->pData->uid = uid;
×
625

UNCOV
626
  if (!pTbCtx->ordered) {
×
627
    code = tRowSort(pTbCtx->pData->aRowP);
×
628
  }
UNCOV
629
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
×
630
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
×
631
  }
632

UNCOV
633
  if (TSDB_CODE_SUCCESS != code) {
×
634
    return code;
×
635
  }
636

UNCOV
637
  SVgroupDataCxt* pVgCxt = NULL;
×
UNCOV
638
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
×
UNCOV
639
  if (NULL == pp) {
×
UNCOV
640
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
×
UNCOV
641
    if (NULL == pp) {
×
UNCOV
642
      code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
×
643
    } else {
644
      pVgCxt = *(SVgroupDataCxt**)pp;
×
645
    }
646
  } else {
647
    pVgCxt = *(SVgroupDataCxt**)pp;
×
648
  }
649

UNCOV
650
  if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
651
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
×
652
  }
653

UNCOV
654
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
×
655
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
656
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
657
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
658
    // insDestroyVgroupDataCxt(pVgCxt);
659
  }
660

UNCOV
661
  return code;
×
662
}
663

664
/*
665
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
666
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
667
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
668
  if (NULL == pVgroupHash || NULL == pVgroupList) {
669
    taosHashCleanup(pVgroupHash);
670
    taosArrayDestroy(pVgroupList);
671
    return TSDB_CODE_OUT_OF_MEMORY;
672
  }
673

674
  int32_t code = TSDB_CODE_SUCCESS;
675

676
  for (int32_t i = 0; i < tbNum; ++i) {
677
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
678
    pTableCxt->pMeta->vgId = pTableCols->vgId;
679
    pTableCxt->pMeta->uid = pTableCols->uid;
680
    pTableCxt->pData->uid = pTableCols->uid;
681
    pTableCxt->pData->aCol = pTableCols->aCol;
682

683
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
684
    if (pCol->nVal <= 0) {
685
      continue;
686
    }
687

688
    if (pTableCxt->pData->pCreateTbReq) {
689
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
690
    }
691

692
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
693

694
    tColDataSortMerge(pTableCxt->pData->aCol);
695

696
    if (TSDB_CODE_SUCCESS == code) {
697
      SVgroupDataCxt* pVgCxt = NULL;
698
      int32_t         vgId = pTableCxt->pMeta->vgId;
699
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
700
      if (NULL == pp) {
701
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
702
      } else {
703
        pVgCxt = *(SVgroupDataCxt**)pp;
704
      }
705
      if (TSDB_CODE_SUCCESS == code) {
706
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
707
      }
708
    }
709
  }
710

711
  taosHashCleanup(pVgroupHash);
712
  if (TSDB_CODE_SUCCESS == code) {
713
    *pVgDataBlocks = pVgroupList;
714
  } else {
715
    insDestroyVgroupDataCxtList(pVgroupList);
716
  }
717

718
  return code;
719
}
720
*/
721

722
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
9,357,670✔
723
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
9,357,670✔
724
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
9,379,082✔
725
  if (NULL == pVgroupHash || NULL == pVgroupList) {
9,399,251!
UNCOV
726
    taosHashCleanup(pVgroupHash);
×
727
    taosArrayDestroy(pVgroupList);
×
728
    return terrno;
×
729
  }
730

731
  int32_t code = TSDB_CODE_SUCCESS;
9,399,703✔
732
  bool    colFormat = false;
9,399,703✔
733

734
  void* p = taosHashIterate(pTableHash, NULL);
9,399,703✔
735
  if (p) {
9,404,975!
736
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
9,405,222✔
737
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
9,405,222✔
738
  }
739

740
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
18,877,943✔
741
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
9,485,071✔
742
    if (colFormat) {
9,485,071✔
743
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
126✔
744
      if (pCol && pCol->nVal <= 0) {
126!
745
        p = taosHashIterate(pTableHash, p);
×
746
        continue;
×
747
      }
748

749
      if (pTableCxt->pData->pCreateTbReq) {
126✔
750
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
22✔
751
      }
752

753
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
126✔
754

755
      code = tColDataSortMerge(&pTableCxt->pData->aCol);
126✔
756
    } else {
757
      // skip the table has no data to insert
758
      // eg: import a csv without valid data
759
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
760
      //   qWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
761
      //   p = taosHashIterate(pTableHash, p);
762
      //   continue;
763
      // }
764
      if (!pTableCxt->ordered) {
9,484,945✔
765
        code = tRowSort(pTableCxt->pData->aRowP);
9,255✔
766
      }
767
      if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
9,484,945✔
768
        code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
8,066✔
769
      }
770
    }
771

772
    if (TSDB_CODE_SUCCESS == code) {
9,482,737!
773
      SVgroupDataCxt* pVgCxt = NULL;
9,482,737✔
774
      int32_t         vgId = pTableCxt->pMeta->vgId;
9,482,737✔
775
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
9,482,737✔
776
      if (NULL == pp) {
9,436,156✔
777
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
9,386,167✔
778
      } else {
779
        pVgCxt = *(SVgroupDataCxt**)pp;
49,989✔
780
      }
781
      if (TSDB_CODE_SUCCESS == code) {
9,466,981!
782
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
9,468,524✔
783
      }
784
    }
785
    if (TSDB_CODE_SUCCESS == code) {
9,463,950!
786
      p = taosHashIterate(pTableHash, p);
9,465,301✔
787
    }
788
  }
789

790
  taosHashCleanup(pVgroupHash);
9,392,872✔
791
  if (TSDB_CODE_SUCCESS == code) {
9,382,677!
792
    *pVgDataBlocks = pVgroupList;
9,384,492✔
793
  } else {
UNCOV
794
    insDestroyVgroupDataCxtList(pVgroupList);
×
795
  }
796

797
  return code;
9,385,448✔
798
}
799

800
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
9,396,154✔
801
  int32_t  code = TSDB_CODE_SUCCESS;
9,396,154✔
802
  uint32_t len = 0;
9,396,154✔
803
  void*    pBuf = NULL;
9,396,154✔
804
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
9,396,154!
805
  if (TSDB_CODE_SUCCESS == code) {
9,400,951!
806
    SEncoder encoder;
807
    len += sizeof(SSubmitReq2Msg);
9,417,791✔
808
    pBuf = taosMemoryMalloc(len);
9,417,791!
809
    if (NULL == pBuf) {
9,389,290!
810
      return terrno;
×
811
    }
812
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
9,389,290✔
813
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
9,389,290✔
814
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
9,389,290✔
815
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
9,383,147✔
816
    code = tEncodeSubmitReq(&encoder, pReq);
9,389,049✔
817
    tEncoderClear(&encoder);
9,418,510✔
818
  }
819

820
  if (TSDB_CODE_SUCCESS == code) {
9,434,640!
821
    *pData = pBuf;
9,434,640✔
822
    *pLen = len;
9,434,640✔
823
  } else {
824
    taosMemoryFree(pBuf);
×
825
  }
826
  return code;
9,412,499✔
827
}
828

829
static void destroyVgDataBlocks(void* p) {
×
830
  SVgDataBlocks* pVg = p;
×
831
  taosMemoryFree(pVg->pData);
×
832
  taosMemoryFree(pVg);
×
833
}
×
834

835
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
9,380,789✔
836
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
9,380,789✔
837
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
9,383,570!
838
  if (NULL == pDataBlocks) {
9,393,586!
839
    return TSDB_CODE_OUT_OF_MEMORY;
×
840
  }
841

842
  int32_t code = TSDB_CODE_SUCCESS;
9,393,586✔
843
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
18,803,078✔
844
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
9,417,418✔
845
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
9,414,241!
846
      continue;
×
847
    }
848
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
9,415,471!
849
    if (NULL == dst) {
9,416,791!
850
      code = terrno;
×
851
    }
852
    if (TSDB_CODE_SUCCESS == code) {
9,416,791!
853
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
9,418,771✔
854
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
9,419,554✔
855
      //      uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
856
    }
857
    if (TSDB_CODE_SUCCESS == code) {
9,406,719!
858
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
9,406,719✔
859
    }
860
    if (TSDB_CODE_SUCCESS == code) {
9,416,410✔
861
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
9,405,723!
862
    }
863
  }
864

865
  if (append) {
9,385,660!
UNCOV
866
    if (NULL == *pVgDataBlocks) {
×
UNCOV
867
      *pVgDataBlocks = pDataBlocks;
×
868
    }
UNCOV
869
    return code;
×
870
  }
871

872
  if (TSDB_CODE_SUCCESS == code) {
9,385,660✔
873
    *pVgDataBlocks = pDataBlocks;
9,376,354✔
874
  } else {
875
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
9,306✔
876
  }
877

878
  return code;
9,376,497✔
879
}
880

881
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
882
  for (int i = 0; i < numFields; i++) {
×
883
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
884
      return true;
×
885
    }
886
  }
887

888
  return false;
×
889
}
890

891
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
563✔
892
  if (*fields != pColSchema->type) {
563✔
893
    if (errstr != NULL)
1!
894
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
895
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
896
    return TSDB_CODE_INVALID_PARA;
1✔
897
  }
898
  if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
562!
899
    if (errstr != NULL)
1!
900
      snprintf(errstr, errstrLen,
×
901
               "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
902
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
903
               *(int32_t*)(fields + sizeof(int8_t)));
×
904
    return TSDB_CODE_INVALID_PARA;
1✔
905
  }
906

907
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
561!
908
    if (errstr != NULL)
×
909
      snprintf(errstr, errstrLen,
×
910
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
911
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
912
               *(int32_t*)(fields + sizeof(int8_t)));
×
913
    return TSDB_CODE_INVALID_PARA;
×
914
  }
915
  return 0;
561✔
916
}
917

918
#define PRCESS_DATA(i, j)                                                                                 \
919
  ret = checkSchema(pColSchema, fields, errstr, errstrLen);                                               \
920
  if (ret != 0) {                                                                                         \
921
    goto end;                                                                                             \
922
  }                                                                                                       \
923
                                                                                                          \
924
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                 \
925
    hasTs = true;                                                                                         \
926
  }                                                                                                       \
927
                                                                                                          \
928
  int8_t* offset = pStart;                                                                                \
929
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                               \
930
    pStart += numOfRows * sizeof(int32_t);                                                                \
931
  } else {                                                                                                \
932
    pStart += BitmapLen(numOfRows);                                                                       \
933
  }                                                                                                       \
934
  char* pData = pStart;                                                                                   \
935
                                                                                                          \
936
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                               \
937
  ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
938
  if (ret != 0) {                                                                                         \
939
    goto end;                                                                                             \
940
  }                                                                                                       \
941
  fields += sizeof(int8_t) + sizeof(int32_t);                                                             \
942
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                   \
943
    pStart += htonl(colLength[i]);                                                                        \
944
  } else {                                                                                                \
945
    pStart += colLength[i];                                                                               \
946
  }                                                                                                       \
947
  boundInfo->pColIndex[j] = -1;
948

949
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
135✔
950
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
951
  int ret = 0;
135✔
952
  if (data == NULL) {
135!
953
    uError("rawBlockBindData, data is NULL");
×
954
    return TSDB_CODE_APP_ERROR;
×
955
  }
956
  void* tmp =
957
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
135✔
958
  SVCreateTbReq* pCreateReqTmp = NULL;
135✔
959
  if (tmp == NULL && pCreateTb != NULL) {
135✔
960
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
22✔
961
    if (ret != TSDB_CODE_SUCCESS) {
22!
962
      uError("cloneSVreateTbReq error");
×
963
      goto end;
×
964
    }
965
  }
966

967
  STableDataCxt* pTableCxt = NULL;
135✔
968
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
135✔
969
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
970
  if (pCreateReqTmp != NULL) {
135!
971
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
972
    taosMemoryFree(pCreateReqTmp);
×
973
  }
974

975
  if (ret != TSDB_CODE_SUCCESS) {
135!
976
    uError("insGetTableDataCxt error");
×
977
    goto end;
×
978
  }
979

980
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
135✔
981
  if (tmp == NULL) {
135✔
982
    ret = initTableColSubmitData(pTableCxt);
126✔
983
    if (ret != TSDB_CODE_SUCCESS) {
126!
984
      uError("initTableColSubmitData error");
×
985
      goto end;
×
986
    }
987
  }
988

989
  char* p = (char*)data;
135✔
990
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
991
  // column length |
992
  int32_t version = *(int32_t*)data;
135✔
993
  p += sizeof(int32_t);
135✔
994
  p += sizeof(int32_t);
135✔
995

996
  int32_t numOfRows = *(int32_t*)p;
135✔
997
  p += sizeof(int32_t);
135✔
998

999
  int32_t numOfCols = *(int32_t*)p;
135✔
1000
  p += sizeof(int32_t);
135✔
1001

1002
  p += sizeof(int32_t);
135✔
1003
  p += sizeof(uint64_t);
135✔
1004

1005
  int8_t* fields = p;
135✔
1006
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
135!
1007
    uError("fields type error:%d", *fields);
×
1008
    ret = TSDB_CODE_INVALID_PARA;
×
1009
    goto end;
×
1010
  }
1011
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
135✔
1012

1013
  int32_t* colLength = (int32_t*)p;
135✔
1014
  p += sizeof(int32_t) * numOfCols;
135✔
1015

1016
  char* pStart = p;
135✔
1017

1018
  SSchema*       pSchema = getTableColumnSchema(pTableCxt->pMeta);
135✔
1019
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
135✔
1020

1021
  if (tFields != NULL && numFields != numOfCols) {
135!
1022
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1023
    ret = TSDB_CODE_INVALID_PARA;
×
1024
    goto end;
×
1025
  }
1026

1027
  bool hasTs = false;
135✔
1028
  if (tFields == NULL) {
135✔
1029
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
6✔
1030
    for (int j = 0; j < len; j++) {
17✔
1031
      SSchema* pColSchema = &pSchema[j];
13✔
1032
      PRCESS_DATA(j, j)
13!
1033
    }
1034
  } else {
1035
    for (int i = 0; i < numFields; i++) {
679✔
1036
      for (int j = 0; j < boundInfo->numOfBound; j++) {
4,402!
1037
        SSchema* pColSchema = &pSchema[j];
4,402✔
1038
        char*    fieldName = NULL;
4,402✔
1039
        if (raw) {
4,402✔
1040
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
4,397✔
1041
        } else {
1042
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
5✔
1043
        }
1044
        if (strcmp(pColSchema->name, fieldName) == 0) {
4,402✔
1045
          PRCESS_DATA(i, j)
550!
1046
          break;
550✔
1047
        }
1048
      }
1049
    }
1050
  }
1051

1052
  if (!hasTs) {
133!
1053
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1054
    ret = TSDB_CODE_INVALID_PARA;
×
1055
    goto end;
×
1056
  }
1057

1058
  // process NULL data
1059
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
705✔
1060
    if (boundInfo->pColIndex[c] != -1) {
572✔
1061
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
12✔
1062
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
12✔
1063
      if (ret != 0) {
12!
1064
        goto end;
×
1065
      }
1066
    } else {
1067
      boundInfo->pColIndex[c] = c;  // restore for next block
560✔
1068
    }
1069
  }
1070

1071
end:
133✔
1072
  return ret;
135✔
1073
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc