• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4909

30 Dec 2025 10:52AM UTC coverage: 65.542% (+0.2%) from 65.386%
#4909

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

857 existing lines in 113 files now uncovered.

193924 of 295877 relevant lines covered (65.54%)

120594206.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.06
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
void qDestroyBoundColInfo(void* pInfo) {
4,076,516✔
31
  if (NULL == pInfo) {
4,076,516✔
32
    return;
768,413✔
33
  }
34

35
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
3,308,103✔
36

37
  taosMemoryFreeClear(pBoundInfo->pColIndex);
3,308,103✔
38
}
39

40
/*
 * Locate the first occurrence of `target` in the token text that is neither
 * inside an escape-char delimited section nor inside a quoted section.
 *
 * Scans pToken->z[0 .. pToken->n). Returns a pointer to the matching
 * character, or NULL when no unprotected occurrence exists.
 */
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool escaped = false;   // toggled by TS_ESCAPE_CHAR seen outside quotes
  bool quoted = false;    // true while inside a '...' or "..." section
  char openQuote = 0;     // the quote character that opened the current section

  for (uint32_t pos = 0; pos < pToken->n; ++pos) {
    const char ch = pToken->z[pos];

    // The target is tested first, before this character updates any state.
    if (ch == target && !(escaped || quoted)) {
      return &pToken->z[pos];
    }

    if (TS_ESCAPE_CHAR == ch && !quoted) {
      escaped = !escaped;
    }

    const bool isQuoteChar = ('\'' == ch || '"' == ch);
    if (!isQuoteChar || escaped) {
      continue;
    }

    if (!quoted) {
      openQuote = ch;
      quoted = true;
    } else if (openQuote == ch) {
      // Only the same kind of quote that opened the section closes it.
      quoted = false;
    }
  }

  return NULL;
}
70

71
/*
 * Fill pName from a table-name token that is either "<db><delim><table>" or
 * just "<table>".
 *
 * When the token contains an unprotected path delimiter, the part before it
 * is used as the database name and `dbName` is ignored; otherwise `dbName`
 * supplies the database and must be non-NULL and non-empty. Both parts are
 * copied into local buffers, dequoted, and length-checked before and after
 * dequoting.
 *
 * Returns TSDB_CODE_SUCCESS, or the value produced by generateSyntaxErrMsg /
 * generateSyntaxErrMsgExt for the first failed check.
 */
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* errTbTooLong = "table name is too long";
  const char* errBadDb = "invalid database name";
  const char* errNoDb = "db is not specified";
  const char* errBadTb = "invalid table name";
  const char* errDbTooLong = "database name is too long";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   pDelim = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);

  if (NULL == pDelim) {
    // No database part in the token: validate the table name, then use dbName.
    int32_t rawTbLen = pTableName->n;
    if (rawTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (rawTbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    // Zero-initialised and strictly larger than rawTbLen, so the copy stays terminated.
    char tbBuf[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(tbBuf, pTableName->z, rawTbLen);
    int32_t tbLen = strdequote(tbBuf);
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    if (dbName == NULL || strlen(dbName) == 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED, errNoDb);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }

    code = tNameFromString(pName, tbBuf, T_NAME_TABLE);
    if (code != 0) {
      // Not returned immediately: the delimiter check below still runs.
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }
  } else {
    // The database is given in the token itself, so the dbName argument is ignored.
    int32_t rawDbLen = pDelim - pTableName->z;
    if (rawDbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }
    if (rawDbLen >= TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errDbTooLong);
    }

    char dbBuf[TSDB_DB_FNAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(dbBuf, pTableName->z, rawDbLen);
    int32_t dbLen = strdequote(dbBuf);
    if (dbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }
    if (dbLen >= TSDB_DB_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errDbTooLong);
    }

    code = tNameSetDbName(pName, acctId, dbBuf, dbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadDb);
    }

    // Whatever follows the delimiter is the (still quoted) table name.
    int32_t rawTbLen = pTableName->n - rawDbLen - 1;
    if (rawTbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (rawTbLen >= TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    char tbBuf[TSDB_TABLE_NAME_LEN + TSDB_NAME_QUOTE] = {0};
    strncpy(tbBuf, pDelim + 1, rawTbLen);
    int32_t tbLen = strdequote(tbBuf);
    if (tbLen <= 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errBadTb);
    }
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }

    code = tNameFromString(pName, tbBuf, T_NAME_TABLE);
    if (code != 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, errTbTooLong);
    }
  }

  // The final table name must not contain '.' itself.
  if (NULL != strchr(pName->tname, '.')) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

  return code;
}
177

178
/*
 * Search pSchema[start .. end) for a column whose name equals the token text.
 *
 * A match requires the same length and the same bytes as pColname->z over
 * pColname->n characters. Returns the index of the first match, or -1.
 */
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
  for (int16_t idx = start; idx < end; ++idx) {
    const char* schemaName = pSchema[idx].name;
    if (strlen(schemaName) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, schemaName, pColname->n)) {
      return idx;
    }
  }
  return -1;
}
187

188
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
11,647,577✔
189
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
190
  pTbReq->type = TD_CHILD_TABLE;
11,647,577✔
191
  pTbReq->ctb.pTag = (uint8_t*)pTag;
11,650,404✔
192
  pTbReq->name = taosStrdup(tname);
11,649,812✔
193
  if (!pTbReq->name) return terrno;
11,648,101✔
194
  pTbReq->ctb.suid = suid;
11,644,136✔
195
  pTbReq->ctb.tagNum = tagNum;
11,645,412✔
196
  if (sname) {
11,648,009✔
197
    pTbReq->ctb.stbName = taosStrdup(sname);
11,014,808✔
198
    if (!pTbReq->ctb.stbName) return terrno;
11,017,360✔
199
  }
200
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
11,648,009✔
201
  if (!pTbReq->ctb.tagName) return terrno;
11,649,786✔
202
  pTbReq->ttl = ttl;
11,647,931✔
203
  pTbReq->commentLen = -1;
11,647,566✔
204

205
  return TSDB_CODE_SUCCESS;
11,648,569✔
206
}
207

208
/* Fill pBoundCols[0 .. ncols) with the identity mapping (element i holds i). */
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
  int32_t idx = 0;
  while (idx < ncols) {
    pBoundCols[idx] = idx;
    ++idx;
  }
}
×
213

214
/*
 * Append one COL_VAL_NONE entry per table column to pValues, built from the
 * colId and type of each entry returned by getTableColumnSchema().
 *
 * Returns 0 on success, or terrno at the first failed push; entries pushed
 * before the failure stay in the array.
 */
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
  SSchema* pColSchemas = getTableColumnSchema(pTableMeta);

  for (int32_t col = 0; col < pTableMeta->tableInfo.numOfColumns; ++col) {
    SColVal noneVal = COL_VAL_NONE(pColSchemas[col].colId, pColSchemas[col].type);
    if (NULL == taosArrayPush(pValues, &noneVal)) {
      return terrno;
    }
  }

  return 0;
}
226

227
/* Public entry point that forwards to the file-local initColValues(). */
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
  int32_t code = initColValues(pTableMeta, aColValues);
  return code;
}
199,160✔
228

229
/*
 * Initialise a bound-column descriptor for numOfBound columns.
 *
 * Sets both counters to numOfBound, clears the hasBoundCols / mixTagsCols
 * flags, and allocates pColIndex filled with the identity mapping.
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation returns NULL.
 */
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
  pInfo->numOfCols = numOfBound;
  pInfo->numOfBound = numOfBound;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
  if (pInfo->pColIndex == NULL) {
    return terrno;
  }

  int32_t idx = 0;
  while (idx < numOfBound) {
    pInfo->pColIndex[idx] = idx;
    ++idx;
  }
  return TSDB_CODE_SUCCESS;
}
243

244
/*
 * Restore a bound-column descriptor to its "all columns bound" state:
 * numOfBound goes back to numOfCols, both flags are cleared, and pColIndex
 * is rewritten with the identity mapping. No memory is allocated or freed.
 */
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;
  pInfo->numOfBound = pInfo->numOfCols;

  int32_t idx = 0;
  while (idx < pInfo->numOfCols) {
    pInfo->pColIndex[idx] = idx;
    ++idx;
  }
}
2,452✔
252

253
/*
 * Track whether the rows appended to a table context arrive in key order.
 *
 * Compares rowKey with the last key seen: a smaller key clears `ordered`,
 * an equal key sets `duplicateTs`. Afterwards the row key becomes the new
 * last key. Once `ordered` has been cleared the function returns immediately
 * and lastKey is no longer updated.
 */
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
  // once the data block is disordered, we do NOT keep last timestamp any more
  if (!pTableCxt->ordered) {
    return;
  }

  // Compare once and branch on the result. Neither operand changes between
  // the "< 0" and "== 0" checks, so a second comparison is redundant work.
  // NOTE(review): relies on tRowKeyCompare having no side effects — confirm.
  int32_t cmp = tRowKeyCompare(rowKey, &pTableCxt->lastKey);
  if (cmp < 0) {
    pTableCxt->ordered = false;
  } else if (cmp == 0) {
    pTableCxt->duplicateTs = true;
  }

  // TODO: for variable length data type, we need to copy it out
  pTableCxt->lastKey = *rowKey;
}
271

272
/*
 * Free and clear the pColIndex array of a bound-column descriptor.
 * pInfo itself is not freed and must not be NULL (it is dereferenced).
 */
void insDestroyBoundColInfo(SBoundColInfo* pInfo) {
  taosMemoryFreeClear(pInfo->pColIndex);
}
1,684,906,663✔
273

274
/*
 * Allocate and initialise a table data context for pTableMeta.
 *
 * pTableMeta    source meta; duplicated into the new context.
 * pCreateTbReq  optional in/out: when non-NULL and the submit data is
 *               allocated, *pCreateTbReq is moved into the context and
 *               *pCreateTbReq is reset to NULL.
 * pOutput       receives the new context on success and NULL on every
 *               failure path.
 * colMode       selects SUBMIT_REQ_COLUMN_DATA_FORMAT (aCol) instead of the
 *               row format (aRowP).
 * ignoreColVals when true the per-column value array is not created.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure the partially built
 * context is destroyed here, so the caller must not destroy it again.
 */
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
                                  bool colMode, bool ignoreColVals) {
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
  if (NULL == pTableCxt) {
    *pOutput = NULL;
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pTableCxt->lastKey = (SRowKey){0};
  pTableCxt->ordered = true;
  pTableCxt->duplicateTs = false;

  pTableCxt->pMeta = tableMetaDup(pTableMeta);
  if (NULL == pTableCxt->pMeta) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pSchema =
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
    if (NULL == pTableCxt->pSchema) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    // Only inspect the schema once it is known to exist; hasBlob stays false
    // (from calloc) on the failure path.
    pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
  }
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
    if (NULL == pTableCxt->pValues) {
      code = terrno;
    } else {
      code = initColValues(pTableMeta, pTableCxt->pValues);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pTableCxt->pData) {
      code = terrno;
    } else {
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
      pTableCxt->pData->suid = pTableMeta->suid;
      pTableCxt->pData->uid = pTableMeta->uid;
      pTableCxt->pData->sver = pTableMeta->sversion;
      // Ownership of the create request moves into the submit data.
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTableCxt->pData->aCol) {
          code = terrno;
        }
      } else {
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTableCxt->pData->aRowP) {
          code = terrno;
        }
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pTableCxt;
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
                pTableMeta->vgId);
  } else {
    insDestroyTableDataCxt(pTableCxt);
    // Never leave the caller with a stale or uninitialised pointer that it
    // might try to destroy a second time.
    *pOutput = NULL;
  }

  return code;
}
348

349
/*
 * Replace a submit-table-data object with a fresh, empty one that carries the
 * same identity (flags, suid, uid, sver) and a clone of the create-table
 * request, if any.
 *
 * pSrc     consumed: its container is always freed here (shallow free only).
 * pDst     receives the new object on success and NULL on failure. It may
 *          alias the location pSrc was read from; writing NULL on failure
 *          keeps that location from pointing at freed memory.
 * hasBlob  currently unused.
 *
 * Returns TSDB_CODE_SUCCESS or the error from allocation / cloning.
 */
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == pTmp) {
    code = terrno;
  } else {
    pTmp->flags = pSrc->flags;
    pTmp->suid = pSrc->suid;
    pTmp->uid = pSrc->uid;
    pTmp->sver = pSrc->sver;
    pTmp->pCreateTbReq = NULL;
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
      if (pSrc->pCreateTbReq) {
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
      } else {
        // Nothing to auto-create with, so drop the flag.
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      // The clone failed. NOTE(review): cloneSVreateTbReq is assumed to clean
      // up after itself, so only the container is released here — confirm.
      taosMemoryFree(pTmp);
      pTmp = NULL;
    } else {
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTmp->aCol) {
          code = terrno;
        }
      } else {
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTmp->aRowP) {
          code = terrno;
        }
      }

      if (TSDB_CODE_SUCCESS != code) {
        // Release everything exactly once: the successfully cloned create
        // request first, then the container. No array was created.
        if (NULL != pTmp->pCreateTbReq) {
          tdDestroySVCreateTbReq(pTmp->pCreateTbReq);
          taosMemoryFree(pTmp->pCreateTbReq);
        }
        taosMemoryFree(pTmp);
        pTmp = NULL;
      }
    }
  }

  taosMemoryFree(pSrc);
  *pDst = pTmp;  // NULL on failure

  return code;
}
399

400
/* Set the flag of every SColVal stored in pValues to CV_FLAG_NONE. */
static void resetColValues(SArray* pValues) {
  int32_t total = taosArrayGetSize(pValues);
  int32_t idx = 0;
  while (idx < total) {
    SColVal* pColVal = taosArrayGet(pValues, idx);
    pColVal->flag = CV_FLAG_NONE;
    ++idx;
  }
}
32,186,087✔
407

408
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
1,486,569,274✔
409
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
410
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
1,486,569,274✔
411
  if (NULL != tmp) {
1,486,582,569✔
412
    *pTableCxt = *tmp;
925,169,847✔
413
    if (!ignoreColVals) {
925,169,847✔
414
      resetColValues((*pTableCxt)->pValues);
32,186,087✔
415
    }
416
    return TSDB_CODE_SUCCESS;
925,169,847✔
417
  }
418
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
561,412,722✔
419
  if (TSDB_CODE_SUCCESS == code) {
561,409,000✔
420
    void* pData = *pTableCxt;  // deal scan coverity
561,411,086✔
421
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
561,413,340✔
422
  }
423

424
  if (TSDB_CODE_SUCCESS != code) {
561,414,175✔
425
    insDestroyTableDataCxt(*pTableCxt);
×
426
  }
427
  return code;
561,414,853✔
428
}
429

430
static void destroyColVal(void* p) {
2,147,483,647✔
431
  SColVal* pVal = p;
2,147,483,647✔
432
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
2,147,483,647✔
433
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
2,147,483,647✔
434
    taosMemoryFreeClear(pVal->value.pData);
874,133,414✔
435
  }
436
}
2,147,483,647✔
437

438
/*
 * Destroy a table data context and everything it holds: the duplicated meta,
 * the schema, the bound-column index, the column value array (through
 * destroyColVal) and, when present, the submit data. NULL is accepted.
 */
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
  if (pTableCxt != NULL) {
    taosMemoryFreeClear(pTableCxt->pMeta);
    tDestroyTSchema(pTableCxt->pSchema);
    insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
    taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);

    SSubmitTbData* pSubmit = pTableCxt->pData;
    if (pSubmit != NULL) {
      tDestroySubmitTbData(pSubmit, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pSubmit);
    }

    taosMemoryFree(pTableCxt);
  }
}
453

454
/*
 * Destroy a vgroup data context: tear down its submit request with
 * tDestroySubmitReq, free the request container, then free the context.
 * NULL is accepted.
 */
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
  if (pVgCxt != NULL) {
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pVgCxt->pData);
    taosMemoryFree(pVgCxt);
  }
}
464

465
/*
 * Destroy every vgroup data context stored (by pointer) in pVgCxtList and
 * then the array itself. NULL is accepted.
 */
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
  if (pVgCxtList != NULL) {
    size_t  total = taosArrayGetSize(pVgCxtList);
    int32_t idx = 0;
    while (idx < total) {
      void* pCxt = taosArrayGetP(pVgCxtList, idx);
      insDestroyVgroupDataCxt(pCxt);
      idx++;
    }

    taosArrayDestroy(pVgCxtList);
  }
}
478

479
/*
 * Destroy every vgroup data context referenced by the hash (values are
 * SVgroupDataCxt pointers) and then clean up the hash. NULL is accepted.
 */
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
  if (pVgCxtHash != NULL) {
    for (void** ppEntry = taosHashIterate(pVgCxtHash, NULL); ppEntry; ppEntry = taosHashIterate(pVgCxtHash, ppEntry)) {
      insDestroyVgroupDataCxt(*(SVgroupDataCxt**)ppEntry);
    }

    taosHashCleanup(pVgCxtHash);
  }
}
493

494
/*
 * Destroy every table data context referenced by the hash (values are
 * STableDataCxt pointers) and then clean up the hash. NULL is accepted.
 */
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
  if (pTableCxtHash != NULL) {
    for (void** ppEntry = taosHashIterate(pTableCxtHash, NULL); ppEntry;
         ppEntry = taosHashIterate(pTableCxtHash, ppEntry)) {
      insDestroyTableDataCxt(*(STableDataCxt**)ppEntry);
    }

    taosHashCleanup(pTableCxtHash);
  }
}
508

509
/*
 * Append the submit data of a table context to the submit request of a
 * vgroup context.
 *
 * The request's aSubmitTbData array (and, for blob tables, aSubmitBlobData)
 * is created on first use. The table's SSubmitTbData is pushed by value; for
 * blob tables its pBlobSet pointer is pushed as well and then cleared on the
 * table side so it is not freed there.
 *
 * isRebuild  replace pTableCxt->pData with a fresh empty object afterwards.
 * clear      otherwise, when set, free and clear pTableCxt->pData.
 *
 * Returns 0, terrno on allocation/push failure, or the rebuildTableData error.
 * NOTE(review): aSubmitBlobData is only created together with aSubmitTbData;
 * a blob table arriving after a non-blob one would push into a NULL array —
 * confirm this cannot happen.
 */
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    SArray* pTbDataList = taosArrayInit(128, sizeof(SSubmitTbData));
    pVgCxt->pData->aSubmitTbData = pTbDataList;
    if (NULL == pTbDataList) {
      return terrno;
    }

    if (pTableCxt->hasBlob) {
      SArray* pBlobList = taosArrayInit(128, sizeof(SBlobSet*));
      pVgCxt->pData->aSubmitBlobData = pBlobList;
      if (NULL == pBlobList) {
        return terrno;
      }
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (!pTableCxt->hasBlob) {
    pTableCxt->pData->pBlobSet = NULL;
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
    return terrno;
  }

  if (pTableCxt->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
      return terrno;
    }
    // reset blob row to NULL, so that it will not be freed in destroy
    pTableCxt->pData->pBlobSet = NULL;
  }

  int32_t code = 0;
  if (isRebuild) {
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
  } else if (clear) {
    taosMemoryFreeClear(pTableCxt->pData);
  }
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);

  return code;
}
548

549
/*
 * Create a vgroup data context for vgId and register it both in pVgroupHash
 * (keyed by vgId) and in pVgroupList.
 *
 * On success *pOutput receives the new context and TSDB_CODE_SUCCESS is
 * returned. On failure the context is released, *pOutput is left untouched
 * and the error code is returned.
 */
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
  if (NULL == pVgCxt) {
    return terrno;
  }
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
  if (NULL == pVgCxt->pData) {
    // Capture the error before any cleanup call can overwrite terrno. Only
    // the bare container exists at this point, so free it directly instead
    // of running the full destructor on a half-built object.
    int32_t allocCode = terrno;
    taosMemoryFree(pVgCxt);
    return allocCode;
  }

  pVgCxt->vgId = vgId;
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
      code = terrno;
      // NOTE(review): the hash still holds the pointer stored above; it
      // dangles after this destroy unless the entry is removed or the caller
      // discards the hash on error — confirm.
      insDestroyVgroupDataCxt(pVgCxt);
      return code;
    }
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
    *pOutput = pVgCxt;
  } else {
    insDestroyVgroupDataCxt(pVgCxt);
  }
  return code;
}
575

576
int insColDataComp(const void* lp, const void* rp) {
15,020,026✔
577
  SColData* pLeft = (SColData*)lp;
15,020,026✔
578
  SColData* pRight = (SColData*)rp;
15,020,026✔
579
  if (pLeft->cid < pRight->cid) {
15,020,026✔
580
    return -1;
14,970,742✔
581
  } else if (pLeft->cid > pRight->cid) {
51,198✔
582
    return 1;
51,198✔
583
  }
584

585
  return 0;
×
586
}
587

588
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
68,291✔
589
                                 STableColsData* pTbData, SName* sname) {
590
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
68,291✔
591
    return TSDB_CODE_SUCCESS;
57,137✔
592
  }
593

594
  SVgroupInfo      vgInfo = {0};
11,167✔
595
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
22,105✔
596
                           .requestId = pBuildInfo->requestId,
11,167✔
597
                           .requestObjRefId = pBuildInfo->requestSelf,
11,167✔
598
                           .mgmtEps = pBuildInfo->mgmtEpSet};
599

600
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
11,167✔
601
  if (TSDB_CODE_SUCCESS != code) {
11,167✔
602
    return code;
×
603
  }
604

605
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
11,167✔
606
  if (TSDB_CODE_SUCCESS != code) {
11,167✔
607
    return code;
×
608
  }
609

610
  return TSDB_CODE_SUCCESS;
11,167✔
611
}
612

613
/*
 * Resolve uid, vgId and suid for the table named pTbData->tbName.
 *
 * When pTbData->getFromHash is set, the per-statement table cache
 * (pBuildInfo->pTableHash) is consulted first. On a miss the table meta is
 * fetched from the catalog, the result is cached, and the vgroup info is
 * added to pAllVgHash through insTryAddTableVgroupInfo.
 *
 * Returns 0 on success; otherwise the error from name creation, the catalog
 * lookup (including TSDB_CODE_PAR_TABLE_NOT_EXIST), the cache insert, or the
 * vgroup registration. The outputs are written only once the meta is
 * available or the cache hits.
 */
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
                             uint64_t* uid, int32_t* vgId, uint64_t* suid) {
  STableVgUid* pCached = NULL;
  if (pTbData->getFromHash) {
    pCached = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
  }

  if (NULL != pCached) {
    *uid = pCached->uid;
    *vgId = pCached->vgid;
    *suid = pCached->suid;
    return 0;
  }

  SName   sname;
  int32_t code = qCreateSName2(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
                           .requestId = pBuildInfo->requestId,
                           .requestObjRefId = pBuildInfo->requestSelf,
                           .mgmtEps = pBuildInfo->mgmtEpSet};
  code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    // Logged as a warning only; the caller receives the not-exist code.
    parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
    return code;
  }

  if (TSDB_CODE_SUCCESS != code) {
    parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
    return code;
  }

  *uid = pTableMeta->uid;
  *vgId = pTableMeta->vgId;
  *suid = pTableMeta->suid;

  STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId, .suid = *suid};
  code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
  if (TSDB_CODE_SUCCESS == code) {
    code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
  }

  // The meta returned by the catalog is released here on every path.
  taosMemoryFree(pTableMeta);
  return code;
}
665

666
/*
 * Build the per-vgroup data blocks for a modify statement: forwards to
 * insBuildVgDataBlocks with the statement's pDataBlocks as output and the
 * last argument set to true. pQuery->pRoot is treated as SVnodeModifyOpStmt.
 */
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
  SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
  return insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pModifyStmt->pDataBlocks, true);
}
676

677
/*
 * Add a table's submit data to a vgroup submit request, merging by table name.
 *
 * pTableNameHash maps a table name to the row array (SArray*) already
 * submitted for that table. When the name is already present, rows of
 * pTbCtx are appended to that array and sorted/merged, and the table's
 * create request is released. Otherwise the table's SSubmitTbData is pushed
 * to the request (plus its blob set for blob tables) and its row array is
 * recorded in the hash.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation/push failure, or the error
 * from sorting, merging or the hash insert.
 */
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
                                            char* tbname) {
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }

    if (pTbCtx->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
        return terrno;
      }
    }
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppMergedRows = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));

  if (ppMergedRows != NULL && *ppMergedRows != NULL) {
    // NOTE(review): the loop bound is the size of the destination array, which
    // grows with every push, while rows are read from the source array; the
    // sort/merge also runs on every iteration. This looks unintended (bound on
    // pTbCtx->pData->aRowP, sort/merge once after the loop?) — kept as is,
    // confirm before changing.
    for (int32_t rowIdx = 0; rowIdx < taosArrayGetSize(*ppMergedRows); ++rowIdx) {
      SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, rowIdx);
      if (pRow) {
        if (NULL == taosArrayPush(*ppMergedRows, &pRow)) {
          return terrno;
        }
      }

      if (0 != pTbCtx->hasBlob) {
        code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
        TAOS_CHECK_RETURN(code);

        code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
        TAOS_CHECK_RETURN(code);
      } else {
        code = tRowSort(*ppMergedRows);
        TAOS_CHECK_RETURN(code);

        code = tRowMerge(*ppMergedRows, pTbCtx->pSchema, 0);
        TAOS_CHECK_RETURN(code);
      }
    }

    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);

    // The table was already submitted once, so this create request is dropped.
    if (NULL != pTbCtx->pData->pCreateTbReq) {
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
      pTbCtx->pData->pCreateTbReq = NULL;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == pTbCtx->hasBlob) {
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
    return terrno;
  }

  if (pTbCtx->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
      return terrno;
    }
    // reset blob row to NULL, so that it will not be freed in destroy
    pTbCtx->pData->pBlobSet = NULL;
  }

  // Remember this table's row array so later batches for the same name merge into it.
  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);

  return TSDB_CODE_SUCCESS;
}
757

758
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,990,640✔
759
                                  SStbInterlaceInfo* pBuildInfo) {
760
  int32_t  code = TSDB_CODE_SUCCESS;
1,990,640✔
761
  uint64_t uid;
1,975,654✔
762
  int32_t  vgId;
1,976,256✔
763
  uint64_t suid;
1,976,511✔
764

765
  pTbCtx->pData->aRowP = pTbData->aCol;
1,991,650✔
766

767
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,992,981✔
768
  if (TSDB_CODE_SUCCESS != code) {
1,992,221✔
769
    return code;
×
770
  }
771

772
  pTbCtx->pMeta->vgId = vgId;
1,992,221✔
773
  pTbCtx->pMeta->uid = uid;
1,992,415✔
774
  pTbCtx->pData->uid = uid;
1,992,527✔
775

776
  if (!pTbCtx->ordered) {
1,992,425✔
777
    code = tRowSort(pTbCtx->pData->aRowP);
12✔
778
  }
779
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
1,991,206✔
780
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
12✔
781
  }
782

783
  if (TSDB_CODE_SUCCESS != code) {
1,991,974✔
784
    return code;
×
785
  }
786

787
  SVgroupDataCxt* pVgCxt = NULL;
1,991,974✔
788
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,991,362✔
789
  if (NULL == pp) {
1,991,450✔
790
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
337,323✔
791
    if (NULL == pp) {
337,415✔
792
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
337,415✔
793
    } else {
794
      pVgCxt = *(SVgroupDataCxt**)pp;
×
795
    }
796
  } else {
797
    pVgCxt = *(SVgroupDataCxt**)pp;
1,654,127✔
798
  }
799

800
  if (TSDB_CODE_SUCCESS == code) {
1,991,109✔
801
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
1,991,109✔
802
  }
803

804
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,991,347✔
805
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
806
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
807
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
808
    // insDestroyVgroupDataCxt(pVgCxt);
809
  }
810

811
  return code;
1,990,544✔
812
}
813

814
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
1,074,974✔
815
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
816
  int32_t  code = TSDB_CODE_SUCCESS;
1,074,974✔
817
  uint64_t uid;
314,429✔
818
  int32_t  vgId;
314,429✔
819
  uint64_t suid;
314,429✔
820

821
  pTbCtx->pData->aRowP = pTbData->aCol;
1,074,974✔
822
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
1,075,142✔
823

824
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId, &suid);
1,075,140✔
825
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
1,075,162✔
826
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
480✔
827
    vgId = (int32_t)ctbReq->uid;
480✔
828
    uid = 0;
480✔
829
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
480✔
830
    ctbReq->uid = 0;
480✔
831
    pTbCtx->pMeta->uid = 0;
480✔
832
    pTbCtx->pData->uid = 0;
480✔
833
    pTbCtx->pData->pCreateTbReq = ctbReq;
480✔
834
    code = TSDB_CODE_SUCCESS;
480✔
835
  } else {
836
    if (TSDB_CODE_SUCCESS != code) {
1,074,682✔
837
      return code;
×
838
    }
839
    if (pTbCtx->pData->suid != suid) {
1,074,682✔
840
      return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
841
    }
842

843
    pTbCtx->pMeta->vgId = vgId;
1,074,768✔
844
    pTbCtx->pMeta->uid = uid;
1,074,768✔
845
    pTbCtx->pData->uid = uid;
1,074,894✔
846
    pTbCtx->pData->pCreateTbReq = NULL;
1,074,852✔
847

848
    if (ctbReq != NULL) {
1,074,810✔
849
      tdDestroySVCreateTbReq(ctbReq);
850
      taosMemoryFree(ctbReq);
636,774✔
851
      ctbReq = NULL;
636,747✔
852
    }
853
  }
854

855
  if (pTbCtx->hasBlob == 0) {
1,075,342✔
856
    if (!pTbData->isOrdered) {
1,075,276✔
857
      code = tRowSort(pTbCtx->pData->aRowP);
×
858
    }
859
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
1,075,192✔
860
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
14✔
861
    }
862
  } else {
863
    if (!pTbData->isOrdered) {
66✔
864
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
865
    }
866
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
66✔
867
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
868
    }
869
  }
870

871
  if (TSDB_CODE_SUCCESS != code) {
1,075,068✔
872
    return code;
×
873
  }
874

875
  SVgroupDataCxt* pVgCxt = NULL;
1,075,068✔
876
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
1,075,110✔
877
  if (NULL == pp) {
1,075,208✔
878
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
578,734✔
879
    if (NULL == pp) {
578,845✔
880
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
578,845✔
881
    } else {
882
      pVgCxt = *(SVgroupDataCxt**)pp;
×
883
    }
884
  } else {
885
    pVgCxt = *(SVgroupDataCxt**)pp;
496,474✔
886
  }
887

888
  if (code == TSDB_CODE_SUCCESS) {
1,075,234✔
889
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
1,075,193✔
890
  }
891

892
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
1,075,160✔
893
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
894
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
895
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
896
    // insDestroyVgroupDataCxt(pVgCxt);
897
  }
898

899
  return code;
1,075,169✔
900
}
901

902
/*
903
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
904
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
905
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
906
  if (NULL == pVgroupHash || NULL == pVgroupList) {
907
    taosHashCleanup(pVgroupHash);
908
    taosArrayDestroy(pVgroupList);
909
    return TSDB_CODE_OUT_OF_MEMORY;
910
  }
911

912
  int32_t code = TSDB_CODE_SUCCESS;
913

914
  for (int32_t i = 0; i < tbNum; ++i) {
915
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
916
    pTableCxt->pMeta->vgId = pTableCols->vgId;
917
    pTableCxt->pMeta->uid = pTableCols->uid;
918
    pTableCxt->pData->uid = pTableCols->uid;
919
    pTableCxt->pData->aCol = pTableCols->aCol;
920

921
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
922
    if (pCol->nVal <= 0) {
923
      continue;
924
    }
925

926
    if (pTableCxt->pData->pCreateTbReq) {
927
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
928
    }
929

930
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
931

932
    tColDataSortMerge(pTableCxt->pData->aCol);
933

934
    if (TSDB_CODE_SUCCESS == code) {
935
      SVgroupDataCxt* pVgCxt = NULL;
936
      int32_t         vgId = pTableCxt->pMeta->vgId;
937
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
938
      if (NULL == pp) {
939
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
940
      } else {
941
        pVgCxt = *(SVgroupDataCxt**)pp;
942
      }
943
      if (TSDB_CODE_SUCCESS == code) {
944
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
945
      }
946
    }
947
  }
948

949
  taosHashCleanup(pVgroupHash);
950
  if (TSDB_CODE_SUCCESS == code) {
951
    *pVgDataBlocks = pVgroupList;
952
  } else {
953
    insDestroyVgroupDataCxtList(pVgroupList);
954
  }
955

956
  return code;
957
}
958
*/
959

960
/* Report whether the column's type is a blob type: 1 when it is, 0 otherwise. */
static int8_t colDataHasBlob(SColData* pCol) {
  return IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
}
966
/*
 * Group every table data context stored in pTableHash by vgroup.
 *
 * For each table the data is first normalised (column format: columns sorted
 * with insColDataComp and merged; row format: rows sorted when not ordered and
 * merged when not ordered or when duplicate timestamps exist), then appended to
 * the context of its vgroup via fillVgroupDataCxt. The column/row format is
 * decided once, from the flags of the first table returned by the iterator.
 *
 * @param pTableHash     hash of STableDataCxt* to merge
 * @param pVgDataBlocks  out: list of vgroup contexts; written only on success
 * @param isRebuild      forwarded to fillVgroupDataCxt
 * @return TSDB_CODE_SUCCESS or the first error; on error the partially built
 *         list is destroyed with insDestroyVgroupDataCxtList
 */
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVgroupHash || NULL == pVgroupList) {
    taosHashCleanup(pVgroupHash);
    taosArrayDestroy(pVgroupList);
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    colFormat = false;

  void* p = taosHashIterate(pTableHash, NULL);
  if (p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
  }

  // NOTE(review): when the loop stops on an error the iterator `p` is simply
  // dropped; check whether the hash API expects an explicit cancel in that case.
  while (TSDB_CODE_SUCCESS == code && NULL != p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    if (colFormat) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
      if (NULL == pCol) {
        // No first column to inspect: fail cleanly instead of dereferencing NULL
        // in the blob check below.
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      if (pCol->nVal <= 0) {
        // the first column holds no values: nothing to submit for this table
        p = taosHashIterate(pTableHash, p);
        continue;
      }

      if (pTableCxt->pData->pCreateTbReq) {
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
      }

      // NOTE(review): only column 0 is inspected to decide whether the blob-aware
      // merge is used - confirm this is the intended detection.
      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
      if (IS_STR_DATA_BLOB(pCol->type)) {
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
      } else {
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
      }
    } else if (pTableCxt->hasBlob == 0) {
      if (!pTableCxt->ordered) {
        code = tRowSort(pTableCxt->pData->aRowP);
      }
      if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
        code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
      }
    } else {
      if (!pTableCxt->ordered) {
        code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
      }
      if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
        code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      SVgroupDataCxt* pVgCxt = NULL;
      int32_t         vgId = pTableCxt->pMeta->vgId;
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
      if (NULL == pp) {
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
      } else {
        pVgCxt = *(SVgroupDataCxt**)pp;
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      p = taosHashIterate(pTableHash, p);
    }
  }

  // The hash is only a lookup aid while grouping; the contexts live in pVgroupList.
  taosHashCleanup(pVgroupHash);
  if (TSDB_CODE_SUCCESS == code) {
    *pVgDataBlocks = pVgroupList;
  } else {
    insDestroyVgroupDataCxtList(pVgroupList);
  }

  return code;
}
1056

1057
/*
 * Serialise a submit request into a freshly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length via
 * htonl, version 1 via htobe64) followed by the encoded request.
 *
 * @param vgId   vgroup id written into the header
 * @param pReq   request to encode
 * @param pData  out: allocated buffer, written only on success
 * @param pLen   out: total buffer length including the header, only on success
 * @return TSDB_CODE_SUCCESS, terrno when the allocation fails, or the encoder's error
 */
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint32_t msgLen = 0;

  // first pass: measure the encoded size of the request body
  tEncodeSize(tEncodeSubmitReq, pReq, msgLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  msgLen += sizeof(SSubmitReq2Msg);
  SSubmitReq2Msg* pMsg = taosMemoryMalloc(msgLen);
  if (NULL == pMsg) {
    return terrno;
  }

  pMsg->header.vgId = htonl(vgId);
  pMsg->header.contLen = htonl(msgLen);
  pMsg->version = htobe64(1);

  // second pass: encode the body right behind the header
  SEncoder bodyEncoder;
  tEncoderInit(&bodyEncoder, POINTER_SHIFT(pMsg, sizeof(SSubmitReq2Msg)), msgLen - sizeof(SSubmitReq2Msg));
  code = tEncodeSubmitReq(&bodyEncoder, pReq);
  tEncoderClear(&bodyEncoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pMsg);
    return code;
  }

  *pData = pMsg;
  *pLen = msgLen;
  return code;
}
1085

1086
static void destroyVgDataBlocks(void* p) {
×
1087
  if (p == NULL) return;
×
1088
  SVgDataBlocks* pVg = p;
×
1089
  taosMemoryFree(pVg->pData);
×
1090
  taosMemoryFree(pVg);
×
1091
}
1092

1093
/*
 * Re-attach blob sets to the per-table submit data of a request.
 *
 * Raw requests are left untouched. Without a blob array every table's pBlobSet
 * is cleared. With a blob array, entry i is moved into table i's pBlobSet and
 * the array slot is set to NULL so the set is not released twice on destroy.
 *
 * @return always 0 (TSDB_CODE_SUCCESS for raw requests)
 */
int32_t insResetBlob(SSubmitReq2* p) {
  if (p->raw) {
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
  }

  const size_t numOfTb = taosArrayGetSize(p->aSubmitTbData);

  if (NULL == p->aSubmitBlobData) {
    for (size_t idx = 0; idx < numOfTb; ++idx) {
      SSubmitTbData* pTb = taosArrayGet(p->aSubmitTbData, idx);
      pTb->pBlobSet = NULL;  // nothing to attach; keep destroy from freeing a stale pointer
    }
    return 0;
  }

  for (size_t idx = 0; idx < numOfTb; ++idx) {
    SSubmitTbData* pTb = taosArrayGet(p->aSubmitTbData, idx);
    SBlobSet**     ppSlot = taosArrayGet(p->aSubmitBlobData, idx);
    SBlobSet*      pBlob = (NULL != ppSlot) ? *ppSlot : NULL;
    int32_t        nrow = taosArrayGetSize(pTb->aRowP);
    int32_t        nblob = (nrow > 0 && pBlob) ? taosArrayGetSize(pBlob->pSeqTable) : 0;

    uTrace("blob %p row size %d, pData size %d", pBlob, nblob, nrow);

    pTb->pBlobSet = pBlob;
    if (NULL != ppSlot) {
      *ppSlot = NULL;  // ownership moved to the table data
    }
  }

  return 0;
}
1122
/*
 * Turn every non-empty vgroup context into an encoded SVgDataBlocks.
 *
 * For each context with at least one table: allocate the block, copy the vgroup
 * info out of pVgroupsHashObj, re-attach blob sets (insResetBlob), encode the
 * request (buildSubmitReq) and push the block onto the result list.
 *
 * @param pVgroupsHashObj  vgId -> vgroup info, duplicated into each block
 * @param pVgDataCxtList   list of SVgroupDataCxt* to encode
 * @param pVgDataBlocks    in/out result list; in append mode an existing list is extended
 * @param append           true: keep/extend *pVgDataBlocks and hand it back even on error;
 *                         false: publish the new list on success, destroy it on error
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the list cannot be
 *         created, or the first error from the steps above
 */
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
  const size_t numOfVg = taosArrayGetSize(pVgDataCxtList);

  SArray* pBlockList = NULL;
  if (append && *pVgDataBlocks) {
    pBlockList = *pVgDataBlocks;
  } else {
    pBlockList = taosArrayInit(numOfVg, POINTER_BYTES);
  }
  if (NULL == pBlockList) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t idx = 0; idx < numOfVg; ++idx) {
    SVgroupDataCxt* pSrc = taosArrayGetP(pVgDataCxtList, idx);
    if (taosArrayGetSize(pSrc->pData->aSubmitTbData) <= 0) {
      continue;  // nothing queued for this vgroup
    }

    SVgDataBlocks* pDst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == pDst) {
      code = terrno;
    }

    if (TSDB_CODE_SUCCESS == code) {
      pDst->numOfTables = taosArrayGetSize(pSrc->pData->aSubmitTbData);
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&pSrc->vgId, sizeof(pSrc->vgId), &pDst->vg);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = insResetBlob(pSrc->pData);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = buildSubmitReq(pSrc->vgId, pSrc->pData, &pDst->pData, &pDst->size);
    }
    if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(pBlockList, &pDst)) {
      code = terrno;
    }

    if (TSDB_CODE_SUCCESS != code) {
      // the block never made it into the list, so release it here and stop
      destroyVgDataBlocks(pDst);
      break;
    }
  }

  if (append) {
    // In append mode the list always goes back to the caller, whatever the result.
    if (NULL == *pVgDataBlocks) {
      *pVgDataBlocks = pBlockList;
    }
    return code;
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroyP(pBlockList, destroyVgDataBlocks);
    return code;
  }

  *pVgDataBlocks = pBlockList;
  return code;
}
1174

1175
/* Return true when one of the first numFields entries of `fields` has the same name as pSchema. */
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
  int idx = 0;
  while (idx < numFields) {
    if (0 == strcmp(pSchema->name, fields[idx].name)) {
      return true;
    }
    ++idx;
  }
  return false;
}
1184

1185
/*
 * Check that one column descriptor of a raw data block matches the table schema.
 *
 * `fields` points at a descriptor made of a 1-byte type followed by a 4-byte
 * length. The checks are:
 *   - the type must equal the schema type;
 *   - decimal columns: precision and scale decoded from the length must equal
 *     those decoded from pColExtSchema->typeMod;
 *   - blob columns: the length must be below TSDB_MAX_BLOB_LEN;
 *   - other variable-length columns: the length must not exceed the schema bytes;
 *   - fixed-length columns: the length must equal the schema bytes.
 *
 * On a mismatch the description is written to errstr when it is non-NULL,
 * otherwise it is logged with uError. The blob-limit error is always logged and
 * additionally copied to errstr when provided.
 *
 * @param pColSchema     schema of the column
 * @param pColExtSchema  extended schema; only read for decimal columns
 * @param fields         descriptor bytes taken from the data block
 * @param errstr         optional buffer receiving the error text
 * @param errstrLen      size of errstr
 * @return 0 when the descriptor matches, TSDB_CODE_INVALID_PARA otherwise
 */
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
  char   logBuf[512] = {0};
  char*  msg = (errstr != NULL) ? errstr : logBuf;
  size_t msgLen = (errstr != NULL) ? (size_t)errstrLen : sizeof(logBuf);

  const int8_t dataType = *fields;
  const char*  schemaTypeName = tDataTypes[pColSchema->type].name;
  // The type byte comes from the data block: never index tDataTypes with a value out of range.
  const char* dataTypeName =
      (dataType >= 0 && dataType < TSDB_DATA_TYPE_MAX) ? tDataTypes[dataType].name : "unknown";

  if (dataType != pColSchema->type) {
    snprintf(msg, msgLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
             schemaTypeName, dataTypeName);
    if (errstr == NULL) {
      uError("checkSchema %s", logBuf);
    }
    return TSDB_CODE_INVALID_PARA;
  }

  // The length sits right behind the type byte and is not 4-byte aligned, so copy it out
  // instead of dereferencing a casted pointer.
  int32_t dataBytes = 0;
  memcpy(&dataBytes, fields + sizeof(int8_t), sizeof(dataBytes));

  if (IS_DECIMAL_TYPE(pColSchema->type)) {
    uint8_t precision = 0, scale = 0;
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
    uint8_t precisionData = 0, scaleData = 0;
    int32_t bytes = dataBytes;
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
    if (precision != precisionData || scale != scaleData) {
      snprintf(msg, msgLen,
               "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
               "precision:%d, scale:%d",
               pColSchema->name, schemaTypeName, precision, scale, dataTypeName, precisionData, scaleData);
      if (errstr == NULL) {
        uError("checkSchema %s", logBuf);
      }
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
    if (IS_STR_DATA_BLOB(pColSchema->type)) {
      if (dataBytes >= TSDB_MAX_BLOB_LEN) {
        uError("column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, schemaTypeName, pColSchema->bytes, dataTypeName, dataBytes);
        if (errstr != NULL) {
          // keep the caller-visible error text in line with the other failure paths
          snprintf(errstr, errstrLen,
                   "column blob data bytes exceed max limit, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
                   pColSchema->name, schemaTypeName, pColSchema->bytes, dataTypeName, dataBytes);
        }
        return TSDB_CODE_INVALID_PARA;
      }
    } else if (dataBytes > pColSchema->bytes) {
      snprintf(msg, msgLen, "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, schemaTypeName, pColSchema->bytes, dataTypeName, dataBytes);
      if (errstr == NULL) {
        uError("checkSchema %s", logBuf);
      }
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  // fixed-length column: the length must match the schema exactly
  if (dataBytes != pColSchema->bytes) {
    snprintf(msg, msgLen,
             "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
             pColSchema->name, schemaTypeName, pColSchema->bytes, dataTypeName, dataBytes);
    if (errstr == NULL) {
      uError("checkSchema %s", logBuf);
    }
    return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
1273

1274
#define PRCESS_DATA(i, j)                                                                                          \
1275
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                         \
1276
  if (ret != 0) {                                                                                                  \
1277
    goto end;                                                                                                      \
1278
  }                                                                                                                \
1279
                                                                                                                   \
1280
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                          \
1281
    hasTs = true;                                                                                                  \
1282
  }                                                                                                                \
1283
                                                                                                                   \
1284
  int8_t* offset = pStart;                                                                                         \
1285
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                                        \
1286
    pStart += numOfRows * sizeof(int32_t);                                                                         \
1287
  } else {                                                                                                         \
1288
    pStart += BitmapLen(numOfRows);                                                                                \
1289
  }                                                                                                                \
1290
  char* pData = pStart;                                                                                            \
1291
                                                                                                                   \
1292
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                                        \
1293
  if (hasBlob) {                                                                                                   \
1294
    ret = tColDataAddValueByDataBlockWithBlob(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData, \
1295
                                              pBlobSet);                                                           \
1296
  } else {                                                                                                         \
1297
    ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);        \
1298
  }                                                                                                                \
1299
  if (ret != 0) {                                                                                                  \
1300
    goto end;                                                                                                      \
1301
  }                                                                                                                \
1302
  fields += sizeof(int8_t) + sizeof(int32_t);                                                                      \
1303
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                            \
1304
    pStart += htonl(colLength[i]);                                                                                 \
1305
  } else {                                                                                                         \
1306
    pStart += colLength[i];                                                                                        \
1307
  }                                                                                                                \
1308
  boundInfo->pColIndex[j] = -1;
1309

1310
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
34,703✔
1311
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
1312
  int       ret = 0;
34,703✔
1313
  int8_t    hasBlob = 0;
34,703✔
1314
  SBlobSet* pBlobSet = NULL;
34,703✔
1315
  if (data == NULL) {
34,703✔
1316
    uError("rawBlockBindData, data is NULL");
×
1317
    return TSDB_CODE_APP_ERROR;
×
1318
  }
1319
  void* tmp =
1320
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
34,703✔
1321
  SVCreateTbReq* pCreateReqTmp = NULL;
34,703✔
1322
  if (tmp == NULL && pCreateTb != NULL) {
34,703✔
1323
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
3,320✔
1324
    if (ret != TSDB_CODE_SUCCESS) {
3,320✔
1325
      uError("cloneSVreateTbReq error");
×
1326
      goto end;
×
1327
    }
1328
  }
1329

1330
  STableDataCxt* pTableCxt = NULL;
34,703✔
1331
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
34,703✔
1332
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
1333
  if (pCreateReqTmp != NULL) {
34,703✔
1334
    tdDestroySVCreateTbReq(pCreateReqTmp);
×
1335
    taosMemoryFree(pCreateReqTmp);
×
1336
  }
1337

1338
  hasBlob = pTableCxt->hasBlob;
34,703✔
1339
  if (hasBlob && pTableCxt->pData->pBlobSet == NULL) {
34,703✔
1340
    ret = tBlobSetCreate(512, 0, &pTableCxt->pData->pBlobSet);
×
1341
    if (pTableCxt->pData->pBlobSet == NULL) {
×
1342
      uError("create blob set failed");
×
1343
      ret = terrno;
×
1344
    }
1345
  }
1346

1347
  if (ret != TSDB_CODE_SUCCESS) {
34,703✔
1348
    uError("insGetTableDataCxt error");
×
1349
    goto end;
×
1350
  }
1351
  pBlobSet = pTableCxt->pData->pBlobSet;
34,703✔
1352

1353
  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
34,703✔
1354
  if (tmp == NULL) {
34,703✔
1355
    ret = initTableColSubmitData(pTableCxt);
31,107✔
1356
    if (ret != TSDB_CODE_SUCCESS) {
31,107✔
1357
      uError("initTableColSubmitData error");
×
1358
      goto end;
×
1359
    }
1360
  }
1361

1362
  char* p = (char*)data;
34,703✔
1363
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1364
  // column length |
1365
  int32_t version = *(int32_t*)data;
34,703✔
1366
  p += sizeof(int32_t);
34,703✔
1367
  p += sizeof(int32_t);
34,703✔
1368

1369
  int32_t numOfRows = *(int32_t*)p;
34,703✔
1370
  p += sizeof(int32_t);
34,703✔
1371

1372
  int32_t numOfCols = *(int32_t*)p;
34,703✔
1373
  p += sizeof(int32_t);
34,703✔
1374

1375
  p += sizeof(int32_t);
34,703✔
1376
  p += sizeof(uint64_t);
34,703✔
1377

1378
  int8_t* fields = (int8_t*)p;
34,703✔
1379
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
34,703✔
1380
    uError("fields type error:%d", *fields);
×
1381
    ret = TSDB_CODE_INVALID_PARA;
×
1382
    goto end;
×
1383
  }
1384
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
34,703✔
1385

1386
  int32_t* colLength = (int32_t*)p;
34,703✔
1387
  p += sizeof(int32_t) * numOfCols;
34,703✔
1388

1389
  char* pStart = p;
34,703✔
1390

1391
  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
34,703✔
1392
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
34,703✔
1393
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
34,703✔
1394

1395
  if (tFields != NULL && numFields != numOfCols) {
34,703✔
1396
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
×
1397
    ret = TSDB_CODE_INVALID_PARA;
×
1398
    goto end;
×
1399
  }
1400

1401
  bool hasTs = false;
34,703✔
1402
  if (tFields == NULL) {
34,703✔
1403
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
1,578✔
1404
    for (int j = 0; j < len; j++) {
4,734✔
1405
      SSchema*    pColSchema = &pSchema[j];
3,682✔
1406
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
3,682✔
1407
      PRCESS_DATA(j, j)
3,682✔
1408
    }
1409
  } else {
1410
    for (int i = 0; i < numFields; i++) {
178,826✔
1411
      for (int j = 0; j < boundInfo->numOfBound; j++) {
1,345,734✔
1412
        SSchema*    pColSchema = &pSchema[j];
1,345,734✔
1413
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
1,345,734✔
1414
        char*       fieldName = NULL;
1,345,734✔
1415
        if (raw) {
1,345,734✔
1416
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
1,344,419✔
1417
        } else {
1418
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
1,315✔
1419
        }
1420
        if (strcmp(pColSchema->name, fieldName) == 0) {
1,345,734✔
1421
          PRCESS_DATA(i, j)
145,701✔
1422
          break;
145,701✔
1423
        }
1424
      }
1425
    }
1426
  }
1427

1428
  if (!hasTs) {
34,177✔
1429
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
×
1430
    ret = TSDB_CODE_INVALID_PARA;
×
1431
    goto end;
×
1432
  }
1433

1434
  // process NULL data
1435
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
186,308✔
1436
    if (boundInfo->pColIndex[c] != -1) {
152,131✔
1437
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
3,537✔
1438
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
3,537✔
1439
      if (ret != 0) {
3,537✔
1440
        goto end;
×
1441
      }
1442
    } else {
1443
      boundInfo->pColIndex[c] = c;  // restore for next block
148,594✔
1444
    }
1445
  }
1446

1447
end:
34,703✔
1448
  return ret;
34,703✔
1449
}
1450

1451
/*
 * Queue an already encoded table submit blob on the request of its vgroup.
 *
 * The blob is first rewritten by transformRawSSubmitTbData with the table's
 * suid, uid and schema version. The vgroup context is taken from pVgroupHash
 * (created on a miss), its request is switched to raw mode when the table array
 * is created, and the `data` pointer itself is pushed onto that array.
 *
 * @return 0 on success, otherwise the error of the failing step (terrno for
 *         array allocation/push failures)
 */
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
  if (0 != code) {
    return code;
  }

  SVgroupDataCxt* pVgCxt = NULL;
  void**          ppFound = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
  if (NULL != ppFound) {
    pVgCxt = *(SVgroupDataCxt**)ppFound;
  } else {
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
    if (0 != code) {
      return code;
    }
  }

  if (NULL == pVgCxt->pData->aSubmitTbData) {
    // first raw blob for this vgroup: create the pointer array and mark the request raw
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
    pVgCxt->pData->raw = true;
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
    return terrno;
  }

  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc