• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4676

18 Aug 2025 07:58AM UTC coverage: 60.007% (+0.1%) from 59.866%
#4676

push

travis-ci

web-flow
test: update case desc (#32551)

137637 of 292075 branches covered (47.12%)

Branch coverage included in aggregate %.

208283 of 284395 relevant lines covered (73.24%)

14871103.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.14
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "taoserror.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tmisce.h"
28
#include "ttypes.h"
29

30
void qDestroyBoundColInfo(void* pInfo) {
21,140✔
31
  if (NULL == pInfo) {
21,140✔
32
    return;
10,175✔
33
  }
34

35
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
10,965✔
36

37
  taosMemoryFreeClear(pBoundInfo->pColIndex);
10,965!
38
}
39

40
/*
 * Find the first occurrence of `target` in the token text that is neither
 * inside an escape-delimited section nor inside a quoted section.
 *
 * The token is scanned byte by byte over pToken->n bytes starting at
 * pToken->z. TS_ESCAPE_CHAR toggles the "escaped" state (only while not
 * quoted); a single or double quote opens a quoted section (only while not
 * escaped), and the section is closed by the same quote character.
 *
 * Returns a pointer into pToken->z at the match, or NULL when there is none.
 */
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool escaped = false;
  bool quoted = false;
  char openQuote = 0;

  for (uint32_t pos = 0; pos < pToken->n; ++pos) {
    char* cur = pToken->z + pos;
    char  ch = *cur;

    // a match only counts outside of escaped and quoted sections
    if (ch == target && !escaped && !quoted) {
      return cur;
    }

    if (ch == TS_ESCAPE_CHAR && !quoted) {
      escaped = !escaped;
    }

    // the quote test runs after the escape toggle, so it sees the updated state
    if ((ch == '\'' || ch == '"') && !escaped) {
      if (!quoted) {
        openQuote = ch;
        quoted = true;
      } else if (openQuote == ch) {
        quoted = false;
      }
    }
  }

  return NULL;
}
70

71
/*
 * Build a full table name (SName) from a table-name token.
 *
 * If the token contains an unquoted/unescaped path delimiter it is split
 * into "<db><delimiter><table>" and the database named in the token is
 * used; otherwise `dbName` (the current database) is used and must not be
 * NULL.
 *
 * Every copy into a fixed-size stack buffer is length-checked first: an
 * over-long database or table part is rejected with "name too long" instead
 * of overrunning the buffer or leaving it without a terminator.
 *
 * pName      [out] name to fill in
 * pTableName [in]  token holding the (optionally db-qualified) table name
 * acctId     [in]  account id forwarded to tNameSetDbName
 * dbName     [in]  current database; may be NULL when the token is qualified
 * pMsgBuf    [out] receives the error message on failure
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by
 * buildInvalidOperationMsg / generateSyntaxErrMsg / generateSyntaxErrMsgExt.
 */
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    int32_t dbLen = p - pTableName->z;
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
    }
    // keep room for the terminating NUL in the zero-initialised buffer below
    if (dbLen >= TSDB_DB_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    char name[TSDB_DB_FNAME_LEN] = {0};
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }
    if (tbLen >= TSDB_TABLE_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, p + 1, tbLen);
    (void)strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    // reject tokens that cannot fit (with terminator) into the buffer below
    if (pTableName->n >= (uint32_t)TSDB_TABLE_FNAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    // One dequoted copy is enough: the original made a second, identical copy
    // of the same token and dequoted it again.
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    strncpy(tbname, pTableName->z, pTableName->n);
    int32_t tbLen = strdequote(tbname);
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    if (tbLen == 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
    }

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }
    if (tbname[0] == '\0') return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  // a dot surviving inside the table part is never accepted
  if (NULL != strchr(pName->tname, '.')) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

  return code;
}
145

146
/*
 * Linear search for a column by name in pSchema[start, end).
 *
 * A schema entry matches when its name has exactly pColname->n characters
 * and those characters equal the token text.
 *
 * Returns the index of the first match, or -1 when no entry matches.
 */
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
  for (int16_t idx = start; idx < end; ++idx) {
    const char* schemaName = pSchema[idx].name;
    if (strlen(schemaName) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, schemaName, pColname->n)) {
      return idx;
    }
  }
  return -1;
}
155

156
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
14,477✔
157
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
158
  pTbReq->type = TD_CHILD_TABLE;
14,477✔
159
  pTbReq->ctb.pTag = (uint8_t*)pTag;
14,477✔
160
  pTbReq->name = taosStrdup(tname);
14,477!
161
  if (!pTbReq->name) return terrno;
14,491!
162
  pTbReq->ctb.suid = suid;
14,491✔
163
  pTbReq->ctb.tagNum = tagNum;
14,491✔
164
  if (sname) {
14,491✔
165
    pTbReq->ctb.stbName = taosStrdup(sname);
10,615!
166
    if (!pTbReq->ctb.stbName) return terrno;
10,625!
167
  }
168
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
14,501✔
169
  if (!pTbReq->ctb.tagName) return terrno;
14,500✔
170
  pTbReq->ttl = ttl;
14,491✔
171
  pTbReq->commentLen = -1;
14,491✔
172

173
  return TSDB_CODE_SUCCESS;
14,491✔
174
}
175

176
/*
 * Fill pBoundCols[0 .. ncols-1] with the identity mapping (entry i == i).
 * Nothing is written when ncols <= 0.
 */
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
  int32_t idx = 0;
  while (idx < ncols) {
    pBoundCols[idx] = idx;
    ++idx;
  }
}
×
181

182
/*
 * Append one COL_VAL_NONE value per table column to pValues.
 *
 * Columns are taken from getTableColumnSchema(pTableMeta) and the count from
 * pTableMeta->tableInfo.numOfColumns.
 *
 * Returns 0 on success, or terrno at the first failed push (values pushed
 * before the failure stay in the array).
 */
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
  SSchema* pCols = getTableColumnSchema(pTableMeta);

  int32_t idx = 0;
  while (idx < pTableMeta->tableInfo.numOfColumns) {
    SColVal none = COL_VAL_NONE(pCols[idx].colId, pCols[idx].type);
    if (NULL == taosArrayPush(pValues, &none)) {
      return terrno;
    }
    ++idx;
  }
  return 0;
}
194

195
/*
 * Public entry point for initColValues(): appends one "none" value per
 * column of pTableMeta to aColValues and returns that function's result.
 */
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
  int32_t code = initColValues(pTableMeta, aColValues);
  return code;
}
×
196

197
/*
 * Initialise a bound-column descriptor for `numOfBound` columns.
 *
 * Both counters are set to numOfBound, both flags are cleared, and a fresh
 * index array is allocated and filled with the identity mapping.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the index array cannot be
 * allocated (the counters and flags have already been written by then).
 */
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
  pInfo->numOfCols = numOfBound;
  pInfo->numOfBound = numOfBound;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
  if (NULL == pInfo->pColIndex) {
    return terrno;
  }

  int32_t idx = 0;
  while (idx < numOfBound) {
    pInfo->pColIndex[idx] = idx;
    ++idx;
  }
  return TSDB_CODE_SUCCESS;
}
211

212
/*
 * Put an already-initialised bound-column descriptor back into its default
 * state: numOfBound equals numOfCols, both flags are cleared, and the index
 * array holds the identity mapping again. The index array is reused, not
 * reallocated.
 */
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;
  pInfo->numOfBound = pInfo->numOfCols;

  int32_t idx = 0;
  while (idx < pInfo->numOfCols) {
    pInfo->pColIndex[idx] = idx;
    ++idx;
  }
}
×
220

221
/*
 * Track whether the rows appended to a table context arrive in key order.
 *
 * While the context is still marked ordered, the new key is compared with
 * the last one seen:
 *   - smaller -> the context is marked unordered;
 *   - equal   -> the context is marked as containing duplicate keys.
 * The new key then becomes the last key.
 *
 * The comparison is performed once and its result reused; both tests
 * compare the same pair of keys, which is not modified in between.
 */
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
  // once the data block is disordered, we do NOT keep last timestamp any more
  if (!pTableCxt->ordered) {
    return;
  }

  int32_t cmp = tRowKeyCompare(rowKey, &pTableCxt->lastKey);
  if (cmp < 0) {
    pTableCxt->ordered = false;
  } else if (cmp == 0) {
    pTableCxt->duplicateTs = true;
  }

  // TODO: for variable length data type, we need to copy it out
  pTableCxt->lastKey = *rowKey;
}
239

240
/*
 * Release the column-index array of a bound-column descriptor.
 *
 * The descriptor itself is not freed. A NULL descriptor is ignored, matching
 * the behaviour of qDestroyBoundColInfo().
 */
void insDestroyBoundColInfo(SBoundColInfo* pInfo) {
  if (NULL == pInfo) {
    return;
  }
  taosMemoryFreeClear(pInfo->pColIndex);
}
24,512,966!
241

242
/*
 * Allocate and initialise a table data context for pTableMeta.
 *
 * pTableMeta    table meta to duplicate into the context
 * pCreateTbReq  optional in/out: when non-NULL, *pCreateTbReq is moved into
 *               the context's submit data and *pCreateTbReq is set to NULL
 * pOutput       receives the new context on success and NULL on every
 *               failure, so the caller never sees a stale or freed pointer
 * colMode       selects the column-data submit format instead of row format
 * ignoreColVals when true, the per-column value array is not created
 *
 * On failure the partially built context is destroyed here; the caller must
 * not destroy it again.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
                                  bool colMode, bool ignoreColVals) {
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
  if (NULL == pTableCxt) {
    *pOutput = NULL;
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pTableCxt->lastKey = (SRowKey){0};
  pTableCxt->ordered = true;
  pTableCxt->duplicateTs = false;

  pTableCxt->pMeta = tableMetaDup(pTableMeta);
  if (NULL == pTableCxt->pMeta) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pSchema =
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
    if (NULL == pTableCxt->pSchema) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    // only inspect the schema once it is known to exist
    pTableCxt->hasBlob = schemaHasBlob(pTableCxt->pSchema);
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
  }
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
    if (NULL == pTableCxt->pValues) {
      code = terrno;
    } else {
      code = initColValues(pTableMeta, pTableCxt->pValues);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pTableCxt->pData) {
      code = terrno;
    } else {
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
      pTableCxt->pData->suid = pTableMeta->suid;
      pTableCxt->pData->uid = pTableMeta->uid;
      pTableCxt->pData->sver = pTableMeta->sversion;
      // ownership of the create-table request moves into the submit data
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTableCxt->pData->aCol) {
          code = terrno;
        }
      } else {
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTableCxt->pData->aRowP) {
          code = terrno;
        }
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pTableCxt;
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code,
                pTableMeta->vgId);
  } else {
    insDestroyTableDataCxt(pTableCxt);
    *pOutput = NULL;  // never hand back a pointer to the context just destroyed
  }

  return code;
}
316

317
/*
 * Replace a submit-data object by a fresh, empty one that carries the same
 * header fields (flags, suid, uid, sver) and, when auto-create is flagged,
 * a clone of the create-table request.
 *
 * pSrc    object to replace; its struct is always freed here (the arrays it
 *         pointed to are not touched)
 * pDst    receives the new object on success and NULL on failure, so it never
 *         keeps pointing at the freed pSrc (the visible caller passes the
 *         address of the same pointer as pSrc)
 * hasBlob not used by this function
 *
 * Each failure path releases the new object exactly once, and a successfully
 * cloned create-table request is released when the array allocation fails
 * afterwards.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst, int8_t hasBlob) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == pTmp) {
    code = terrno;
  } else {
    pTmp->flags = pSrc->flags;
    pTmp->suid = pSrc->suid;
    pTmp->uid = pSrc->uid;
    pTmp->sver = pSrc->sver;
    pTmp->pCreateTbReq = NULL;
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
      if (pSrc->pCreateTbReq) {
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
      } else {
        // nothing to clone: the new object must not claim auto-create
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTmp->aCol) {
          code = terrno;
        }
      } else {
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTmp->aRowP) {
          code = terrno;
        }
      }

      // The clone succeeded but the array could not be allocated: release the
      // clone, otherwise it would leak together with pTmp.
      if (TSDB_CODE_SUCCESS != code && NULL != pTmp->pCreateTbReq) {
        tdDestroySVCreateTbReq(pTmp->pCreateTbReq);
        taosMemoryFree(pTmp->pCreateTbReq);
        pTmp->pCreateTbReq = NULL;
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(pTmp);
      pTmp = NULL;
    }
  }

  taosMemoryFree(pSrc);
  *pDst = pTmp;  // NULL on failure

  return code;
}
367

368
/*
 * Mark every column value in pValues as "none" by resetting its flag to
 * CV_FLAG_NONE. The stored values themselves are left untouched.
 */
static void resetColValues(SArray* pValues) {
  int32_t total = taosArrayGetSize(pValues);
  int32_t idx = 0;
  while (idx < total) {
    SColVal* pCol = taosArrayGet(pValues, idx);
    pCol->flag = CV_FLAG_NONE;
    ++idx;
  }
}
66✔
375

376
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
8,242,827✔
377
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
378
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
8,242,827✔
379
  if (NULL != tmp) {
8,226,638✔
380
    *pTableCxt = *tmp;
66✔
381
    if (!ignoreColVals) {
66!
382
      resetColValues((*pTableCxt)->pValues);
66✔
383
    }
384
    return TSDB_CODE_SUCCESS;
66✔
385
  }
386
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
8,226,572✔
387
  if (TSDB_CODE_SUCCESS == code) {
8,249,425!
388
    void* pData = *pTableCxt;  // deal scan coverity
8,250,182✔
389
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
8,250,182✔
390
  }
391

392
  if (TSDB_CODE_SUCCESS != code) {
8,265,382!
393
    insDestroyTableDataCxt(*pTableCxt);
×
394
  }
395
  return code;
8,263,645✔
396
}
397

398
static void destroyColVal(void* p) {
54,799,953✔
399
  SColVal* pVal = p;
54,799,953✔
400
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
54,799,953!
401
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
54,545,256!
402
    taosMemoryFreeClear(pVal->value.pData);
254,736!
403
  }
404
}
54,799,953✔
405

406
/*
 * Destroy a table data context and everything it holds: the duplicated
 * table meta, the schema, the bound-column index array, the column value
 * array (with per-element cleanup) and, when present, the submit data.
 * The context structure itself is freed last. NULL is ignored.
 */
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
  if (pTableCxt != NULL) {
    taosMemoryFreeClear(pTableCxt->pMeta);
    tDestroyTSchema(pTableCxt->pSchema);
    insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
    taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);

    if (NULL != pTableCxt->pData) {
      tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pTableCxt->pData);
    }

    taosMemoryFree(pTableCxt);
  }
}
421

422
/*
 * Destroy a vgroup data context: tear down the submit request it carries,
 * free the request structure, then free the context. NULL is ignored.
 */
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
  if (pVgCxt != NULL) {
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pVgCxt->pData);
    taosMemoryFree(pVgCxt);
  }
}
432

433
/*
 * Destroy every vgroup data context referenced by the pointer array
 * pVgCxtList, then destroy the array itself. NULL is ignored.
 */
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
  if (pVgCxtList != NULL) {
    size_t  total = taosArrayGetSize(pVgCxtList);
    int32_t idx = 0;
    while ((size_t)idx < total) {
      insDestroyVgroupDataCxt(taosArrayGetP(pVgCxtList, idx));
      idx++;
    }

    taosArrayDestroy(pVgCxtList);
  }
}
446

447
/*
 * Destroy every vgroup data context stored (by pointer) in pVgCxtHash, then
 * clean up the hash table itself. NULL is ignored.
 */
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
  if (pVgCxtHash != NULL) {
    void** pIter;
    for (pIter = taosHashIterate(pVgCxtHash, NULL); pIter != NULL; pIter = taosHashIterate(pVgCxtHash, pIter)) {
      insDestroyVgroupDataCxt(*(SVgroupDataCxt**)pIter);
    }

    taosHashCleanup(pVgCxtHash);
  }
}
461

462
/*
 * Destroy every table data context stored (by pointer) in pTableCxtHash,
 * then clean up the hash table itself. NULL is ignored.
 */
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
  if (pTableCxtHash != NULL) {
    void** pIter;
    for (pIter = taosHashIterate(pTableCxtHash, NULL); pIter != NULL; pIter = taosHashIterate(pTableCxtHash, pIter)) {
      insDestroyTableDataCxt(*(STableDataCxt**)pIter);
    }

    taosHashCleanup(pTableCxtHash);
  }
}
476

477
/*
 * Hand the submit data of a table context over to a vgroup context.
 *
 * The SSubmitTbData structure is copied by value into the vgroup's
 * aSubmitTbData array (created on first use). For blob tables the blob-set
 * pointer is additionally appended to aSubmitBlobData and then cleared in
 * the table context so it is not released twice.
 *
 * Afterwards the table context's own submit data is either
 *   - replaced by a fresh empty object (isRebuild),
 *   - freed and cleared (clear, when isRebuild is false), or
 *   - left as is.
 *
 * Returns 0 on success, terrno when an array cannot be created or appended
 * to, or the result of rebuildTableData().
 */
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
    // the blob array is only created together with the table-data array
    if (pTableCxt->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (NULL == pVgCxt->pData->aSubmitBlobData) {
        return terrno;
      }
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (!pTableCxt->hasBlob) {
    pTableCxt->pData->pBlobSet = NULL;
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData)) {
    return terrno;
  }

  if (pTableCxt->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTableCxt->pData->pBlobSet, pTableCxt->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTableCxt->pData->pBlobSet)) {
      return terrno;
    }
    pTableCxt->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  int32_t code = 0;
  if (isRebuild) {
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData, pTableCxt->hasBlob);
  } else if (clear) {
    taosMemoryFreeClear(pTableCxt->pData);
  }
  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);

  return code;
}
516

517
/*
 * Create a vgroup data context for vgId and register it in both the vgroup
 * hash (keyed by vgId) and the vgroup list.
 *
 * pOutput is written only on success.
 *
 * Failure handling:
 *   - terrno is captured before any cleanup call, so cleanup cannot change
 *     the code that is returned;
 *   - if the context was already stored in the hash but cannot be appended
 *     to the list, the hash entry is removed before the context is
 *     destroyed, so the hash never keeps a pointer to freed memory.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList, SVgroupDataCxt** pOutput) {
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
  if (NULL == pVgCxt) {
    return terrno;
  }
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
  if (NULL == pVgCxt->pData) {
    int32_t allocCode = terrno;
    // there is no submit request yet, so only the context itself is released
    taosMemoryFree(pVgCxt);
    return allocCode;
  }

  pVgCxt->vgId = vgId;
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
    code = terrno;
    // undo the hash registration before the context goes away
    (void)taosHashRemove(pVgroupHash, &vgId, sizeof(vgId));
    insDestroyVgroupDataCxt(pVgCxt);
    return code;
  }

  //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
  *pOutput = pVgCxt;
  return code;
}
543

544
int insColDataComp(const void* lp, const void* rp) {
56,439✔
545
  SColData* pLeft = (SColData*)lp;
56,439✔
546
  SColData* pRight = (SColData*)rp;
56,439✔
547
  if (pLeft->cid < pRight->cid) {
56,439✔
548
    return -1;
54,989✔
549
  } else if (pLeft->cid > pRight->cid) {
1,450!
550
    return 1;
1,455✔
551
  }
552

553
  return 0;
×
554
}
555

556
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
920✔
557
                                 STableColsData* pTbData, SName* sname) {
558
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
920!
559
    return TSDB_CODE_SUCCESS;
695✔
560
  }
561

562
  SVgroupInfo      vgInfo = {0};
225✔
563
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
225✔
564
                           .requestId = pBuildInfo->requestId,
225✔
565
                           .requestObjRefId = pBuildInfo->requestSelf,
225✔
566
                           .mgmtEps = pBuildInfo->mgmtEpSet};
567

568
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
225✔
569
  if (TSDB_CODE_SUCCESS != code) {
225!
570
    return code;
×
571
  }
572

573
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
225✔
574
  if (TSDB_CODE_SUCCESS != code) {
225!
575
    return code;
×
576
  }
577

578
  return TSDB_CODE_SUCCESS;
225✔
579
}
580

581
/*
 * Resolve the uid and vgroup id of the table named by pTbData->tbName.
 *
 * When pTbData->getFromHash is set, the per-statement table hash is tried
 * first. On a miss (or when the hash is not consulted) the table meta is
 * fetched through the catalog, the result is cached in the table hash, and
 * the table's vgroup info is added to pAllVgHash if needed.
 *
 * uid / vgId are written on the cached path and, on the catalog path, once
 * the table meta has been fetched successfully.
 *
 * Returns 0 on success; TSDB_CODE_PAR_TABLE_NOT_EXIST is passed through (with
 * a warning) so the caller can decide to auto-create the table; any other
 * failure is logged as an error and returned.
 */
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
                             uint64_t* uid, int32_t* vgId) {
  STableVgUid* pCached = NULL;
  if (pTbData->getFromHash) {
    pCached = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
  }

  // fast path: the table was resolved earlier in this statement
  if (NULL != pCached) {
    *uid = pCached->uid;
    *vgId = pCached->vgid;
    return 0;
  }

  SName   sname;
  int32_t code = qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
                           .requestId = pBuildInfo->requestId,
                           .requestObjRefId = pBuildInfo->requestSelf,
                           .mgmtEps = pBuildInfo->mgmtEpSet};
  code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    parserWarn("stmt2 async bind don't find table:%s.%s, try auto create table", sname.dbname, sname.tname);
    return code;
  }
  if (TSDB_CODE_SUCCESS != code) {
    parserError("stmt2 async get table meta:%s.%s failed, code:%d", sname.dbname, sname.tname, code);
    return code;
  }

  *uid = pTableMeta->uid;
  *vgId = pTableMeta->vgId;

  // remember the result so later binds to the same table skip the catalog
  STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
  code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
  if (TSDB_CODE_SUCCESS == code) {
    code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
  }

  taosMemoryFree(pTableMeta);
  return code;
}
631

632
/*
 * Build the final per-vgroup data blocks for a statement: the root node of
 * pQuery is treated as an SVnodeModifyOpStmt and its pDataBlocks member is
 * filled by insBuildVgDataBlocks() from pVgDataBlocks (last argument true).
 *
 * Returns the result of insBuildVgDataBlocks().
 */
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
  SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
  return insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pModifyStmt->pDataBlocks, true);
}
642

643
/*
 * Add the submit data of a table context to a vgroup context, merging it
 * into rows submitted earlier for the same table name.
 *
 * pTableNameHash maps a table name to the row array (SArray*) of the submit
 * data pushed for that table earlier.
 *
 *   - Name already present: every row of the incoming row array is appended
 *     to the registered array, the result is sorted and merged once, and a
 *     pending create-table request on the incoming data is released.
 *   - Name not present: the submit data is pushed to the vgroup's
 *     aSubmitTbData array (and its blob set to aSubmitBlobData for blob
 *     tables), and its row array is registered under the table name.
 *
 * The append loop is bounded by the number of INCOMING rows, fixed before the
 * loop. Bounding it by the destination size (which grows on every push and
 * may shrink on every merge) could skip incoming rows, or never run at all
 * when the destination was empty.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t checkAndMergeSVgroupDataCxtByTbname(STableDataCxt* pTbCtx, SVgroupDataCxt* pVgCxt, SSHashObj* pTableNameHash,
                                            char* tbname) {
  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
    if (pTbCtx->hasBlob) {
      pVgCxt->pData->aSubmitBlobData = taosArrayInit(128, sizeof(SBlobSet*));
      if (pVgCxt->pData->aSubmitBlobData == NULL) {
        return terrno;
      }
    }
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** rowP = (SArray**)tSimpleHashGet(pTableNameHash, tbname, strlen(tbname));

  if (rowP != NULL && *rowP != NULL) {
    // Appending an array to itself would duplicate its rows, so skip the copy
    // in that (unexpected) case.
    if (*rowP != pTbCtx->pData->aRowP) {
      int32_t numOfNewRows = taosArrayGetSize(pTbCtx->pData->aRowP);
      for (int32_t j = 0; j < numOfNewRows; ++j) {
        SRow* pRow = (SRow*)taosArrayGetP(pTbCtx->pData->aRowP, j);
        if (pRow) {
          if (NULL == taosArrayPush(*rowP, &pRow)) {
            return terrno;
          }
        }
      }
    }

    if (pTbCtx->hasBlob == 0) {
      code = tRowSort(*rowP);
      TAOS_CHECK_RETURN(code);

      code = tRowMerge(*rowP, pTbCtx->pSchema, 0);
      TAOS_CHECK_RETURN(code);
    } else {
      // NOTE(review): as in the original code, the blob variant sorts/merges
      // the incoming row array with the incoming blob set, not the merged
      // destination array - confirm that this is intended.
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
      TAOS_CHECK_RETURN(code);

      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
      TAOS_CHECK_RETURN(code);
    }

    parserDebug("merge same uid data: %" PRId64 ", vgId:%d", pTbCtx->pData->uid, pVgCxt->vgId);

    // the table was already submitted once, so a second create request is dropped
    if (pTbCtx->pData->pCreateTbReq != NULL) {
      tdDestroySVCreateTbReq(pTbCtx->pData->pCreateTbReq);
      taosMemoryFree(pTbCtx->pData->pCreateTbReq);
      pTbCtx->pData->pCreateTbReq = NULL;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (pTbCtx->hasBlob == 0) {
    pTbCtx->pData->pBlobSet = NULL;  // if no blob, set it to NULL
  }

  if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, pTbCtx->pData)) {
    return terrno;
  }

  if (pTbCtx->hasBlob) {
    parserDebug("blob row transfer %p, pData %p, %s", pTbCtx->pData->pBlobSet, pTbCtx->pData, __func__);
    if (NULL == taosArrayPush(pVgCxt->pData->aSubmitBlobData, &pTbCtx->pData->pBlobSet)) {
      return terrno;
    }
    pTbCtx->pData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  code = tSimpleHashPut(pTableNameHash, tbname, strlen(tbname), &pTbCtx->pData->aRowP, sizeof(SArray*));

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTbCtx->pMeta->uid, pVgCxt->vgId);

  return TSDB_CODE_SUCCESS;
}
723

724
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
5,597✔
725
                                  SStbInterlaceInfo* pBuildInfo) {
726
  int32_t  code = TSDB_CODE_SUCCESS;
5,597✔
727
  uint64_t uid;
728
  int32_t  vgId;
729

730
  pTbCtx->pData->aRowP = pTbData->aCol;
5,597✔
731

732
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
5,597✔
733
  if (TSDB_CODE_SUCCESS != code) {
5,603!
734
    return code;
×
735
  }
736

737
  pTbCtx->pMeta->vgId = vgId;
5,603✔
738
  pTbCtx->pMeta->uid = uid;
5,603✔
739
  pTbCtx->pData->uid = uid;
5,603✔
740

741
  if (!pTbCtx->ordered) {
5,603✔
742
    code = tRowSort(pTbCtx->pData->aRowP);
1✔
743
  }
744
  if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
5,603!
745
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
1✔
746
  }
747

748
  if (TSDB_CODE_SUCCESS != code) {
5,602!
749
    return code;
×
750
  }
751

752
  SVgroupDataCxt* pVgCxt = NULL;
5,602✔
753
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
5,602✔
754
  if (NULL == pp) {
5,602✔
755
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
5,048✔
756
    if (NULL == pp) {
5,048!
757
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
5,048✔
758
    } else {
759
      pVgCxt = *(SVgroupDataCxt**)pp;
×
760
    }
761
  } else {
762
    pVgCxt = *(SVgroupDataCxt**)pp;
554✔
763
  }
764

765
  if (TSDB_CODE_SUCCESS == code) {
5,602!
766
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
5,602✔
767
  }
768

769
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
5,601!
770
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
771
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
772
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
773
    // insDestroyVgroupDataCxt(pVgCxt);
774
  }
775

776
  return code;
5,602✔
777
}
778

779
int32_t insAppendStmt2TableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
7,375✔
780
                                   SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
781
  int32_t  code = TSDB_CODE_SUCCESS;
7,375✔
782
  uint64_t uid;
783
  int32_t  vgId;
784

785
  pTbCtx->pData->aRowP = pTbData->aCol;
7,375✔
786
  pTbCtx->pData->pBlobSet = pTbData->pBlobSet;
7,375✔
787

788
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
7,375✔
789
  if (ctbReq != NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
7,380!
790
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
791
    vgId = (int32_t)ctbReq->uid;
×
792
    uid = 0;
×
793
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
×
794
    ctbReq->uid = 0;
×
795
    pTbCtx->pMeta->uid = 0;
×
796
    pTbCtx->pData->uid = 0;
×
797
    pTbCtx->pData->pCreateTbReq = ctbReq;
×
798
    code = TSDB_CODE_SUCCESS;
×
799
  } else {
800
    if (TSDB_CODE_SUCCESS != code) {
7,380!
801
      return code;
×
802
    }
803
    pTbCtx->pMeta->vgId = vgId;
7,380✔
804
    pTbCtx->pMeta->uid = uid;
7,380✔
805
    pTbCtx->pData->uid = uid;
7,380✔
806
    pTbCtx->pData->pCreateTbReq = NULL;
7,380✔
807

808
    if (ctbReq != NULL) {
7,380!
809
      tdDestroySVCreateTbReq(ctbReq);
810
      taosMemoryFree(ctbReq);
×
811
      ctbReq = NULL;
×
812
    }
813
  }
814

815
  if (pTbCtx->hasBlob == 0) {
7,380!
816
    if (!pTbData->isOrdered) {
7,380!
817
      code = tRowSort(pTbCtx->pData->aRowP);
×
818
    }
819
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
7,380!
820
      code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, PREFER_NON_NULL);
×
821
    }
822
  } else {
823
    if (!pTbData->isOrdered) {
×
824
      code = tRowSortWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet);
×
825
    }
826
    if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
×
827
      code = tRowMergeWithBlob(pTbCtx->pData->aRowP, pTbCtx->pSchema, pTbCtx->pData->pBlobSet, 0);
×
828
    }
829
  }
830

831
  if (TSDB_CODE_SUCCESS != code) {
7,380!
832
    return code;
×
833
  }
834

835
  SVgroupDataCxt* pVgCxt = NULL;
7,380✔
836
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
7,380✔
837
  if (NULL == pp) {
7,379✔
838
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
5,639✔
839
    if (NULL == pp) {
5,639!
840
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
5,639✔
841
    } else {
842
      pVgCxt = *(SVgroupDataCxt**)pp;
×
843
    }
844
  } else {
845
    pVgCxt = *(SVgroupDataCxt**)pp;
1,740✔
846
  }
847

848
  if (code == TSDB_CODE_SUCCESS) {
7,379✔
849
    code = checkAndMergeSVgroupDataCxtByTbname(pTbCtx, pVgCxt, pBuildInfo->pTableRowDataHash, pTbData->tbName);
7,378✔
850
  }
851

852
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
7,380!
853
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
854
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
855
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
856
    // insDestroyVgroupDataCxt(pVgCxt);
857
  }
858

859
  return code;
7,378✔
860
}
861

862
/*
863
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
864
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
865
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
866
  if (NULL == pVgroupHash || NULL == pVgroupList) {
867
    taosHashCleanup(pVgroupHash);
868
    taosArrayDestroy(pVgroupList);
869
    return TSDB_CODE_OUT_OF_MEMORY;
870
  }
871

872
  int32_t code = TSDB_CODE_SUCCESS;
873

874
  for (int32_t i = 0; i < tbNum; ++i) {
875
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
876
    pTableCxt->pMeta->vgId = pTableCols->vgId;
877
    pTableCxt->pMeta->uid = pTableCols->uid;
878
    pTableCxt->pData->uid = pTableCols->uid;
879
    pTableCxt->pData->aCol = pTableCols->aCol;
880

881
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
882
    if (pCol->nVal <= 0) {
883
      continue;
884
    }
885

886
    if (pTableCxt->pData->pCreateTbReq) {
887
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
888
    }
889

890
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
891

892
    tColDataSortMerge(pTableCxt->pData->aCol);
893

894
    if (TSDB_CODE_SUCCESS == code) {
895
      SVgroupDataCxt* pVgCxt = NULL;
896
      int32_t         vgId = pTableCxt->pMeta->vgId;
897
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
898
      if (NULL == pp) {
899
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
900
      } else {
901
        pVgCxt = *(SVgroupDataCxt**)pp;
902
      }
903
      if (TSDB_CODE_SUCCESS == code) {
904
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
905
      }
906
    }
907
  }
908

909
  taosHashCleanup(pVgroupHash);
910
  if (TSDB_CODE_SUCCESS == code) {
911
    *pVgDataBlocks = pVgroupList;
912
  } else {
913
    insDestroyVgroupDataCxtList(pVgroupList);
914
  }
915

916
  return code;
917
}
918
*/
919

920
// Returns 1 when the column's type satisfies IS_STR_DATA_BLOB, otherwise 0.
static int8_t colDataHasBlob(SColData* pCol) {
  return IS_STR_DATA_BLOB(pCol->type) ? 1 : 0;
}
926
/**
 * Sort/merge the buffered data of every table in pTableHash and collect the tables into
 * per-vgroup contexts.
 *
 * The data format (column vs. row) is decided once, from the flags of the first table
 * returned by the hash iterator, and applied to all tables.
 *
 * @param pTableHash     hash whose values are STableDataCxt pointers
 * @param pVgDataBlocks  out: on success receives the list of vgroup contexts; left
 *                       untouched on failure (the list is destroyed here instead)
 * @param isRebuild      forwarded to fillVgroupDataCxt()
 * @return TSDB_CODE_SUCCESS, terrno when the temporary containers cannot be allocated,
 *         or the first error reported by a sort/merge/fill step.
 *
 * NOTE(review): when an error ends the loop early the iterator `p` is abandoned without
 * a cancel call - confirm whether the hash API requires taosHashCancelIterate() here.
 */
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVgroupHash || NULL == pVgroupList) {
    taosHashCleanup(pVgroupHash);
    taosArrayDestroy(pVgroupList);
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    colFormat = false;

  void* p = taosHashIterate(pTableHash, NULL);
  if (p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
  }

  while (TSDB_CODE_SUCCESS == code && NULL != p) {
    STableDataCxt* pTableCxt = *(STableDataCxt**)p;
    if (colFormat) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
      // A table whose first column holds no values has nothing to submit.
      if (pCol && pCol->nVal <= 0) {
        p = taosHashIterate(pTableHash, p);
        continue;
      }

      if (pTableCxt->pData->pCreateTbReq) {
        pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
      }

      // pCol may be NULL (the check above already allows for it), so guard the type
      // lookup instead of dereferencing unconditionally. Evaluated before the sort
      // because sorting rearranges the array pCol points into.
      // NOTE(review): only column 0 is inspected for the blob type - confirm that this
      // is sufficient to detect a table containing blob columns.
      bool isBlob = (NULL != pCol) && IS_STR_DATA_BLOB(pCol->type);

      taosArraySort(pTableCxt->pData->aCol, insColDataComp);
      if (!isBlob) {
        code = tColDataSortMerge(&pTableCxt->pData->aCol);
      } else {
        code = tColDataSortMergeWithBlob(&pTableCxt->pData->aCol, pTableCxt->pData->pBlobSet);
      }
    } else {
      // skip the table has no data to insert
      // eg: import a csv without valid data
      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
      //   parserWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
      //   p = taosHashIterate(pTableHash, p);
      //   continue;
      // }
      if (pTableCxt->hasBlob == 0) {
        if (!pTableCxt->ordered) {
          code = tRowSort(pTableCxt->pData->aRowP);
        }
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
          code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, PREFER_NON_NULL);
        }
      } else {
        if (!pTableCxt->ordered) {
          code = tRowSortWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet);
        }
        if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
          code = tRowMergeWithBlob(pTableCxt->pData->aRowP, pTableCxt->pSchema, pTableCxt->pData->pBlobSet, 0);
        }
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      SVgroupDataCxt* pVgCxt = NULL;
      int32_t         vgId = pTableCxt->pMeta->vgId;
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
      if (NULL == pp) {
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
      } else {
        pVgCxt = *(SVgroupDataCxt**)pp;
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
      }
    }
    // Advance only on success; a failure leaves `code` set, which ends the loop.
    if (TSDB_CODE_SUCCESS == code) {
      p = taosHashIterate(pTableHash, p);
    }
  }

  // The hash was only a lookup aid; the contexts themselves are referenced by pVgroupList.
  taosHashCleanup(pVgroupHash);
  if (TSDB_CODE_SUCCESS == code) {
    *pVgDataBlocks = pVgroupList;
  } else {
    insDestroyVgroupDataCxtList(pVgroupList);
  }

  return code;
}
1016

1017
/**
 * Serialize a submit request into a freshly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length written with
 * htonl, version written as htobe64(1)) followed by the encoded request.
 *
 * @param vgId   vgroup id stored in the message header
 * @param pReq   request to encode
 * @param pData  out: allocated buffer; written only on success
 * @param pLen   out: total buffer length in bytes; written only on success
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, or the encoder's error code.
 */
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
  int32_t  rc = TSDB_CODE_SUCCESS;
  uint32_t total = 0;

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSubmitReq, pReq, total, rc);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  total += sizeof(SSubmitReq2Msg);
  void* pMsg = taosMemoryMalloc(total);
  if (NULL == pMsg) {
    return terrno;
  }

  SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pMsg;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(total);
  pHead->version = htobe64(1);

  // Second pass: encode the payload right behind the header.
  SEncoder enc;
  tEncoderInit(&enc, POINTER_SHIFT(pMsg, sizeof(SSubmitReq2Msg)), total - sizeof(SSubmitReq2Msg));
  rc = tEncodeSubmitReq(&enc, pReq);
  tEncoderClear(&enc);

  if (TSDB_CODE_SUCCESS != rc) {
    taosMemoryFree(pMsg);
    return rc;
  }

  *pData = pMsg;
  *pLen = total;
  return rc;
}
1045

1046
static void destroyVgDataBlocks(void* p) {
×
1047
  if (p == NULL) return;
×
1048
  SVgDataBlocks* pVg = p;
×
1049
  taosMemoryFree(pVg->pData);
×
1050
  taosMemoryFree(pVg);
×
1051
}
1052

1053

1054
/**
 * Re-attach the per-table blob sets of a submit request.
 *
 * Raw requests are left untouched. Otherwise, for every entry i of aSubmitTbData:
 *   - when aSubmitBlobData exists, entry i of that array is moved into the table's
 *     pBlobSet and the array slot is cleared, so the blob set has a single owner;
 *   - when it does not exist (or has no slot i), the table's pBlobSet is set to NULL.
 *
 * @param p  submit request to update
 * @return always TSDB_CODE_SUCCESS
 */
int32_t insResetBlob(SSubmitReq2* p) {
  if (p->raw) {
    return TSDB_CODE_SUCCESS;  // no blob data in raw mode
  }

  // The loop never changes the array length, so read it once.
  int32_t numOfTb = (int32_t)taosArrayGetSize(p->aSubmitTbData);

  for (int32_t i = 0; i < numOfTb; i++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(p->aSubmitTbData, i);
    if (NULL == pSubmitTbData) {
      continue;
    }

    SBlobSet** ppBlob = (p->aSubmitBlobData != NULL) ? taosArrayGet(p->aSubmitBlobData, i) : NULL;
    if (NULL == ppBlob) {
      pSubmitTbData->pBlobSet = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
      continue;
    }

    SBlobSet* pBlob = *ppBlob;
    int32_t   nrow = taosArrayGetSize(pSubmitTbData->aRowP);
    int32_t   nblob = 0;
    // The slot may already be NULL (this function clears it below), so never dereference
    // it just to produce the trace line.
    if (nrow > 0 && NULL != pBlob) {
      nblob = taosArrayGetSize(pBlob->pSeqTable);
    }
    // NOTE(review): the labels in this message look swapped relative to the arguments
    // (nblob is printed as "row size", nrow as "pData size") - confirm before relying on it.
    uTrace("blob %p row size %d, pData size %d", pBlob, nblob, nrow);
    pSubmitTbData->pBlobSet = pBlob;
    *ppBlob = NULL;  // reset blob row to NULL, so that it will not be freed in destroy
  }

  return TSDB_CODE_SUCCESS;
}
1083
/**
 * Turn every non-empty vgroup context into an encoded SVgDataBlocks entry.
 *
 * @param pVgroupsHashObj  hash from which the vgroup info of each context is copied
 * @param pVgDataCxtList   list of SVgroupDataCxt pointers to convert
 * @param pVgDataBlocks    in/out: result list. With append set and a non-NULL list the
 *                         new entries are pushed onto it; otherwise a new list is created.
 * @param append           true: keep appending to *pVgDataBlocks and hand the list back
 *                         even on failure; false: publish the new list only on success
 *                         and destroy it on failure.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the list cannot be created, or
 *         the first error code of a conversion step.
 */
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
  size_t  total = taosArrayGetSize(pVgDataCxtList);
  SArray* pOut = NULL;

  if (append && *pVgDataBlocks) {
    pOut = *pVgDataBlocks;
  } else {
    pOut = taosArrayInit(total, POINTER_BYTES);
  }
  if (NULL == pOut) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  size_t  idx = 0;
  while (TSDB_CODE_SUCCESS == code && idx < total) {
    SVgroupDataCxt* pCxt = taosArrayGetP(pVgDataCxtList, idx);
    ++idx;

    // Contexts without any table data produce no block.
    if (taosArrayGetSize(pCxt->pData->aSubmitTbData) <= 0) {
      continue;
    }

    SVgDataBlocks* pBlk = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    if (NULL == pBlk) {
      code = terrno;
    }

    if (TSDB_CODE_SUCCESS == code) {
      pBlk->numOfTables = taosArrayGetSize(pCxt->pData->aSubmitTbData);
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&pCxt->vgId, sizeof(pCxt->vgId), &pBlk->vg);
      if (TSDB_CODE_SUCCESS == code) {
        code = insResetBlob(pCxt->pData);
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = buildSubmitReq(pCxt->vgId, pCxt->pData, &pBlk->pData, &pBlk->size);
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = (NULL == taosArrayPush(pOut, &pBlk) ? terrno : TSDB_CODE_SUCCESS);
      }
    }

    // Any failure above means the block never reached the list, so release it here.
    if (TSDB_CODE_SUCCESS != code) {
      destroyVgDataBlocks(pBlk);
    }
  }

  if (!append) {
    if (TSDB_CODE_SUCCESS != code) {
      taosArrayDestroyP(pOut, destroyVgDataBlocks);
    } else {
      *pVgDataBlocks = pOut;
    }
    return code;
  }

  // Append mode: the caller keeps the list regardless of the outcome.
  if (NULL == *pVgDataBlocks) {
    *pVgDataBlocks = pOut;
  }
  return code;
}
1135

1136
// Reports whether any of the numFields entries in `fields` has the same name as pSchema.
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
  int idx = 0;
  while (idx < numFields && strcmp(pSchema->name, fields[idx].name) != 0) {
    ++idx;
  }
  return idx < numFields;
}
1145

1146
/**
 * Check that one column descriptor taken from a data block matches the table schema.
 *
 * The descriptor at `fields` is read as a one-byte type followed by a 32-bit byte count.
 * The checks are, in order:
 *   - the type must equal the schema type;
 *   - decimal columns: precision and scale derived from the byte count must equal the
 *     ones derived from pColExtSchema->typeMod;
 *   - variable-length columns: the byte count must not exceed the schema's;
 *   - all other columns: the byte count must equal the schema's.
 *
 * @param pColSchema     schema of the target column
 * @param pColExtSchema  extended schema of the target column (read for decimal types only)
 * @param fields         pointer to the column descriptor inside the data block
 * @param errstr         optional buffer receiving a human-readable reason on mismatch
 * @param errstrLen      size of errstr
 * @return 0 when the descriptor matches, TSDB_CODE_INVALID_PARA otherwise.
 */
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
  int8_t dataType = *fields;

  if (dataType != pColSchema->type) {
    if (errstr != NULL) {
      if (dataType < 0 || dataType >= TSDB_DATA_TYPE_MAX) {
        // The type byte comes straight from the data block and may be garbage; it must
        // not be used as an index into tDataTypes. Report the raw number instead.
        snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%d", pColSchema->name,
                 tDataTypes[pColSchema->type].name, (int)dataType);
      } else {
        snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
                 tDataTypes[pColSchema->type].name, tDataTypes[dataType].name);
      }
    }
    return TSDB_CODE_INVALID_PARA;
  }

  // From here on dataType equals the schema type, so indexing tDataTypes with it is as
  // safe as indexing with pColSchema->type.
  // NOTE(review): this reads a 32-bit value at an odd offset through a cast pointer;
  // confirm all supported targets tolerate the unaligned access.
  int32_t dataBytes = *(int32_t*)(fields + sizeof(int8_t));

  if (IS_DECIMAL_TYPE(pColSchema->type)) {
    uint8_t precision = 0, scale = 0;
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);
    uint8_t precisionData = 0, scaleData = 0;
    int32_t bytes = dataBytes;  // separate copy: the helper takes it by pointer
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);
    if (precision != precisionData || scale != scaleData) {
      if (errstr != NULL) {
        snprintf(errstr, errstrLen,
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
                 "precision:%d, scale:%d",
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[dataType].name,
                 precisionData, scaleData);
      }
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColSchema->type) && dataBytes > pColSchema->bytes) {
    if (errstr != NULL) {
      snprintf(errstr, errstrLen,
               "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
               dataBytes);
    }
    return TSDB_CODE_INVALID_PARA;
  }

  if (!IS_VAR_DATA_TYPE(pColSchema->type) && dataBytes != pColSchema->bytes) {
    if (errstr != NULL) {
      snprintf(errstr, errstrLen,
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
               dataBytes);
    }
    return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
1191

1192
/*
 * PRCESS_DATA(i, j): bind column i of the raw data block to bound column j.
 *
 * This is a bare statement sequence, not an expression and not wrapped in do/while: it
 * declares the locals `offset`, `pData` and `pCol`, so it must be expanded at most once
 * per enclosing block, and it is written at the call site without a trailing semicolon.
 *
 * It relies on these names from the expansion site: ret, pColSchema, pColExtSchema,
 * fields, errstr, errstrLen, hasTs, pStart, numOfRows, pTableCxt, needChangeLength,
 * version, colLength, boundInfo, and a label `end` that it jumps to on failure.
 *
 * Steps:
 *   1. Validate the column descriptor at `fields` with checkSchema().
 *   2. Set hasTs when the schema column is the primary timestamp column.
 *   3. Remember pStart as `offset`, then advance pStart past numOfRows 32-bit entries for
 *      variable-length types or past BitmapLen(numOfRows) bytes otherwise; the new
 *      pStart is the column's `pData`.
 *   4. Hand `offset` and `pData` to tColDataAddValueByDataBlock() for column j.
 *   5. Advance `fields` to the next descriptor and pStart by colLength[i] (passed
 *      through htonl() when needChangeLength is set and version is BLOCK_VERSION_1).
 *   6. Set boundInfo->pColIndex[j] to -1.
 */
#define PRCESS_DATA(i, j)                                                                                 \
1193
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                \
1194
  if (ret != 0) {                                                                                         \
1195
    goto end;                                                                                             \
1196
  }                                                                                                       \
1197
                                                                                                          \
1198
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                 \
1199
    hasTs = true;                                                                                         \
1200
  }                                                                                                       \
1201
                                                                                                          \
1202
  int8_t* offset = pStart;                                                                                \
1203
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                               \
1204
    pStart += numOfRows * sizeof(int32_t);                                                                \
1205
  } else {                                                                                                \
1206
    pStart += BitmapLen(numOfRows);                                                                       \
1207
  }                                                                                                       \
1208
  char* pData = pStart;                                                                                   \
1209
                                                                                                          \
1210
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                               \
1211
  ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
1212
  if (ret != 0) {                                                                                         \
1213
    goto end;                                                                                             \
1214
  }                                                                                                       \
1215
  fields += sizeof(int8_t) + sizeof(int32_t);                                                             \
1216
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                   \
1217
    pStart += htonl(colLength[i]);                                                                        \
1218
  } else {                                                                                                \
1219
    pStart += colLength[i];                                                                               \
1220
  }                                                                                                       \
1221
  boundInfo->pColIndex[j] = -1;
1222

1223
/**
 * Bind the columns of one raw data block to the column-format submit data of a table.
 *
 * The table's data context is looked up (or created) in the query's table block hash,
 * the block header is parsed, every block column is matched to a bound schema column
 * and appended through PRCESS_DATA, and bound columns that received no data are filled
 * through tColDataAddValueByDataBlock() with NULL inputs.
 *
 * @param query             query whose root is an SVnodeModifyOpStmt
 * @param pTableMeta        meta of the target table
 * @param data              raw data block; must not be NULL
 * @param pCreateTb         optional create-table request; cloned only when the table has
 *                          no context in the hash yet
 * @param tFields           NULL to bind block columns positionally, otherwise the field
 *                          descriptions used to match columns by name
 * @param numFields         number of entries in tFields; must equal the block's column
 *                          count when tFields is given
 * @param needChangeLength  together with block version BLOCK_VERSION_1, column lengths
 *                          are passed through htonl() before use
 * @param errstr            optional buffer receiving a human-readable error
 * @param errstrLen         size of errstr
 * @param raw               true: tFields is an SSchemaWrapper; false: a TAOS_FIELD array
 * @return 0 on success, TSDB_CODE_APP_ERROR for NULL data, TSDB_CODE_INVALID_PARA for
 *         malformed or mismatching data, or the error code of a failing helper.
 */
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
  int ret = 0;
  if (data == NULL) {
    uError("rawBlockBindData, data is NULL");
    return TSDB_CODE_APP_ERROR;
  }
  void* tmp =
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
  SVCreateTbReq* pCreateReqTmp = NULL;
  if (tmp == NULL && pCreateTb != NULL) {
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("cloneSVreateTbReq error");
      goto end;
    }
  }

  STableDataCxt* pTableCxt = NULL;
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
  // Whatever is still referenced by pCreateReqTmp after the call is released here.
  if (pCreateReqTmp != NULL) {
    tdDestroySVCreateTbReq(pCreateReqTmp);
    taosMemoryFree(pCreateReqTmp);
  }

  if (ret != TSDB_CODE_SUCCESS) {
    uError("insGetTableDataCxt error");
    goto end;
  }

  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
  // Column submit data is initialised only when the table had no context before this call.
  if (tmp == NULL) {
    ret = initTableColSubmitData(pTableCxt);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("initTableColSubmitData error");
      goto end;
    }
  }

  char* p = (char*)data;
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  int32_t version = *(int32_t*)data;
  p += sizeof(int32_t);
  p += sizeof(int32_t);

  int32_t numOfRows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t numOfCols = *(int32_t*)p;
  p += sizeof(int32_t);

  // Both counts come from the block itself and drive all pointer arithmetic below;
  // negative values would move the cursors backwards, so reject them up front.
  if (numOfRows < 0 || numOfCols < 0) {
    uError("rawBlockBindData, invalid block header, rows:%d, cols:%d", numOfRows, numOfCols);
    if (errstr != NULL) {
      snprintf(errstr, errstrLen, "invalid block header, rows:%d, cols:%d", numOfRows, numOfCols);
    }
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // Column descriptors: one type byte plus a 32-bit byte count per column.
  int8_t* fields = (int8_t*)p;
  if (*fields >= TSDB_DATA_TYPE_MAX || *fields < 0) {
    uError("fields type error:%d", *fields);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;

  SSchema*       pSchema = getTableColumnSchema(pTableMeta);
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableMeta);
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;

  if (tFields != NULL && numFields != numOfCols) {
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  bool hasTs = false;
  if (tFields == NULL) {
    // Positional binding: block column j maps to schema column j.
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
    for (int j = 0; j < len; j++) {
      SSchema*    pColSchema = &pSchema[j];
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
      PRCESS_DATA(j, j)
    }
  } else {
    // Binding by name: block column i maps to the first schema column with the same name.
    // NOTE(review): when no schema column matches field i, neither `fields` nor `pStart`
    // is advanced past that column, so the following columns are parsed from the wrong
    // position - confirm that callers guarantee every field has a match.
    for (int i = 0; i < numFields; i++) {
      for (int j = 0; j < boundInfo->numOfBound; j++) {
        SSchema*    pColSchema = &pSchema[j];
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
        char*       fieldName = NULL;
        if (raw) {
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
        } else {
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
        }
        if (strcmp(pColSchema->name, fieldName) == 0) {
          PRCESS_DATA(i, j)
          break;
        }
      }
    }
  }

  if (!hasTs) {
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // process NULL data
  // PRCESS_DATA marks every column it filled with -1; any other bound column received
  // no data from this block.
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
    if (boundInfo->pColIndex[c] != -1) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
      if (ret != 0) {
        goto end;
      }
    } else {
      boundInfo->pColIndex[c] = c;  // restore for next block
    }
  }

end:
  return ret;
}
1351

1352
/**
 * Queue an already-encoded table payload on the submit request of its vgroup.
 *
 * The payload is first rewritten by transformRawSSubmitTbData() with the table's suid,
 * uid and schema version, then its pointer is pushed onto the vgroup's aSubmitTbData
 * array, which is created (and the request marked raw) on first use.
 *
 * @param pVgroupHash  hash of vgroup contexts keyed by vgId
 * @param pVgroupList  list passed to createVgroupDataCxt() when a context is created
 * @param pTableMeta   meta of the target table
 * @param data         raw payload
 * @return 0 on success, otherwise the failing helper's code or terrno.
 */
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
  int rc = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
  if (rc != 0) {
    return rc;
  }

  SVgroupDataCxt* pCxt = NULL;
  void**          ppFound = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
  if (NULL != ppFound) {
    pCxt = *(SVgroupDataCxt**)ppFound;
  } else {
    rc = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pCxt);
    if (rc != 0) {
      return rc;
    }
  }

  if (NULL == pCxt->pData->aSubmitTbData) {
    pCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
    pCxt->pData->raw = true;
    if (NULL == pCxt->pData->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  void* pPushed = taosArrayPush(pCxt->pData->aSubmitTbData, &data);
  if (NULL == pPushed) {
    return terrno;
  }

  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);

  return 0;
}
1384

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc