• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.52
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "tdatablock.h"
24
#include "tmisce.h"
25

26
void qDestroyBoundColInfo(void* pInfo) {
98✔
27
  if (NULL == pInfo) {
98✔
28
    return;
66✔
29
  }
30

31
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
32✔
32

33
  taosMemoryFreeClear(pBoundInfo->pColIndex);
32✔
34
}
35

36
static char* tableNameGetPosition(SToken* pToken, char target) {
1,799,795✔
37
  bool inEscape = false;
1,799,795✔
38
  bool inQuote = false;
1,799,795✔
39
  char quotaStr = 0;
1,799,795✔
40

41
  for (uint32_t i = 0; i < pToken->n; ++i) {
13,931,967✔
42
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
12,914,384!
43
      return pToken->z + i;
782,212✔
44
    }
45

46
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
12,132,172✔
47
      if (!inQuote) {
1,222!
48
        inEscape = !inEscape;
1,222✔
49
      }
50
    }
51

52
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
12,132,172✔
53
      if (!inEscape) {
24,014✔
54
        if (!inQuote) {
24,000✔
55
          quotaStr = *(pToken->z + i);
12,000✔
56
          inQuote = !inQuote;
12,000✔
57
        } else if (quotaStr == *(pToken->z + i)) {
12,000!
58
          inQuote = !inQuote;
12,000✔
59
        }
60
      }
61
    }
62
  }
63

64
  return NULL;
1,017,583✔
65
}
66

67
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
1,799,795✔
68
  const char* msg1 = "name too long";
1,799,795✔
69
  const char* msg2 = "invalid database name";
1,799,795✔
70
  const char* msg3 = "db is not specified";
1,799,795✔
71
  const char* msg4 = "invalid table name";
1,799,795✔
72

73
  int32_t code = TSDB_CODE_SUCCESS;
1,799,795✔
74
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
1,799,795✔
75

76
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
1,799,798✔
77
    int32_t dbLen = p - pTableName->z;
782,212✔
78
    if (dbLen <= 0) {
782,212!
79
      return buildInvalidOperationMsg(pMsgBuf, msg2);
×
80
    }
81
    char name[TSDB_DB_FNAME_LEN] = {0};
782,212✔
82
    strncpy(name, pTableName->z, dbLen);
782,212✔
83
    int32_t actualDbLen = strdequote(name);
782,212✔
84

85
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
782,207✔
86
    if (code != TSDB_CODE_SUCCESS) {
782,208!
87
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
88
    }
89

90
    int32_t tbLen = pTableName->n - dbLen - 1;
782,208✔
91
    if (tbLen <= 0) {
782,208!
92
      return buildInvalidOperationMsg(pMsgBuf, msg4);
×
93
    }
94

95
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
782,208✔
96
    strncpy(tbname, p + 1, tbLen);
782,208✔
97
    /*tbLen = */ (void)strdequote(tbname);
782,208✔
98

99
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
782,209✔
100
    if (code != 0) {
782,206!
101
      return buildInvalidOperationMsg(pMsgBuf, msg1);
×
102
    }
103
  } else {  // get current DB name first, and then set it into path
104
    if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
1,017,586!
105
      return buildInvalidOperationMsg(pMsgBuf, msg1);
7✔
106
    }
107
    if (pTableName->n == 0) {
1,017,586!
108
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
×
109
    }
110

111
    char name[TSDB_TABLE_FNAME_LEN] = {0};
1,017,586✔
112
    strncpy(name, pTableName->z, pTableName->n);
1,017,586✔
113
    (void)strdequote(name);
1,017,586✔
114

115
    if (dbName == NULL) {
1,017,587✔
116
      return buildInvalidOperationMsg(pMsgBuf, msg3);
5✔
117
    }
118
    if (name[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
1,017,582✔
119

120
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
1,017,580✔
121
    if (code != TSDB_CODE_SUCCESS) {
1,017,580!
122
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
×
123
      return code;
×
124
    }
125

126
    code = tNameFromString(pName, name, T_NAME_TABLE);
1,017,580✔
127
    if (code != 0) {
1,017,580!
128
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
×
129
    }
130
  }
131

132
  if (NULL != strchr(pName->tname, '.')) {
1,799,786!
133
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
×
134
  }
135

136
  return code;
1,799,787✔
137
}
138

139
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
9,459,777✔
140
  while (start < end) {
58,655,187✔
141
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
58,626,471✔
142
      return start;
9,431,061✔
143
    }
144
    ++start;
49,195,410✔
145
  }
146
  return -1;
28,716✔
147
}
148

149
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
66,356✔
150
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
151
  pTbReq->type = TD_CHILD_TABLE;
66,356✔
152
  pTbReq->ctb.pTag = (uint8_t*)pTag;
66,356✔
153
  pTbReq->name = taosStrdup(tname);
66,356✔
154
  if (!pTbReq->name) return terrno;
66,356!
155
  pTbReq->ctb.suid = suid;
66,356✔
156
  pTbReq->ctb.tagNum = tagNum;
66,356✔
157
  if (sname) {
66,356✔
158
    pTbReq->ctb.stbName = taosStrdup(sname);
64,568✔
159
    if (!pTbReq->ctb.stbName) return terrno;
64,568!
160
  }
161
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
66,356✔
162
  if (!pTbReq->ctb.tagName) return terrno;
66,356!
163
  pTbReq->ttl = ttl;
66,356✔
164
  pTbReq->commentLen = -1;
66,356✔
165

166
  return TSDB_CODE_SUCCESS;
66,356✔
167
}
168

169
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
170
  for (int32_t i = 0; i < ncols; ++i) {
×
171
    pBoundCols[i] = i;
×
172
  }
173
}
×
174

175
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
1,721,144✔
176
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
1,721,144✔
177
  int32_t  code = 0;
1,721,146✔
178
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
57,526,858✔
179
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
55,805,540✔
180
    if (NULL == taosArrayPush(pValues, &val)) {
55,805,712!
181
      code = terrno;
×
182
      break;
×
183
    }
184
  }
185
  return code;
1,721,318✔
186
}
187

188
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
10,557✔
189

190
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
1,791,908✔
191
  pInfo->numOfCols = numOfBound;
1,791,908✔
192
  pInfo->numOfBound = numOfBound;
1,791,908✔
193
  pInfo->hasBoundCols = false;
1,791,908✔
194
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
1,791,908✔
195
  if (NULL == pInfo->pColIndex) {
1,791,916!
196
    return terrno;
×
197
  }
198
  for (int32_t i = 0; i < numOfBound; ++i) {
57,866,226✔
199
    pInfo->pColIndex[i] = i;
56,074,310✔
200
  }
201
  return TSDB_CODE_SUCCESS;
1,791,916✔
202
}
203

204
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
2✔
205
  pInfo->numOfBound = pInfo->numOfCols;
2✔
206
  pInfo->hasBoundCols = false;
2✔
207
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
10✔
208
    pInfo->pColIndex[i] = i;
8✔
209
  }
210
}
2✔
211

212
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
125,736,330✔
213
  // once the data block is disordered, we do NOT keep last timestamp any more
214
  if (!pTableCxt->ordered) {
125,736,330✔
215
    return;
8,679,760✔
216
  }
217

218
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
117,056,570✔
219
    pTableCxt->ordered = false;
17,930✔
220
  }
221

222
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
117,104,786✔
223
    pTableCxt->duplicateTs = true;
119✔
224
  }
225

226
  // TODO: for variable length data type, we need to copy it out
227
  pTableCxt->lastKey = *rowKey;
117,076,750✔
228
  return;
117,076,750✔
229
}
230

231
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
5,108,683✔
232

233
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
1,719,935✔
234
                                  bool colMode, bool ignoreColVals) {
235
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
1,719,935✔
236
  if (NULL == pTableCxt) {
1,719,947!
237
    *pOutput = NULL;
×
238
    return terrno;
×
239
  }
240

241
  int32_t code = TSDB_CODE_SUCCESS;
1,719,947✔
242

243
  pTableCxt->lastKey = (SRowKey){0};
1,719,947✔
244
  pTableCxt->ordered = true;
1,719,947✔
245
  pTableCxt->duplicateTs = false;
1,719,947✔
246

247
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
1,719,947✔
248
  if (NULL == pTableCxt->pMeta) {
1,719,946!
249
    code = TSDB_CODE_OUT_OF_MEMORY;
×
250
  }
251
  if (TSDB_CODE_SUCCESS == code) {
1,719,946!
252
    pTableCxt->pSchema =
1,719,943✔
253
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
1,719,946✔
254
    if (NULL == pTableCxt->pSchema) {
1,719,943!
255
      code = TSDB_CODE_OUT_OF_MEMORY;
×
256
    }
257
  }
258
  if (TSDB_CODE_SUCCESS == code) {
1,719,943✔
259
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
1,719,942✔
260
  }
261
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
1,719,944✔
262
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
1,710,591✔
263
    if (NULL == pTableCxt->pValues) {
1,710,592!
264
      code = terrno;
×
265
    } else {
266
      code = initColValues(pTableMeta, pTableCxt->pValues);
1,710,592✔
267
    }
268
  }
269
  if (TSDB_CODE_SUCCESS == code) {
1,719,945✔
270
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
1,719,944✔
271
    if (NULL == pTableCxt->pData) {
1,719,947!
UNCOV
272
      code = terrno;
×
273
    } else {
274
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
1,719,947✔
275
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
1,719,947✔
276
      pTableCxt->pData->suid = pTableMeta->suid;
1,719,947✔
277
      pTableCxt->pData->uid = pTableMeta->uid;
1,719,947✔
278
      pTableCxt->pData->sver = pTableMeta->sversion;
1,719,947✔
279
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
1,719,947✔
280
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
1,719,947✔
281
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
1,719,947✔
282
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
160✔
283
        if (NULL == pTableCxt->pData->aCol) {
160!
284
          code = terrno;
×
285
        }
286
      } else {
287
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
1,719,787✔
288
        if (NULL == pTableCxt->pData->aRowP) {
1,719,788!
289
          code = terrno;
×
290
        }
291
      }
292
    }
293
  }
294
  if (TSDB_CODE_SUCCESS == code) {
1,719,949!
295
    *pOutput = pTableCxt;
1,719,949✔
296
    qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId);
1,719,949✔
297
  } else {
298
    insDestroyTableDataCxt(pTableCxt);
×
299
  }
300

301
  return code;
1,719,949✔
302
}
303

304
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
1,959✔
305
  int32_t        code = TSDB_CODE_SUCCESS;
1,959✔
306
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
1,959✔
307
  if (NULL == pTmp) {
1,959!
308
    code = terrno;
×
309
  } else {
310
    pTmp->flags = pSrc->flags;
1,959✔
311
    pTmp->suid = pSrc->suid;
1,959✔
312
    pTmp->uid = pSrc->uid;
1,959✔
313
    pTmp->sver = pSrc->sver;
1,959✔
314
    pTmp->pCreateTbReq = NULL;
1,959✔
315
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
1,959✔
316
      if (pSrc->pCreateTbReq) {
1,840!
317
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
1,840✔
318
      } else {
319
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
×
320
      }
321
    }
322
    if (TSDB_CODE_SUCCESS == code) {
1,959!
323
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
1,959✔
324
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
181✔
325
        if (NULL == pTmp->aCol) {
181!
326
          code = terrno;
×
327
          taosMemoryFree(pTmp);
×
328
        }
329
      } else {
330
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
1,778✔
331
        if (NULL == pTmp->aRowP) {
1,778!
332
          code = terrno;
×
333
          taosMemoryFree(pTmp);
×
334
        }
335
      }
336
    } else {
337
      taosMemoryFree(pTmp);
×
338
    }
339
  }
340

341
  taosMemoryFree(pSrc);
1,959✔
342
  if (TSDB_CODE_SUCCESS == code) {
1,959!
343
    *pDst = pTmp;
1,959✔
344
  }
345

346
  return code;
1,959✔
347
}
348

349
static void resetColValues(SArray* pValues) {
15,503✔
350
  int32_t num = taosArrayGetSize(pValues);
15,503✔
351
  for (int32_t i = 0; i < num; ++i) {
414,141✔
352
    SColVal* pVal = taosArrayGet(pValues, i);
398,638✔
353
    pVal->flag = CV_FLAG_NONE;
398,638✔
354
  }
355
}
15,503✔
356

357
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
3,749,547✔
358
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
359
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
3,749,547✔
360
  if (NULL != tmp) {
3,749,547✔
361
    *pTableCxt = *tmp;
2,029,607✔
362
    if (!ignoreColVals) {
2,029,607✔
363
      resetColValues((*pTableCxt)->pValues);
15,503✔
364
    }
365
    return TSDB_CODE_SUCCESS;
2,029,607✔
366
  }
367
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
1,719,940✔
368
  if (TSDB_CODE_SUCCESS == code) {
1,719,946!
369
    void* pData = *pTableCxt;  // deal scan coverity
1,719,948✔
370
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
1,719,948✔
371
  }
372

373
  if (TSDB_CODE_SUCCESS != code) {
1,719,947!
374
    insDestroyTableDataCxt(*pTableCxt);
×
375
  }
376
  return code;
1,719,949✔
377
}
378

379
static void destroyColVal(void* p) {
55,762,844✔
380
  SColVal* pVal = p;
55,762,844✔
381
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
55,762,844✔
382
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type) {
50,502,860✔
383
    taosMemoryFreeClear(pVal->value.pData);
5,266,359✔
384
  }
385
}
55,762,844✔
386

387
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
1,719,664✔
388
  if (NULL == pTableCxt) {
1,719,664!
389
    return;
×
390
  }
391

392
  taosMemoryFreeClear(pTableCxt->pMeta);
1,719,664!
393
  tDestroyTSchema(pTableCxt->pSchema);
1,719,664✔
394
  insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
1,719,664✔
395
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
1,719,664✔
396
  if (pTableCxt->pData) {
1,719,664✔
397
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
15,652✔
398
    taosMemoryFree(pTableCxt->pData);
15,652✔
399
  }
400
  taosMemoryFree(pTableCxt);
1,719,664✔
401
}
402

403
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
1,647,936✔
404
  if (NULL == pVgCxt) {
1,647,936!
405
    return;
×
406
  }
407

408
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
1,647,936✔
409
  taosMemoryFree(pVgCxt->pData);
1,647,936✔
410
  taosMemoryFree(pVgCxt);
1,647,936✔
411
}
412

413
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
3,242,875✔
414
  if (NULL == pVgCxtList) {
3,242,875✔
415
    return;
1,633,660✔
416
  }
417

418
  size_t size = taosArrayGetSize(pVgCxtList);
1,609,215✔
419
  for (int32_t i = 0; i < size; i++) {
3,257,151✔
420
    void* p = taosArrayGetP(pVgCxtList, i);
1,647,936✔
421
    insDestroyVgroupDataCxt(p);
1,647,936✔
422
  }
423

424
  taosArrayDestroy(pVgCxtList);
1,609,215✔
425
}
426

427
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
428
  if (NULL == pVgCxtHash) {
×
429
    return;
×
430
  }
431

432
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
433
  while (p) {
×
434
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
435

436
    p = taosHashIterate(pVgCxtHash, p);
×
437
  }
438

439
  taosHashCleanup(pVgCxtHash);
×
440
}
441

442
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
1,635,318✔
443
  if (NULL == pTableCxtHash) {
1,635,318✔
444
    return;
32✔
445
  }
446

447
  void** p = taosHashIterate(pTableCxtHash, NULL);
1,635,286✔
448
  while (p) {
3,354,890✔
449
    insDestroyTableDataCxt(*(STableDataCxt**)p);
1,719,604✔
450

451
    p = taosHashIterate(pTableCxtHash, p);
1,719,603✔
452
  }
453

454
  taosHashCleanup(pTableCxtHash);
1,635,286✔
455
}
456

457
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
1,706,295✔
458
  if (NULL == pVgCxt->pData->aSubmitTbData) {
1,706,295✔
459
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
1,648,248✔
460
    if (NULL == pVgCxt->pData->aSubmitTbData) {
1,648,248!
461
      return terrno;
×
462
    }
463
  }
464

465
  // push data to submit, rebuild empty data for next submit
466
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
3,412,592!
467
    return terrno;
×
468
  }
469
  int32_t code = 0;
1,706,297✔
470
  if (isRebuild) {
1,706,297✔
471
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
1,959✔
472
  } else if (clear) {
1,704,338✔
473
    taosMemoryFreeClear(pTableCxt->pData);
1,704,320!
474
  }
475

476
  qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
1,706,297✔
477

478
  return code;
1,706,294✔
479
}
480

481
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList,
1,648,246✔
482
                                   SVgroupDataCxt** pOutput) {
483
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
1,648,246✔
484
  if (NULL == pVgCxt) {
1,648,249!
485
    return terrno;
×
486
  }
487
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
1,648,249✔
488
  if (NULL == pVgCxt->pData) {
1,648,250!
489
    insDestroyVgroupDataCxt(pVgCxt);
×
490
    return terrno;
×
491
  }
492

493
  pVgCxt->vgId = pTableCxt->pMeta->vgId;
1,648,250✔
494
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
1,648,250✔
495
  if (TSDB_CODE_SUCCESS == code) {
1,648,249!
496
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
1,648,249!
497
      code = terrno;
×
498
      insDestroyVgroupDataCxt(pVgCxt);
×
499
      return code;
×
500
    }
501
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
502
    *pOutput = pVgCxt;
1,648,249✔
503
  } else {
504
    insDestroyVgroupDataCxt(pVgCxt);
×
505
  }
506
  return code;
1,648,249✔
507
}
508

509
int insColDataComp(const void* lp, const void* rp) {
1,634✔
510
  SColData* pLeft = (SColData*)lp;
1,634✔
511
  SColData* pRight = (SColData*)rp;
1,634✔
512
  if (pLeft->cid < pRight->cid) {
1,634!
513
    return -1;
1,634✔
514
  } else if (pLeft->cid > pRight->cid) {
×
515
    return 1;
×
516
  }
517

518
  return 0;
×
519
}
520

521
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
8✔
522
                                 STableColsData* pTbData, SName* sname) {
523
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
8!
524
    return TSDB_CODE_SUCCESS;
6✔
525
  }
526

527
  SVgroupInfo      vgInfo = {0};
2✔
528
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
2✔
529
                           .requestId = pBuildInfo->requestId,
2✔
530
                           .requestObjRefId = pBuildInfo->requestSelf,
2✔
531
                           .mgmtEps = pBuildInfo->mgmtEpSet};
532

533
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
2✔
534
  if (TSDB_CODE_SUCCESS != code) {
2!
535
    return code;
×
536
  }
537

538
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
2✔
539
  if (TSDB_CODE_SUCCESS != code) {
2!
540
    return code;
×
541
  }
542

543
  return TSDB_CODE_SUCCESS;
2✔
544
}
545

546
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
16✔
547
                             uint64_t* uid, int32_t* vgId) {
548
  STableVgUid* pTbInfo = NULL;
16✔
549
  int32_t      code = 0;
16✔
550

551
  if (pTbData->getFromHash) {
16✔
552
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
12✔
553
  }
554

555
  if (NULL == pTbInfo) {
16✔
556
    SName sname;
557
    code = qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
8✔
558
    if (TSDB_CODE_SUCCESS != code) {
8!
559
      return code;
×
560
    }
561

562
    STableMeta*      pTableMeta = NULL;
8✔
563
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
8✔
564
                             .requestId = pBuildInfo->requestId,
8✔
565
                             .requestObjRefId = pBuildInfo->requestSelf,
8✔
566
                             .mgmtEps = pBuildInfo->mgmtEpSet};
567
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
8✔
568

569
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
8!
570
      parserDebug("tb %s.%s not exist", sname.dbname, sname.tname);
×
571
      return code;
×
572
    }
573

574
    if (TSDB_CODE_SUCCESS != code) {
8!
575
      return code;
×
576
    }
577

578
    *uid = pTableMeta->uid;
8✔
579
    *vgId = pTableMeta->vgId;
8✔
580

581
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
8✔
582
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
8✔
583
    if (TSDB_CODE_SUCCESS == code) {
8!
584
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
8✔
585
    }
586

587
    taosMemoryFree(pTableMeta);
8✔
588
  } else {
589
    *uid = pTbInfo->uid;
8✔
590
    *vgId = pTbInfo->vgid;
8✔
591
  }
592

593
  return code;
16✔
594
}
595

596
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
597
  int32_t             code = TSDB_CODE_SUCCESS;
×
598
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
599

600
  if (TSDB_CODE_SUCCESS == code) {
×
601
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
602
  }
603

604
  return code;
×
605
}
606

607
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
16✔
608
                                  SStbInterlaceInfo* pBuildInfo) {
609
  int32_t  code = TSDB_CODE_SUCCESS;
16✔
610
  uint64_t uid;
611
  int32_t  vgId;
612

613
  pTbCtx->pData->aRowP = pTbData->aCol;
16✔
614

615
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
16✔
616
  if (TSDB_CODE_SUCCESS != code) {
16!
617
    return code;
×
618
  }
619

620
  pTbCtx->pMeta->vgId = vgId;
16✔
621
  pTbCtx->pMeta->uid = uid;
16✔
622
  pTbCtx->pData->uid = uid;
16✔
623

624
  if (!pTbCtx->ordered) {
16!
625
    code = tRowSort(pTbCtx->pData->aRowP);
×
626
  }
627
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
16!
628
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
×
629
  }
630

631
  if (TSDB_CODE_SUCCESS != code) {
16!
632
    return code;
×
633
  }
634

635
  SVgroupDataCxt* pVgCxt = NULL;
16✔
636
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
16✔
637
  if (NULL == pp) {
16!
638
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
16✔
639
    if (NULL == pp) {
16!
640
      code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
16✔
641
    } else {
642
      pVgCxt = *(SVgroupDataCxt**)pp;
×
643
    }
644
  } else {
645
    pVgCxt = *(SVgroupDataCxt**)pp;
×
646
  }
647

648
  if (TSDB_CODE_SUCCESS == code) {
16!
649
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
16✔
650
  }
651

652
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
16!
653
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
654
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
655
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
656
    // insDestroyVgroupDataCxt(pVgCxt);
657
  }
658

659
  return code;
16✔
660
}
661

662
/*
663
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
664
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
665
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
666
  if (NULL == pVgroupHash || NULL == pVgroupList) {
667
    taosHashCleanup(pVgroupHash);
668
    taosArrayDestroy(pVgroupList);
669
    return TSDB_CODE_OUT_OF_MEMORY;
670
  }
671

672
  int32_t code = TSDB_CODE_SUCCESS;
673

674
  for (int32_t i = 0; i < tbNum; ++i) {
675
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
676
    pTableCxt->pMeta->vgId = pTableCols->vgId;
677
    pTableCxt->pMeta->uid = pTableCols->uid;
678
    pTableCxt->pData->uid = pTableCols->uid;
679
    pTableCxt->pData->aCol = pTableCols->aCol;
680

681
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
682
    if (pCol->nVal <= 0) {
683
      continue;
684
    }
685

686
    if (pTableCxt->pData->pCreateTbReq) {
687
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
688
    }
689

690
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
691

692
    tColDataSortMerge(pTableCxt->pData->aCol);
693

694
    if (TSDB_CODE_SUCCESS == code) {
695
      SVgroupDataCxt* pVgCxt = NULL;
696
      int32_t         vgId = pTableCxt->pMeta->vgId;
697
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
698
      if (NULL == pp) {
699
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
700
      } else {
701
        pVgCxt = *(SVgroupDataCxt**)pp;
702
      }
703
      if (TSDB_CODE_SUCCESS == code) {
704
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
705
      }
706
    }
707
  }
708

709
  taosHashCleanup(pVgroupHash);
710
  if (TSDB_CODE_SUCCESS == code) {
711
    *pVgDataBlocks = pVgroupList;
712
  } else {
713
    insDestroyVgroupDataCxtList(pVgroupList);
714
  }
715

716
  return code;
717
}
718
*/
719

720
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
1,609,507✔
721
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
1,609,507✔
722
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
1,609,509✔
723
  if (NULL == pVgroupHash || NULL == pVgroupList) {
1,609,510!
724
    taosHashCleanup(pVgroupHash);
×
725
    taosArrayDestroy(pVgroupList);
×
726
    return terrno;
×
727
  }
728

729
  int32_t code = TSDB_CODE_SUCCESS;
1,609,510✔
730
  bool    colFormat = false;
1,609,510✔
731

732
  void* p = taosHashIterate(pTableHash, NULL);
1,609,510✔
733
  if (p) {
1,609,513!
734
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
1,609,513✔
735
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
1,609,513✔
736
  }
737

738
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
3,315,790!
739
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
1,706,281✔
740
    if (colFormat) {
1,706,281✔
741
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
181✔
742
      if (pCol->nVal <= 0) {
181!
743
        p = taosHashIterate(pTableHash, p);
×
744
        continue;
×
745
      }
746

747
      if (pTableCxt->pData->pCreateTbReq) {
181✔
748
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
74✔
749
      }
750

751
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
181✔
752

753
      code = tColDataSortMerge(&pTableCxt->pData->aCol);
181✔
754
    } else {
755
      // skip the table has no data to insert
756
      // eg: import a csv without valid data
757
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
758
      //   qWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
759
      //   p = taosHashIterate(pTableHash, p);
760
      //   continue;
761
      // }
762
      if (!pTableCxt->ordered) {
1,706,100✔
763
        code = tRowSort(pTableCxt->pData->aRowP);
17,930✔
764
      }
765
      if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
1,706,100✔
766
        code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
17,998✔
767
      }
768
    }
769

770
    if (TSDB_CODE_SUCCESS == code) {
1,706,279!
771
      SVgroupDataCxt* pVgCxt = NULL;
1,706,279✔
772
      int32_t         vgId = pTableCxt->pMeta->vgId;
1,706,279✔
773
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
1,706,279✔
774
      if (NULL == pp) {
1,706,278✔
775
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
1,648,231✔
776
      } else {
777
        pVgCxt = *(SVgroupDataCxt**)pp;
58,047✔
778
      }
779
      if (TSDB_CODE_SUCCESS == code) {
1,706,280!
780
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
1,706,281✔
781
      }
782
    }
783
    if (TSDB_CODE_SUCCESS == code) {
1,706,278!
784
      p = taosHashIterate(pTableHash, p);
1,706,280✔
785
    }
786
  }
787

788
  taosHashCleanup(pVgroupHash);
1,609,509✔
789
  if (TSDB_CODE_SUCCESS == code) {
1,609,511!
790
    *pVgDataBlocks = pVgroupList;
1,609,512✔
791
  } else {
792
    insDestroyVgroupDataCxtList(pVgroupList);
×
793
  }
794

795
  return code;
1,609,511✔
796
}
797

798
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
1,648,247✔
799
  int32_t  code = TSDB_CODE_SUCCESS;
1,648,247✔
800
  uint32_t len = 0;
1,648,247✔
801
  void*    pBuf = NULL;
1,648,247✔
802
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
1,648,247!
803
  if (TSDB_CODE_SUCCESS == code) {
1,648,247!
804
    SEncoder encoder;
805
    len += sizeof(SSubmitReq2Msg);
1,648,247✔
806
    pBuf = taosMemoryMalloc(len);
1,648,247✔
807
    if (NULL == pBuf) {
1,648,248!
808
      return terrno;
×
809
    }
810
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
1,648,248✔
811
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
1,648,248✔
812
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
1,648,248✔
813
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
1,648,247✔
814
    code = tEncodeSubmitReq(&encoder, pReq);
1,648,247✔
815
    tEncoderClear(&encoder);
1,648,250✔
816
  }
817

818
  if (TSDB_CODE_SUCCESS == code) {
1,648,250!
819
    *pData = pBuf;
1,648,250✔
820
    *pLen = len;
1,648,250✔
821
  } else {
822
    taosMemoryFree(pBuf);
×
823
  }
824
  return code;
1,648,250✔
825
}
826

827
static void destroyVgDataBlocks(void* p) {
×
828
  SVgDataBlocks* pVg = p;
×
829
  taosMemoryFree(pVg->pData);
×
830
  taosMemoryFree(pVg);
×
831
}
×
832

833
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
1,609,525✔
834
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
1,609,525✔
835
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
1,609,525!
836
  if (NULL == pDataBlocks) {
1,609,529!
837
    return TSDB_CODE_OUT_OF_MEMORY;
×
838
  }
839

840
  int32_t code = TSDB_CODE_SUCCESS;
1,609,529✔
841
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
3,257,778✔
842
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
1,648,250✔
843
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
1,648,249!
844
      continue;
×
845
    }
846
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
1,648,249✔
847
    if (NULL == dst) {
1,648,247!
848
      code = terrno;
×
849
    }
850
    if (TSDB_CODE_SUCCESS == code) {
1,648,247!
851
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
1,648,247✔
852
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
1,648,248✔
853
      //      uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
854
    }
855
    if (TSDB_CODE_SUCCESS == code) {
1,648,249!
856
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
1,648,249✔
857
    }
858
    if (TSDB_CODE_SUCCESS == code) {
1,648,249!
859
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
1,648,249!
860
    }
861
  }
862

863
  if (append) {
1,609,528✔
864
    if (NULL == *pVgDataBlocks) {
16!
865
      *pVgDataBlocks = pDataBlocks;
16✔
866
    }
867
    return code;
16✔
868
  }
869

870
  if (TSDB_CODE_SUCCESS == code) {
1,609,512!
871
    *pVgDataBlocks = pDataBlocks;
1,609,513✔
872
  } else {
873
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
×
874
  }
875

876
  return code;
1,609,513✔
877
}
878

879
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
880
  for (int i = 0; i < numFields; i++) {
×
881
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
882
      return true;
×
883
    }
884
  }
885

886
  return false;
×
887
}
888

889
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
563✔
890
  if (*fields != pColSchema->type) {
563✔
891
    if (errstr != NULL)
1!
892
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
893
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
894
    return TSDB_CODE_INVALID_PARA;
1✔
895
  }
896
  if (IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) > pColSchema->bytes) {
562!
897
    if (errstr != NULL)
1!
898
      snprintf(errstr, errstrLen,
×
899
               "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
900
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
901
               *(int32_t*)(fields + sizeof(int8_t)));
×
902
    return TSDB_CODE_INVALID_PARA;
1✔
903
  }
904

905
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
561!
906
    if (errstr != NULL)
×
907
      snprintf(errstr, errstrLen,
×
908
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
909
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
910
               *(int32_t*)(fields + sizeof(int8_t)));
×
911
    return TSDB_CODE_INVALID_PARA;
×
912
  }
913
  return 0;
561✔
914
}
915

916
#define PRCESS_DATA(i, j)                                                                                 \
917
  ret = checkSchema(pColSchema, fields, errstr, errstrLen);                                               \
918
  if (ret != 0) {                                                                                         \
919
    goto end;                                                                                             \
920
  }                                                                                                       \
921
                                                                                                          \
922
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                 \
923
    hasTs = true;                                                                                         \
924
  }                                                                                                       \
925
                                                                                                          \
926
  int8_t* offset = pStart;                                                                                \
927
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                               \
928
    pStart += numOfRows * sizeof(int32_t);                                                                \
929
  } else {                                                                                                \
930
    pStart += BitmapLen(numOfRows);                                                                       \
931
  }                                                                                                       \
932
  char* pData = pStart;                                                                                   \
933
                                                                                                          \
934
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                               \
935
  ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
936
  if (ret != 0) {                                                                                         \
937
    goto end;                                                                                             \
938
  }                                                                                                       \
939
  fields += sizeof(int8_t) + sizeof(int32_t);                                                             \
940
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                   \
941
    pStart += htonl(colLength[i]);                                                                        \
942
  } else {                                                                                                \
943
    pStart += colLength[i];                                                                               \
944
  }                                                                                                       \
945
  boundInfo->pColIndex[j] = -1;
946

947
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
135✔
948
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
949
  int ret = 0;
135✔
950
  if (data == NULL) {
135!
951
    uError("rawBlockBindData, data is NULL");
×
952
    return TSDB_CODE_APP_ERROR;
×
953
  }
954
  void* tmp =
955
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
135✔
956
  SVCreateTbReq* pCreateReqTmp = NULL;
135✔
957
  if (tmp == NULL && pCreateTb != NULL) {
135✔
958
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
22✔
959
    if (ret != TSDB_CODE_SUCCESS) {
22!
960
      uError("cloneSVreateTbReq error");
×
961
      goto end;
×
962
    }
963
  }
964

965
  STableDataCxt* pTableCxt = NULL;
135✔
966
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
135✔
967
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
968
  if (pCreateReqTmp != NULL) {
135!
969
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
970
    taosMemoryFree(pCreateReqTmp);
×
971
  }
972

973
  if (ret != TSDB_CODE_SUCCESS) {
135!
974
    uError("insGetTableDataCxt error");
×
975
    goto end;
×
976
  }
977

978
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
135✔
979
  if (tmp == NULL) {
135✔
980
    ret = initTableColSubmitData(pTableCxt);
126✔
981
    if (ret != TSDB_CODE_SUCCESS) {
126!
982
      uError("initTableColSubmitData error");
×
983
      goto end;
×
984
    }
985
  }
986

987
  char* p = (char*)data;
135✔
988
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
989
  // column length |
990
  int32_t version = *(int32_t*)data;
135✔
991
  p += sizeof(int32_t);
135✔
992
  p += sizeof(int32_t);
135✔
993

994
  int32_t numOfRows = *(int32_t*)p;
135✔
995
  p += sizeof(int32_t);
135✔
996

997
  int32_t numOfCols = *(int32_t*)p;
135✔
998
  p += sizeof(int32_t);
135✔
999

1000
  p += sizeof(int32_t);
135✔
1001
  p += sizeof(uint64_t);
135✔
1002

1003
  int8_t* fields = p;
135✔
1004
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
135!
1005
    uError("fields type error:%d", *fields);
×
1006
    ret = TSDB_CODE_INVALID_PARA;
×
1007
    goto end;
×
1008
  }
1009
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
135✔
1010

1011
  int32_t* colLength = (int32_t*)p;
135✔
1012
  p += sizeof(int32_t) * numOfCols;
135✔
1013

1014
  char* pStart = p;
135✔
1015

1016
  SSchema*       pSchema = getTableColumnSchema(pTableCxt->pMeta);
135✔
1017
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
135✔
1018

1019
  if (tFields != NULL && numFields != numOfCols) {
135!
1020
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1021
    ret = TSDB_CODE_INVALID_PARA;
×
1022
    goto end;
×
1023
  }
1024

1025
  bool hasTs = false;
135✔
1026
  if (tFields == NULL) {
135✔
1027
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
6✔
1028
    for (int j = 0; j < len; j++) {
17✔
1029
      SSchema* pColSchema = &pSchema[j];
13✔
1030
      PRCESS_DATA(j, j)
13!
1031
    }
1032
  } else {
1033
    for (int i = 0; i < numFields; i++) {
679✔
1034
      for (int j = 0; j < boundInfo->numOfBound; j++) {
4,402!
1035
        SSchema* pColSchema = &pSchema[j];
4,402✔
1036
        char*    fieldName = NULL;
4,402✔
1037
        if (raw) {
4,402✔
1038
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
4,397✔
1039
        } else {
1040
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
5✔
1041
        }
1042
        if (strcmp(pColSchema->name, fieldName) == 0) {
4,402✔
1043
          PRCESS_DATA(i, j)
550!
1044
          break;
550✔
1045
        }
1046
      }
1047
    }
1048
  }
1049

1050
  if (!hasTs) {
133!
1051
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1052
    ret = TSDB_CODE_INVALID_PARA;
×
1053
    goto end;
×
1054
  }
1055

1056
  // process NULL data
1057
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
705✔
1058
    if (boundInfo->pColIndex[c] != -1) {
572✔
1059
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
12✔
1060
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
12✔
1061
      if (ret != 0) {
12!
1062
        goto end;
×
1063
      }
1064
    } else {
1065
      boundInfo->pColIndex[c] = c;  // restore for next block
560✔
1066
    }
1067
  }
1068

1069
end:
133✔
1070
  return ret;
135✔
1071
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc