• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4907

30 Dec 2025 10:52AM UTC coverage: 65.541% (+0.03%) from 65.514%
#4907

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

808 existing lines in 106 files now uncovered.

193920 of 295877 relevant lines covered (65.54%)

118520209.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.82
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
void qDestroyBoundColInfo(void* pInfo) {
4,112,881✔
31
  if (NULL == pInfo) {
4,112,881✔
32
    return;
784,184✔
33
  }
34

35
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
3,328,697✔
36

37
  taosMemoryFreeClear(pBoundInfo->pColIndex);
3,328,697✔
38
}
39

40
static char* tableNameGetPosition(SToken* pToken, char target) {
571,765,504✔
41
  bool inEscape = false;
571,765,504✔
42
  bool inQuote = false;
571,765,504✔
43
  char quotaStr = 0;
571,765,504✔
44

45
  for (uint32_t i = 0; i < pToken->n; ++i) {
2,147,483,647✔
46
    if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
2,147,483,647✔
47
      return pToken->z + i;
214,918,359✔
48
    }
49

50
    if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
2,147,483,647✔
51
      if (!inQuote) {
23,055,373✔
52
        inEscape = !inEscape;
23,055,329✔
53
      }
54
    }
55

56
    if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
2,147,483,647✔
57
      if (!inEscape) {
6,242,882✔
58
        if (!inQuote) {
6,241,250✔
59
          quotaStr = *(pToken->z + i);
3,120,625✔
60
          inQuote = !inQuote;
3,120,625✔
61
        } else if (quotaStr == *(pToken->z + i)) {
3,120,625✔
62
          inQuote = !inQuote;
3,120,625✔
63
        }
64
      }
65
    }
66
  }
67

68
  return NULL;
356,858,717✔
69
}
70

71
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
571,765,476✔
72
  const char* msg1 = "table name is too long";
571,765,476✔
73
  const char* msg2 = "invalid database name";
571,765,476✔
74
  const char* msg3 = "db is not specified";
571,765,476✔
75
  const char* msg4 = "invalid table name";
571,765,476✔
76
  const char* msg5 = "database name is too long";
571,765,476✔
77

78
  int32_t code = TSDB_CODE_SUCCESS;
571,765,476✔
79
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
571,765,476✔
80

81
  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
571,777,852✔
82
    // before dbname dequote
83
    int32_t dbLen = p - pTableName->z;
214,919,023✔
84
    if (dbLen <= 0) {
214,910,074✔
85
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
86
    }
87
    if (dbLen >= TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE) {
214,910,074✔
88
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
×
89
    }
90

91
    char name[TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE] = {0};
214,910,074✔
92
    strncpy(name, pTableName->z, dbLen);
214,910,787✔
93
    int32_t actualDbLen = strdequote(name);
214,911,568✔
94

95
    // after dbname dequote
96
    if (actualDbLen <= 0) {
214,911,365✔
97
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
98
    }
99
    if (actualDbLen >= TSDB_DB_NAME_LEN) {
214,911,769✔
100
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg5);
×
101
    }
102

103
    code = tNameSetDbName(pName, acctId, name, actualDbLen);
214,911,769✔
104
    if (code != TSDB_CODE_SUCCESS) {
214,914,519✔
105
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
106
    }
107

108
    // before tbname dequote
109
    int32_t tbLen = pTableName->n - dbLen - 1;
214,914,519✔
110
    if (tbLen <= 0) {
214,910,902✔
111
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
442✔
112
    }
113
    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
214,911,757✔
114
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
125✔
115
    }
116

117
    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
214,911,632✔
118
    strncpy(tbname, p + 1, tbLen);
214,910,968✔
119
    int32_t actualTbLen = strdequote(tbname);
214,908,044✔
120

121
    // after tbname dequote
122
    if (actualTbLen <= 0) {
214,915,178✔
123
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
×
124
    }
125
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
214,915,282✔
126
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
125✔
127
    }
128

129
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
214,915,157✔
130
    if (code != 0) {
214,919,450✔
131
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
4,697✔
132
    }
133
  } else {  // get current DB name first, and then set it into path
134
    // before tbname dequote
135
    int32_t tbLen = pTableName->n;
356,858,829✔
136
    if (tbLen <= 0) {
356,855,815✔
137
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
×
138
    }
139

140
    if (tbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
356,857,368✔
141
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
125✔
142
    }
143

144
    char tbname[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
356,857,243✔
145
    strncpy(tbname, pTableName->z, tbLen);
356,855,770✔
146
    int32_t actualTbLen = strdequote(tbname);
356,856,237✔
147
    // after tbname dequote
148
    if (actualTbLen <= 0) {
356,855,307✔
149
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
1,098✔
150
    }
151
    if (actualTbLen >= TSDB_TABLE_NAME_LEN) {
356,854,209✔
152
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
125✔
153
    }
154

155
    if (dbName == NULL || strlen(dbName) == 0) {
356,854,084✔
156
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED, msg3);
×
157
    }
158

159
    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
356,858,918✔
160
    if (code != TSDB_CODE_SUCCESS) {
356,857,799✔
161
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg2);
×
162
      return code;
×
163
    }
164

165
    code = tNameFromString(pName, tbname, T_NAME_TABLE);
356,857,799✔
166
    if (code != 0) {
356,855,262✔
167
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg1);
×
168
    }
169
  }
170

171
  if (NULL != strchr(pName->tname, '.')) {
571,771,262✔
172
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
×
173
  }
174

175
  return code;
571,771,726✔
176
}
177

178
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
916,273,391✔
179
  while (start < end) {
2,147,483,647✔
180
    if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
2,147,483,647✔
181
      return start;
910,049,965✔
182
    }
183
    ++start;
2,147,483,647✔
184
  }
185
  return -1;
6,222,932✔
186
}
187

188
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
11,643,589✔
189
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
190
  pTbReq->type = TD_CHILD_TABLE;
11,643,589✔
191
  pTbReq->ctb.pTag = (uint8_t*)pTag;
11,647,762✔
192
  pTbReq->name = taosStrdup(tname);
11,646,157✔
193
  if (!pTbReq->name) return terrno;
11,646,661✔
194
  pTbReq->ctb.suid = suid;
11,644,093✔
195
  pTbReq->ctb.tagNum = tagNum;
11,645,056✔
196
  if (sname) {
11,646,340✔
197
    pTbReq->ctb.stbName = taosStrdup(sname);
11,012,291✔
198
    if (!pTbReq->ctb.stbName) return terrno;
11,016,500✔
199
  }
200
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
11,647,660✔
201
  if (!pTbReq->ctb.tagName) return terrno;
11,646,755✔
202
  pTbReq->ttl = ttl;
11,644,404✔
203
  pTbReq->commentLen = -1;
11,644,404✔
204

205
  return TSDB_CODE_SUCCESS;
11,645,046✔
206
}
207

208
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
×
209
  for (int32_t i = 0; i < ncols; ++i) {
×
210
    pBoundCols[i] = i;
×
211
  }
212
}
×
213

214
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
518,316,722✔
215
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);
518,316,722✔
216
  int32_t  code = 0;
518,322,606✔
217
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
2,147,483,647✔
218
    SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
2,147,483,647✔
219
    if (NULL == taosArrayPush(pValues, &val)) {
2,147,483,647✔
220
      code = terrno;
×
221
      break;
×
222
    }
223
  }
224
  return code;
518,331,366✔
225
}
226

227
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { return initColValues(pTableMeta, aColValues); }
199,249✔
228

229
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
538,684,034✔
230
  pInfo->numOfCols = numOfBound;
538,684,034✔
231
  pInfo->numOfBound = numOfBound;
538,690,635✔
232
  pInfo->hasBoundCols = false;
538,691,314✔
233
  pInfo->mixTagsCols = false;
538,691,353✔
234
  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
538,692,338✔
235
  if (NULL == pInfo->pColIndex) {
538,688,640✔
236
    return terrno;
×
237
  }
238
  for (int32_t i = 0; i < numOfBound; ++i) {
2,147,483,647✔
239
    pInfo->pColIndex[i] = i;
2,147,483,647✔
240
  }
241
  return TSDB_CODE_SUCCESS;
538,691,299✔
242
}
243

244
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
2,454✔
245
  pInfo->numOfBound = pInfo->numOfCols;
2,454✔
246
  pInfo->hasBoundCols = false;
2,454✔
247
  pInfo->mixTagsCols = false;
2,454✔
248
  for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
12,270✔
249
    pInfo->pColIndex[i] = i;
9,816✔
250
  }
251
}
2,454✔
252

253
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
2,147,483,647✔
254
  // once the data block is disordered, we do NOT keep last timestamp any more
255
  if (!pTableCxt->ordered) {
2,147,483,647✔
256
    return;
28,892,644✔
257
  }
258

259
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
2,147,483,647✔
260
    pTableCxt->ordered = false;
1,393,635✔
261
  }
262

263
  if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
2,147,483,647✔
264
    pTableCxt->duplicateTs = true;
1,728,188✔
265
  }
266

267
  // TODO: for variable length data type, we need to copy it out
268
  pTableCxt->lastKey = *rowKey;
2,147,483,647✔
269
  return;
2,147,483,647✔
270
}
271

272
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
1,565,605,740✔
273

274
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
520,815,992✔
275
                                  bool colMode, bool ignoreColVals) {
276
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
520,815,992✔
277
  if (NULL == pTableCxt) {
520,805,600✔
278
    *pOutput = NULL;
×
279
    return terrno;
×
280
  }
281

282
  int32_t code = TSDB_CODE_SUCCESS;
520,805,600✔
283

284
  pTableCxt->lastKey = (SRowKey){0};
520,805,600✔
285
  pTableCxt->ordered = true;
520,810,104✔
286
  pTableCxt->duplicateTs = false;
520,811,530✔
287

288
  pTableCxt->pMeta = tableMetaDup(pTableMeta);
520,813,268✔
289
  if (NULL == pTableCxt->pMeta) {
520,825,908✔
290
    code = TSDB_CODE_OUT_OF_MEMORY;
×
291
  }
292
  if (TSDB_CODE_SUCCESS == code) {
520,826,649✔
293
    pTableCxt->pSchema =
520,828,760✔
294
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
520,827,232✔
295
    if (NULL == pTableCxt->pSchema) {
520,828,521✔
296
      code = TSDB_CODE_OUT_OF_MEMORY;
×
297
    }
298
  }
299
  pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
520,826,731✔
300

301
  if (TSDB_CODE_SUCCESS == code) {
520,822,882✔
302
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
520,822,240✔
303
  }
304
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
520,823,919✔
305
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
518,124,023✔
306
    if (NULL == pTableCxt->pValues) {
518,123,810✔
307
      code = terrno;
×
308
    } else {
309
      code = initColValues(pTableMeta, pTableCxt->pValues);
518,123,043✔
310
    }
311
  }
312
  if (TSDB_CODE_SUCCESS == code) {
520,826,970✔
313
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
520,831,560✔
314
    if (NULL == pTableCxt->pData) {
520,820,730✔
315
      code = terrno;
×
316
    } else {
317
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
520,826,252✔
318
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
520,828,709✔
319
      pTableCxt->pData->suid = pTableMeta->suid;
520,825,649✔
320
      pTableCxt->pData->uid = pTableMeta->uid;
520,823,704✔
321
      pTableCxt->pData->sver = pTableMeta->sversion;
520,821,739✔
322
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
520,823,537✔
323
      int8_t flag = pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT;
520,822,782✔
324
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
520,820,594✔
325
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
520,821,351✔
326
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
3,355,684✔
327
        if (NULL == pTableCxt->pData->aCol) {
3,357,289✔
328
          code = terrno;
×
329
        }
330
      } else {
331
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
517,460,270✔
332
        if (NULL == pTableCxt->pData->aRowP) {
517,466,199✔
333
          code = terrno;
×
334
        }
335
      }
336
    }
337
  }
338
  if (TSDB_CODE_SUCCESS == code) {
520,824,221✔
339
    *pOutput = pTableCxt;
520,824,221✔
340
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
520,825,576✔
341
                pTableMeta->vgId);
342
  } else {
343
    insDestroyTableDataCxt(pTableCxt);
×
344
  }
345

346
  return code;
520,824,587✔
347
}
348

349
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
3,994,456✔
350
  int32_t        code = TSDB_CODE_SUCCESS;
3,994,456✔
351
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
3,994,456✔
352
  if (NULL == pTmp) {
3,994,173✔
353
    code = terrno;
×
354
  } else {
355
    pTmp->flags = pSrc->flags;
3,994,173✔
356
    pTmp->suid = pSrc->suid;
3,994,173✔
357
    pTmp->uid = pSrc->uid;
3,994,815✔
358
    pTmp->sver = pSrc->sver;
3,994,494✔
359
    pTmp->pCreateTbReq = NULL;
3,995,136✔
360
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
3,995,136✔
361
      if (pSrc->pCreateTbReq) {
3,849,970✔
362
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
3,849,007✔
363
      } else {
364
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
×
365
      }
366
    }
367
    if (TSDB_CODE_SUCCESS == code) {
3,994,466✔
368
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
3,994,466✔
369
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
3,360,808✔
370
        if (NULL == pTmp->aCol) {
3,361,771✔
371
          code = terrno;
×
372
          taosMemoryFree(pTmp);
×
373
        }
374
      } else {
375
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
633,658✔
376
        if (NULL == pTmp->aRowP) {
633,686✔
377
          code = terrno;
×
378
          taosMemoryFree(pTmp);
×
379
        }
380

381
        if (code != 0) {
633,686✔
382
          taosArrayDestroy(pTmp->aRowP);
×
383
          taosMemoryFree(pTmp);
×
384
        }
385
      }
386

387
    } else {
388
      taosMemoryFree(pTmp);
×
389
    }
390
  }
391

392
  taosMemoryFree(pSrc);
3,995,457✔
393
  if (TSDB_CODE_SUCCESS == code) {
3,995,778✔
394
    *pDst = pTmp;
3,995,136✔
395
  }
396

397
  return code;
3,995,778✔
398
}
399

400
static void resetColValues(SArray* pValues) {
32,266,599✔
401
  int32_t num = taosArrayGetSize(pValues);
32,266,599✔
402
  for (int32_t i = 0; i < num; ++i) {
364,693,164✔
403
    SColVal* pVal = taosArrayGet(pValues, i);
332,426,565✔
404
    pVal->flag = CV_FLAG_NONE;
332,426,565✔
405
  }
406
}
32,266,599✔
407

408
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
1,448,072,861✔
409
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
410
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
1,448,072,861✔
411
  if (NULL != tmp) {
1,448,087,186✔
412
    *pTableCxt = *tmp;
927,260,381✔
413
    if (!ignoreColVals) {
927,260,381✔
414
      resetColValues((*pTableCxt)->pValues);
32,266,599✔
415
    }
416
    return TSDB_CODE_SUCCESS;
927,260,381✔
417
  }
418
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
520,826,805✔
419
  if (TSDB_CODE_SUCCESS == code) {
520,823,219✔
420
    void* pData = *pTableCxt;  // deal scan coverity
520,824,785✔
421
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
520,829,096✔
422
  }
423

424
  if (TSDB_CODE_SUCCESS != code) {
520,830,542✔
425
    insDestroyTableDataCxt(*pTableCxt);
×
426
  }
427
  return code;
520,829,789✔
428
}
429

430
static void destroyColVal(void* p) {
2,147,483,647✔
431
  SColVal* pVal = p;
2,147,483,647✔
432
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
2,147,483,647✔
433
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
434
    taosMemoryFreeClear(pVal->value.pData);
838,657,645✔
435
  }
436
}
2,147,483,647✔
437

438
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
523,965,232✔
439
  if (NULL == pTableCxt) {
523,965,232✔
440
    return;
×
441
  }
442

443
  taosMemoryFreeClear(pTableCxt->pMeta);
523,965,232✔
444
  tDestroyTSchema(pTableCxt->pSchema);
523,964,199✔
445
  insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
523,963,081✔
446
  taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);
523,965,272✔
447
  if (pTableCxt->pData) {
523,964,698✔
448
    tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
7,819,490✔
449
    taosMemoryFree(pTableCxt->pData);
7,819,445✔
450
  }
451
  taosMemoryFree(pTableCxt);
523,964,259✔
452
}
453

454
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
497,848,639✔
455
  if (NULL == pVgCxt) {
497,848,639✔
456
    return;
×
457
  }
458

459
  tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
497,848,639✔
460
  taosMemoryFree(pVgCxt->pData);
497,849,154✔
461

462
  taosMemoryFree(pVgCxt);
497,848,041✔
463
}
464

465
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
968,504,148✔
466
  if (NULL == pVgCxtList) {
968,504,148✔
467
    return;
484,255,303✔
468
  }
469

470
  size_t size = taosArrayGetSize(pVgCxtList);
484,248,845✔
471
  for (int32_t i = 0; i < size; i++) {
982,101,471✔
472
    void* p = taosArrayGetP(pVgCxtList, i);
497,850,216✔
473
    insDestroyVgroupDataCxt(p);
497,851,835✔
474
  }
475

476
  taosArrayDestroy(pVgCxtList);
484,251,255✔
477
}
478

479
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
×
480
  if (NULL == pVgCxtHash) {
×
481
    return;
×
482
  }
483

484
  void** p = taosHashIterate(pVgCxtHash, NULL);
×
485
  while (p) {
×
486
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
×
487

488
    p = taosHashIterate(pVgCxtHash, p);
×
489
  }
490

491
  taosHashCleanup(pVgCxtHash);
×
492
}
493

494
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
484,985,888✔
495
  if (NULL == pTableCxtHash) {
484,985,888✔
496
    return;
3,329,672✔
497
  }
498

499
  void** p = taosHashIterate(pTableCxtHash, NULL);
481,656,216✔
500
  while (p) {
999,068,285✔
501
    insDestroyTableDataCxt(*(STableDataCxt**)p);
517,411,455✔
502

503
    p = taosHashIterate(pTableCxtHash, p);
517,410,754✔
504
  }
505

506
  taosHashCleanup(pTableCxtHash);
481,656,830✔
507
}
508

509
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
522,452,176✔
510
  int32_t code = 0;
522,452,176✔
511
  if (NULL == pVgCxt->pData->aSubmitTbData) {
522,452,176✔
512
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
497,369,530✔
513
    if (pVgCxt->pData->aSubmitTbData == NULL) {
497,370,932✔
514
      return terrno;
×
515
    }
516
    if (pTableCxt->hasBlob) {
497,372,365✔
517
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
21,168✔
518
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
21,168✔
519
        return terrno;
×
520
      }
521
    }
522
  }
523

524
  // push data to submit, rebuild empty data for next submit
525
  if (!pTableCxt->hasBlob) pTableCxt->pData->pBlobSet = NULL;
522,456,314✔
526

527
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
1,044,910,233✔
528
    return terrno;
×
529
  }
530

531
  if (pTableCxt->hasBlob) {
522,454,760✔
532
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
21,168✔
533
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
42,336✔
534
      return terrno;
×
535
    }
536
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
21,168✔
537
  }
538

539
  if (isRebuild) {
522,451,343✔
540
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
3,994,456✔
541
  } else if (clear) {
518,456,887✔
542
    taosMemoryFreeClear(pTableCxt->pData);
516,225,960✔
543
  }
544
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
522,453,893✔
545

546
  return code;
522,453,920✔
547
}
548

549
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
497,942,786✔
550
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
497,942,786✔
551
  if (NULL == pVgCxt) {
497,942,771✔
552
    return terrno;
×
553
  }
554
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
497,942,771✔
555
  if (NULL == pVgCxt->pData) {
497,940,667✔
556
    insDestroyVgroupDataCxt(pVgCxt);
×
557
    return terrno;
×
558
  }
559

560
  pVgCxt->vgId = vgId;
497,942,451✔
561
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
497,943,688✔
562
  if (TSDB_CODE_SUCCESS == code) {
497,942,746✔
563
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
497,944,505✔
564
      code = terrno;
×
565
      insDestroyVgroupDataCxt(pVgCxt);
×
566
      return code;
66✔
567
    }
568
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
569
    *pOutput = pVgCxt;
497,944,505✔
570
  } else {
UNCOV
571
    insDestroyVgroupDataCxt(pVgCxt);
×
572
  }
573
  return code;
497,944,209✔
574
}
575

576
int insColDataComp(const void* lp, const void* rp) {
15,095,624✔
577
  SColData* pLeft = (SColData*)lp;
15,095,624✔
578
  SColData* pRight = (SColData*)rp;
15,095,624✔
579
  if (pLeft->cid < pRight->cid) {
15,095,624✔
580
    return -1;
15,046,431✔
581
  } else if (pLeft->cid > pRight->cid) {
50,232✔
582
    return 1;
50,232✔
583
  }
584

UNCOV
585
  return 0;
×
586
}
587

588
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
69,649✔
589
                                 STableColsData* pTbData, SName* sname) {
590
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
69,649✔
591
    return TSDB_CODE_SUCCESS;
58,332✔
592
  }
593

594
  SVgroupInfo      vgInfo = {0};
11,317✔
595
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
22,406✔
596
                           .requestId = pBuildInfo->requestId,
11,317✔
597
                           .requestObjRefId = pBuildInfo->requestSelf,
11,317✔
598
                           .mgmtEps = pBuildInfo->mgmtEpSet};
599

600
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
11,317✔
601
  if (TSDB_CODE_SUCCESS != code) {
11,317✔
602
    return code;
×
603
  }
604

605
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
11,317✔
606
  if (TSDB_CODE_SUCCESS != code) {
11,317✔
607
    return code;
×
608
  }
609

610
  return TSDB_CODE_SUCCESS;
11,317✔
611
}
612

613
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
3,268,044✔
614
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
615
  STableVgUid* pTbInfo = NULL;
3,268,044✔
616
  int32_t      code = 0;
3,268,044✔
617

618
  if (pTbData->getFromHash) {
3,268,044✔
619
    pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
3,199,630✔
620
  }
621

622
  if (NULL == pTbInfo) {
3,267,662✔
623
    SName sname;
54,005✔
624
    code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
70,121✔
625
    if (TSDB_CODE_SUCCESS != code) {
70,018✔
626
      return code;
460✔
627
    }
628

629
    STableMeta*      pTableMeta = NULL;
70,018✔
630
    SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
123,920✔
631
                             .requestId = pBuildInfo->requestId,
70,018✔
632
                             .requestObjRefId = pBuildInfo->requestSelf,
70,077✔
633
                             .mgmtEps = pBuildInfo->mgmtEpSet};
634
    code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
70,062✔
635

636
    if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
70,108✔
637
      parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
460✔
638
      return code;
460✔
639
    }
640

641
    if (TSDB_CODE_SUCCESS != code) {
69,648✔
642
      parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
×
643
      return code;
×
644
    }
645

646
    *uid = pTableMeta->uid;
69,648✔
647
    *vgId = pTableMeta->vgId;
69,648✔
648
    *suid = pTableMeta->suid;
69,604✔
649

650
    STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
69,603✔
651
    code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
69,602✔
652
    if (TSDB_CODE_SUCCESS == code) {
69,617✔
653
      code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
69,616✔
654
    }
655

656
    taosMemoryFree(pTableMeta);
69,650✔
657
  } else {
658
    *uid = pTbInfo->uid;
3,197,541✔
659
    *vgId = pTbInfo->vgid;
3,198,259✔
660
    *suid = pTbInfo->suid;
3,198,430✔
661
  }
662

663
  return code;
3,268,184✔
664
}
665

666
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
×
667
  int32_t             code = TSDB_CODE_SUCCESS;
×
668
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
×
669

670
  if (TSDB_CODE_SUCCESS == code) {
×
671
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
×
672
  }
673

674
  return code;
×
675
}
676

677
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
1,035,967✔
678
                                            char* tbname) {
679
  if (NULL == pVgCxt->pData->aSubmitTbData) {
1,035,967✔
680
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
571,134✔
681
    if (NULL == pVgCxt->pData->aSubmitTbData) {
571,357✔
682
      return terrno;
×
683
    }
684
    if (pTbCtx->hasBlob) {
571,357✔
685
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
×
686
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
×
687
        return terrno;
×
688
      }
689
    }
690
  }
691

692
  int32_t  code = TSDB_CODE_SUCCESS;
1,036,290✔
693
  SArray** rowP = NULL;
1,036,290✔
694

695
  rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));
1,036,290✔
696

697
  if (rowP != NULL && *rowP != NULL) {
1,036,419✔
698
    for (int32_t j = 0; j < taosArrayGetSize(*rowP); ++j) {
×
699
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
×
700
      if (pRow) {
×
701
        if (NULL == taosArrayPush(*rowP, &pRow)) {
×
702
          return terrno;
×
703
        }
704
      }
705

706
      if (pTbCtx->hasBlob == 0) {
×
707
        code = tRowSort(*rowP);
×
708
        TAOS_CHECK_RETURN(code);
×
709

710
        code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
×
711
        TAOS_CHECK_RETURN(code);
×
712
      } else {
713
        code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
714
        TAOS_CHECK_RETURN(code);
×
715

716
        code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
717
        TAOS_CHECK_RETURN(code);
×
718
      }
719
    }
720

721
    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);
×
722

723
    if (pTbCtx->pData->pCreateTbReq != NULL) {
×
724
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
×
725
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
×
726
      pTbCtx->pData->pCreateTbReq = NULL;
×
727
    }
728
    return TSDB_CODE_SUCCESS;
×
729
  }
730

731
  if (pTbCtx->hasBlob == 0) {
1,036,419✔
732
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
1,036,365✔
733
  }
734

735
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
2,072,812✔
736
    return terrno;
×
737
  }
738

739
  if (pTbCtx->hasBlob) {
1,036,393✔
740
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
×
741
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
×
742
      return terrno;
×
743
    }
744
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
×
745
  }
746

747
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
1,036,381✔
748

749
  if (code != TSDB_CODE_SUCCESS) {
1,036,391✔
750
    return code;
×
751
  }
752

753
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);
1,036,391✔
754

755
  return TSDB_CODE_SUCCESS;
1,036,375✔
756
}
757

758
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
2,231,513✔
759
                                  SStbInterlaceInfo* pBuildInfo) {
760
  int32_t  code = TSDB_CODE_SUCCESS;
2,231,513✔
761
  uint64_t uid;
2,216,527✔
762
  int32_t  vgId;
2,217,143✔
763
  uint64_t suid;
2,217,259✔
764

765
  pTbCtx->pData->aRowP = pTbData->aCol;
2,232,292✔
766

767
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
2,232,525✔
768
  if (TSDB_CODE_SUCCESS != code) {
2,231,723✔
769
    return code;
×
770
  }
771

772
  pTbCtx->pMeta->vgId = vgId;
2,231,723✔
773
  pTbCtx->pMeta->uid = uid;
2,231,745✔
774
  pTbCtx->pData->uid = uid;
2,231,781✔
775

776
  if (!pTbCtx->ordered) {
2,232,072✔
777
    code = tRowSort(pTbCtx->pData->aRowP);
12✔
778
  }
779
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
2,231,027✔
780
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
12✔
781
  }
782

783
  if (TSDB_CODE_SUCCESS != code) {
2,230,897✔
784
    return code;
×
785
  }
786

787
  SVgroupDataCxt* pVgCxt = NULL;
2,230,897✔
788
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
2,230,828✔
789
  if (NULL == pp) {
2,232,836✔
790
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
353,736✔
791
    if (NULL == pp) {
353,818✔
792
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
353,818✔
793
    } else {
794
      pVgCxt = *(SVgroupDataCxt**)pp;
×
795
    }
796
  } else {
797
    pVgCxt = *(SVgroupDataCxt**)pp;
1,879,100✔
798
  }
799

800
  if (TSDB_CODE_SUCCESS == code) {
2,232,488✔
801
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
2,232,488✔
802
  }
803

804
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
2,231,178✔
805
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
806
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
807
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
808
    // insDestroyVgroupDataCxt(pVgCxt);
809
  }
810

811
  return code;
2,230,193✔
812
}
813

814
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,036,186✔
815
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
816
  int32_t  code = TSDB_CODE_SUCCESS;
1,036,186✔
817
  uint64_t uid;
324,955✔
818
  int32_t  vgId;
325,011✔
819
  uint64_t suid;
325,011✔
820

821
  pTbCtx->pData->aRowP = pTbData->aCol;
1,036,242✔
822
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
1,036,198✔
823

824
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,036,286✔
825
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
1,036,377✔
826
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
460✔
827
    vgId = (int32_t)ctbReq->uid;
460✔
828
    uid = 0;
460✔
829
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
460✔
830
    ctbReq->uid = 0;
460✔
831
    pTbCtx->pMeta->uid = 0;
460✔
832
    pTbCtx->pData->uid = 0;
460✔
833
    pTbCtx->pData->pCreateTbReq = ctbReq;
460✔
834
    code = TSDB_CODE_SUCCESS;
460✔
835
  } else {
836
    if (TSDB_CODE_SUCCESS != code) {
1,035,917✔
837
      return code;
×
838
    }
839
    if (pTbCtx->pData->suid != suid) {
1,035,917✔
840
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
841
    }
842

843
    pTbCtx->pMeta->vgId = vgId;
1,035,797✔
844
    pTbCtx->pMeta->uid = uid;
1,035,961✔
845
    pTbCtx->pData->uid = uid;
1,035,885✔
846
    pTbCtx->pData->pCreateTbReq = NULL;
1,035,929✔
847

848
    if (ctbReq != NULL) {
1,035,973✔
849
      tdDestroySVCreateTbReq(ctbReq);
850
      taosMemoryFree(ctbReq);
596,406✔
851
      ctbReq = NULL;
596,370✔
852
    }
853
  }
854

855
  if (pTbCtx->hasBlob == 0) {
1,036,407✔
856
    if (!pTbData->isOrdered) {
1,036,288✔
857
      code = tRowSort(pTbCtx->pData->aRowP);
×
858
    }
859
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
1,036,364✔
UNCOV
860
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
×
861
    }
862
  } else {
UNCOV
863
    if (!pTbData->isOrdered) {
×
864
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
865
    }
UNCOV
866
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
×
867
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
868
    }
869
  }
870

871
  if (TSDB_CODE_SUCCESS != code) {
1,036,176✔
872
    return code;
×
873
  }
874

875
  SVgroupDataCxt* pVgCxt = NULL;
1,036,176✔
876
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,036,188✔
877
  if (NULL == pp) {
1,036,271✔
878
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
571,175✔
879
    if (NULL == pp) {
571,253✔
880
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
571,253✔
881
    } else {
882
      pVgCxt = *(SVgroupDataCxt**)pp;
×
883
    }
884
  } else {
885
    pVgCxt = *(SVgroupDataCxt**)pp;
465,096✔
886
  }
887

888
  if (code == TSDB_CODE_SUCCESS) {
1,036,135✔
889
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
1,036,083✔
890
  }
891

892
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,036,415✔
893
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
894
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
895
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
896
    // insDestroyVgroupDataCxt(pVgCxt);
897
  }
898

899
  return code;
1,036,360✔
900
}
901

902
/*
903
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
904
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
905
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
906
  if (NULL == pVgroupHash || NULL == pVgroupList) {
907
    taosHashCleanup(pVgroupHash);
908
    taosArrayDestroy(pVgroupList);
909
    return TSDB_CODE_OUT_OF_MEMORY;
910
  }
911

912
  int32_t code = TSDB_CODE_SUCCESS;
913

914
  for (int32_t i = 0; i < tbNum; ++i) {
915
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
916
    pTableCxt->pMeta->vgId = pTableCols->vgId;
917
    pTableCxt->pMeta->uid = pTableCols->uid;
918
    pTableCxt->pData->uid = pTableCols->uid;
919
    pTableCxt->pData->aCol = pTableCols->aCol;
920

921
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
922
    if (pCol->nVal <= 0) {
923
      continue;
924
    }
925

926
    if (pTableCxt->pData->pCreateTbReq) {
927
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
928
    }
929

930
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
931

932
    tColDataSortMerge(pTableCxt->pData->aCol);
933

934
    if (TSDB_CODE_SUCCESS == code) {
935
      SVgroupDataCxt* pVgCxt = NULL;
936
      int32_t         vgId = pTableCxt->pMeta->vgId;
937
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
938
      if (NULL == pp) {
939
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
940
      } else {
941
        pVgCxt = *(SVgroupDataCxt**)pp;
942
      }
943
      if (TSDB_CODE_SUCCESS == code) {
944
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
945
      }
946
    }
947
  }
948

949
  taosHashCleanup(pVgroupHash);
950
  if (TSDB_CODE_SUCCESS == code) {
951
    *pVgDataBlocks = pVgroupList;
952
  } else {
953
    insDestroyVgroupDataCxtList(pVgroupList);
954
  }
955

956
  return code;
957
}
958
*/
959

960
static int8_t colDataHasBlob(SColData* pCol) {
×
961
  if (IS_STR_DATA_BLOB(pCol->type)) {
×
962
    return 1;
×
963
  }
964
  return 0;
×
965
}
966
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
483,490,260✔
967
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
483,490,260✔
968
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
483,496,962✔
969
  if (NULL == pVgroupHash || NULL == pVgroupList) {
483,495,582✔
970
    taosHashCleanup(pVgroupHash);
411✔
971
    taosArrayDestroy(pVgroupList);
×
972
    return terrno;
×
973
  }
974

975
  int32_t code = TSDB_CODE_SUCCESS;
483,495,171✔
976
  bool    colFormat = false;
483,495,171✔
977

978
  void* p = taosHashIterate(pTableHash, NULL);
483,495,171✔
979
  if (p) {
483,498,015✔
980
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
483,498,015✔
981
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
483,497,751✔
982
  }
983

984
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
1,003,721,975✔
985
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
520,224,260✔
986
    if (colFormat) {
520,224,260✔
987
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
3,362,218✔
988
      if (pCol && pCol->nVal <= 0) {
3,362,218✔
989
        p = taosHashIterate(pTableHash, p);
126✔
990
        continue;
126✔
991
      }
992

993
      if (pTableCxt->pData->pCreateTbReq) {
3,362,092✔
994
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
3,226,716✔
995
      }
996
      int8_t isBlob = IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
3,360,808✔
997
      if (isBlob == 0) {
3,361,771✔
998
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
3,361,771✔
999
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
3,361,091✔
1000
      } else {
1001
        taosArraySort(pTableCxt->pData->aCol, insColDataComp);
×
1002
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
×
1003
      }
1004
    } else {
1005
      // skip the table has no data to insert
1006
      // eg: import a csv without valid data
1007
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
1008
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
1009
      //   p = taosHashIterate(pTableHash, p);
1010
      //   continue;
1011
      // }
1012
      if (pTableCxt->hasBlob == 0) {
516,862,042✔
1013
        if (!pTableCxt->ordered) {
516,840,924✔
1014
          code = tRowSort(pTableCxt->pData->aRowP);
1,392,908✔
1015
        }
1016
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
516,840,482✔
1017
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
1,521,628✔
1018
        }
1019
      } else {
1020
        if (!pTableCxt->ordered) {
21,168✔
1021
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
727✔
1022
        }
1023
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
21,168✔
1024
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
727✔
1025
        }
1026
      }
1027
    }
1028

1029
    if (TSDB_CODE_SUCCESS == code) {
520,222,912✔
1030
      SVgroupDataCxt* pVgCxt = NULL;
520,222,912✔
1031
      int32_t         vgId = pTableCxt->pMeta->vgId;
520,223,364✔
1032
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
520,223,445✔
1033
      if (NULL == pp) {
520,222,371✔
1034
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
497,017,974✔
1035
      } else {
1036
        pVgCxt = *(SVgroupDataCxt**)pp;
23,204,397✔
1037
      }
1038
      if (TSDB_CODE_SUCCESS == code) {
520,223,024✔
1039
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
520,223,024✔
1040
      }
1041
    }
1042
    if (TSDB_CODE_SUCCESS == code) {
520,222,819✔
1043
      p = taosHashIterate(pTableHash, p);
520,222,819✔
1044
    }
1045
  }
1046

1047
  taosHashCleanup(pVgroupHash);
483,497,715✔
1048
  if (TSDB_CODE_SUCCESS == code) {
483,494,976✔
1049
    *pVgDataBlocks = pVgroupList;
483,495,330✔
1050
  } else {
1051
    insDestroyVgroupDataCxtList(pVgroupList);
38✔
1052
  }
1053

1054
  return code;
483,495,671✔
1055
}
1056

1057
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
497,937,658✔
1058
  int32_t  code = TSDB_CODE_SUCCESS;
497,937,658✔
1059
  uint32_t len = 0;
497,937,658✔
1060
  void*    pBuf = NULL;
497,937,658✔
1061
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
497,937,658✔
1062
  if (TSDB_CODE_SUCCESS == code) {
497,940,509✔
1063
    SEncoder encoder;
495,172,061✔
1064
    len += sizeof(SSubmitReq2Msg);
497,941,404✔
1065
    pBuf = taosMemoryMalloc(len);
497,941,404✔
1066
    if (NULL == pBuf) {
497,937,935✔
1067
      return terrno;
×
1068
    }
1069
    ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
497,937,935✔
1070
    ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
497,938,642✔
1071
    ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
497,941,165✔
1072
    tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
497,941,520✔
1073
    code = tEncodeSubmitReq(&encoder, pReq);
497,942,173✔
1074
    tEncoderClear(&encoder);
497,944,361✔
1075
  }
1076

1077
  if (TSDB_CODE_SUCCESS == code) {
497,939,820✔
1078
    *pData = pBuf;
497,939,820✔
1079
    *pLen = len;
497,941,003✔
1080
  } else {
1081
    taosMemoryFree(pBuf);
×
1082
  }
1083
  return code;
497,936,681✔
1084
}
1085

1086
static void destroyVgDataBlocks(void* p) {
×
1087
  if (p == NULL) return;
×
1088
  SVgDataBlocks* pVg = p;
×
1089
  taosMemoryFree(pVg->pData);
×
1090
  taosMemoryFree(pVg);
×
1091
}
1092

1093
int32_t insResetBlob(SSubmitReq2* p) {
497,938,044✔
1094
  int32_t code = 0;
497,938,044✔
1095
  if (p->raw) {
497,938,044✔
1096
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
×
1097
  }
1098

1099
  if (p->aSubmitBlobData != NULL) {
497,940,960✔
1100
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
42,336✔
1101
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
21,168✔
1102
      SBlobSet**     ppBlob = taosArrayGet(p->aSubmitBlobData, i);
21,168✔
1103
      SBlobSet*      pBlob = ppBlob ? *ppBlob : NULL;
21,168✔
1104
      int32_t        nrow = taosArrayGetSize(pSubmitTbData->aRowP);
21,168✔
1105
      int32_t        nblob = 0;
21,168✔
1106
      if (nrow > 0 && pBlob) {
21,168✔
1107
        nblob = taosArrayGetSize(pBlob->pSeqTable);
21,168✔
1108
      }
1109
      uTrace("blob %p row size %d, pData size %d", pBlob, nblob, nrow);
21,168✔
1110
      pSubmitTbData->pBlobSet = pBlob;
21,168✔
1111
      if (ppBlob != NULL) *ppBlob = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
21,168✔
1112
    }
1113
  } else {
1114
    for (int32_t i = 0; i < taosArrayGetSize(p->aSubmitTbData); i++) {
1,021,383,609✔
1115
      SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
523,462,154✔
1116
      pSubmitTbData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
523,466,819✔
1117
    }
1118
  }
1119

1120
  return code;
497,942,185✔
1121
}
1122
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
484,340,504✔
1123
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
484,340,504✔
1124
  SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
484,344,577✔
1125
  if (NULL == pDataBlocks) {
484,345,638✔
1126
    return TSDB_CODE_OUT_OF_MEMORY;
×
1127
  }
1128

1129
  int32_t code = TSDB_CODE_SUCCESS;
484,345,638✔
1130
  for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
982,288,001✔
1131
    SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
497,941,870✔
1132
    if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
497,943,690✔
1133
      continue;
×
1134
    }
1135
    SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
497,944,514✔
1136
    if (NULL == dst) {
497,938,729✔
1137
      code = terrno;
×
1138
    }
1139

1140
    if (TSDB_CODE_SUCCESS == code) {
497,938,729✔
1141
      dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
497,939,090✔
1142
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
497,940,539✔
1143
    }
1144
    if (TSDB_CODE_SUCCESS == code) {
497,942,958✔
1145
      code = insResetBlob(src->pData);
497,942,958✔
1146
    }
1147

1148
    if (TSDB_CODE_SUCCESS == code) {
497,942,306✔
1149
      code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
497,942,306✔
1150
    }
1151
    if (TSDB_CODE_SUCCESS == code) {
497,936,905✔
1152
      code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
497,942,709✔
1153
    }
1154
    if (TSDB_CODE_SUCCESS != code) {
497,941,482✔
1155
      destroyVgDataBlocks(dst);
×
1156
    }
1157
  }
1158

1159
  if (append) {
484,346,131✔
1160
    if (NULL == *pVgDataBlocks) {
848,842✔
1161
      *pVgDataBlocks = pDataBlocks;
848,856✔
1162
    }
1163
    return code;
848,795✔
1164
  }
1165

1166
  if (TSDB_CODE_SUCCESS == code) {
483,497,289✔
1167
    *pVgDataBlocks = pDataBlocks;
483,490,351✔
1168
  } else {
1169
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
6,938✔
1170
  }
1171

1172
  return code;
483,494,248✔
1173
}
1174

1175
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
×
1176
  for (int i = 0; i < numFields; i++) {
×
1177
    if (strcmp(pSchema->name, fields[i].name) == 0) {
×
1178
      return true;
×
1179
    }
1180
  }
1181

1182
  return false;
×
1183
}
1184

1185
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
159,860✔
1186
  if (*fields != pColSchema->type) {
159,860✔
1187
    if (errstr != NULL) {
265✔
1188
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
×
1189
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
×
1190
    } else {
1191
      char buf[512] = {0};
265✔
1192
      snprintf(buf, sizeof(buf), "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
530✔
1193
               tDataTypes[pColSchema->type].name, tDataTypes[*fields].name);
530✔
1194
      uError("checkSchema %s", buf);
265✔
1195
    }
1196
    return TSDB_CODE_INVALID_PARA;
265✔
1197
  }
1198

1199
  if (IS_DECIMAL_TYPE(pColSchema->type)) {
159,595✔
1200
    uint8_t precision = 0, scale = 0;
265✔
1201
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
265✔
1202
    uint8_t precisionData = 0, scaleData = 0;
265✔
1203
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
265✔
1204
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
265✔
1205
    if (precision != precisionData || scale != scaleData) {
265✔
1206
      if (errstr != NULL) {
×
1207
        snprintf(errstr, errstrLen,
×
1208
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1209
                 "precision:%d, scale:%d",
1210
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1211
                 precisionData, scaleData);
1212
        return TSDB_CODE_INVALID_PARA;
×
1213
      } else {
1214
        char buf[512] = {0};
×
1215
        snprintf(buf, sizeof(buf),
×
1216
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
1217
                 "precision:%d, scale:%d",
1218
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[*fields].name,
×
1219
                 precisionData, scaleData);
1220
        uError("checkSchema %s", buf);
×
1221
        return TSDB_CODE_INVALID_PARA;
×
1222
      }
1223
    }
1224
    return 0;
265✔
1225
  }
1226

1227
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
159,330✔
1228
    int32_t bytes = *(int32_t*)(fields + sizeof(int8_t));
27,904✔
1229
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
27,904✔
1230
      if (bytes >= TSDB_MAX_BLOB_LEN) {
×
1231
        uError("column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
×
1232
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name, bytes);
1233
        return TSDB_CODE_INVALID_PARA;
×
1234
      }
1235
    } else {
1236
      if (bytes > pColSchema->bytes) {
27,904✔
1237
        if (errstr != NULL) {
265✔
1238
          snprintf(errstr, errstrLen,
×
1239
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1240
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1241
                   *(int32_t*)(fields + sizeof(int8_t)));
×
1242
        } else {
1243
          char buf[512] = {0};
265✔
1244
          snprintf(buf, sizeof(buf),
530✔
1245
                   "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1246
                   pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
795✔
1247
                   *(int32_t*)(fields + sizeof(int8_t)));
265✔
1248
          uError("checkSchema %s", buf);
265✔
1249
        }
1250
        return TSDB_CODE_INVALID_PARA;
265✔
1251
      }
1252
    }
1253
  }
1254

1255
  if (!IS_VAR_DATA_TYPE(pColSchema->type) && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
159,065✔
1256
    if (errstr != NULL) {
×
1257
      snprintf(errstr, errstrLen,
×
1258
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1259
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1260
               *(int32_t*)(fields + sizeof(int8_t)));
×
1261
    } else {
1262
      char buf[512] = {0};
×
1263
      snprintf(buf, sizeof(buf),
×
1264
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
1265
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[*fields].name,
×
1266
               *(int32_t*)(fields + sizeof(int8_t)));
×
1267
      uError("checkSchema %s", buf);
×
1268
    }
1269
    return TSDB_CODE_INVALID_PARA;
×
1270
  }
1271
  return 0;
159,065✔
1272
}
1273

1274
#define PRCESS_DATA(i, j)                                                                                          \
1275
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                         \
1276
  if (ret != 0) {                                                                                                  \
1277
    goto end;                                                                                                      \
1278
  }                                                                                                                \
1279
                                                                                                                   \
1280
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                          \
1281
    hasTs = true;                                                                                                  \
1282
  }                                                                                                                \
1283
                                                                                                                   \
1284
  int8_t* offset = pStart;                                                                                         \
1285
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                                        \
1286
    pStart += numOfRows * sizeof(int32_t);                                                                         \
1287
  } else {                                                                                                         \
1288
    pStart += BitmapLen(numOfRows);                                                                                \
1289
  }                                                                                                                \
1290
  char* pData = pStart;                                                                                            \
1291
                                                                                                                   \
1292
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                                        \
1293
  if (hasBlob) {                                                                                                   \
1294
    ret = tColDataAddValueByDataBlockWithBlob(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData, \
1295
                                              pBlobSet);                                                           \
1296
  } else {                                                                                                         \
1297
    ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);        \
1298
  }                                                                                                                \
1299
  if (ret != 0) {                                                                                                  \
1300
    goto end;                                                                                                      \
1301
  }                                                                                                                \
1302
  fields += sizeof(int8_t) + sizeof(int32_t);                                                                      \
1303
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                            \
1304
    pStart += htonl(colLength[i]);                                                                                 \
1305
  } else {                                                                                                         \
1306
    pStart += colLength[i];                                                                                        \
1307
  }                                                                                                                \
1308
  boundInfo->pColIndex[j] = -1;
1309

1310
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
34,710✔
1311
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1312
  int       ret = 0;
34,710✔
1313
  int8_t    hasBlob = 0;
34,710✔
1314
  SBlobSet* pBlobSet = NULL;
34,710✔
1315
  if (data == NULL) {
34,710✔
1316
    uError("rawBlockBindData, data is NULL");
×
1317
    return TSDB_CODE_APP_ERROR;
×
1318
  }
1319
  void* tmp =
1320
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
34,710✔
1321
  SVCreateTbReq* pCreateReqTmp = NULL;
34,710✔
1322
  if (tmp == NULL && pCreateTb != NULL) {
34,710✔
1323
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
3,312✔
1324
    if (ret != TSDB_CODE_SUCCESS) {
3,312✔
1325
      uError("cloneSVreateTbReq error");
×
1326
      goto end;
×
1327
    }
1328
  }
1329

1330
  STableDataCxt* pTableCxt = NULL;
34,710✔
1331
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
34,710✔
1332
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1333
  if (pCreateReqTmp != NULL) {
34,710✔
1334
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1335
    taosMemoryFree(pCreateReqTmp);
×
1336
  }
1337

1338
  hasBlob = pTableCxt->hasBlob;
34,710✔
1339
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
34,710✔
1340
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
×
1341
    if (pTableCxt->pData->pBlobSet == NULL) {
×
1342
      uError("create blob set failed");
×
1343
      ret = terrno;
×
1344
    }
1345
  }
1346

1347
  if (ret != TSDB_CODE_SUCCESS) {
34,710✔
1348
    uError("insGetTableDataCxt error");
×
1349
    goto end;
×
1350
  }
1351
  pBlobSet = pTableCxt->pData->pBlobSet;
34,710✔
1352

1353
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
34,710✔
1354
  if (tmp == NULL) {
34,710✔
1355
    ret = initTableColSubmitData(pTableCxt);
31,118✔
1356
    if (ret != TSDB_CODE_SUCCESS) {
31,118✔
1357
      uError("initTableColSubmitData error");
×
1358
      goto end;
×
1359
    }
1360
  }
1361

1362
  char* p = (char*)data;
34,710✔
1363
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1364
  // column length |
1365
  int32_t version = *(int32_t*)data;
34,710✔
1366
  p += sizeof(int32_t);
34,710✔
1367
  p += sizeof(int32_t);
34,710✔
1368

1369
  int32_t numOfRows = *(int32_t*)p;
34,710✔
1370
  p += sizeof(int32_t);
34,710✔
1371

1372
  int32_t numOfCols = *(int32_t*)p;
34,710✔
1373
  p += sizeof(int32_t);
34,710✔
1374

1375
  p += sizeof(int32_t);
34,710✔
1376
  p += sizeof(uint64_t);
34,710✔
1377

1378
  int8_t* fields = (int8_t*)p;
34,710✔
1379
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
34,710✔
1380
    uError("fields type error:%d", *fields);
×
1381
    ret = TSDB_CODE_INVALID_PARA;
×
1382
    goto end;
×
1383
  }
1384
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
34,710✔
1385

1386
  int32_t* colLength = (int32_t*)p;
34,710✔
1387
  p += sizeof(int32_t) * numOfCols;
34,710✔
1388

1389
  char* pStart = p;
34,710✔
1390

1391
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
34,710✔
1392
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
34,710✔
1393
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
34,710✔
1394

1395
  if (tFields != NULL && numFields != numOfCols) {
34,710✔
1396
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1397
    ret = TSDB_CODE_INVALID_PARA;
×
1398
    goto end;
×
1399
  }
1400

1401
  bool hasTs = false;
34,710✔
1402
  if (tFields == NULL) {
34,710✔
1403
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
1,590✔
1404
    for (int j = 0; j < len; j++) {
4,770✔
1405
      SSchema*    pColSchema = &pSchema[j];
3,710✔
1406
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
3,710✔
1407
      PRCESS_DATA(j, j)
3,710✔
1408
    }
1409
  } else {
1410
    for (int i = 0; i < numFields; i++) {
179,038✔
1411
      for (int j = 0; j < boundInfo->numOfBound; j++) {
1,355,411✔
1412
        SSchema*    pColSchema = &pSchema[j];
1,355,135✔
1413
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
1,355,135✔
1414
        char*       fieldName = NULL;
1,355,135✔
1415
        if (raw) {
1,355,135✔
1416
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
1,353,810✔
1417
        } else {
1418
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
1,325✔
1419
        }
1420
        if (strcmp(pColSchema->name, fieldName) == 0) {
1,355,135✔
1421
          PRCESS_DATA(i, j)
145,642✔
1422
          break;
145,642✔
1423
        }
1424
      }
1425
    }
1426
  }
1427

1428
  if (!hasTs) {
34,180✔
1429
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1430
    ret = TSDB_CODE_INVALID_PARA;
×
1431
    goto end;
×
1432
  }
1433

1434
  // process NULL data
1435
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
186,281✔
1436
    if (boundInfo->pColIndex[c] != -1) {
152,101✔
1437
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
3,544✔
1438
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
3,544✔
1439
      if (ret != 0) {
3,544✔
1440
        goto end;
×
1441
      }
1442
    } else {
1443
      boundInfo->pColIndex[c] = c;  // restore for next block
148,557✔
1444
    }
1445
  }
1446

1447
end:
34,710✔
1448
  return ret;
34,710✔
1449
}
1450

1451
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
×
1452
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
×
1453
  if (code != 0) {
×
1454
    return code;
×
1455
  }
1456
  SVgroupDataCxt* pVgCxt = NULL;
×
1457
  void**          pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
×
1458
  if (NULL == pp) {
×
1459
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
×
1460
    if (code != 0) {
×
1461
      return code;
×
1462
    }
1463
  } else {
1464
    pVgCxt = *(SVgroupDataCxt**)pp;
×
1465
  }
1466
  if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1467
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
×
1468
    pVgCxt->pData->raw = true;
×
1469
    if (NULL == pVgCxt->pData->aSubmitTbData) {
×
1470
      return terrno;
×
1471
    }
1472
  }
1473

1474
  // push data to submit, rebuild empty data for next submit
1475
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
×
1476
    return terrno;
×
1477
  }
1478

1479
  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
×
1480

1481
  return 0;
×
1482
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc