• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4103

17 May 2025 02:18AM UTC coverage: 63.264% (+0.4%) from 62.905%
#4103

push

travis-ci

web-flow
Merge pull request #31110 from taosdata/3.0

merge 3.0

158149 of 318142 branches covered (49.71%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

1725 existing lines in 138 files now uncovered.

243642 of 316962 relevant lines covered (76.87%)

16346281.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.13
/source/libs/parser/src/parInsertUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "parInsertUtil.h"
17

18
#include "catalog.h"
19
#include "parInt.h"
20
#include "parUtil.h"
21
#include "querynodes.h"
22
#include "tRealloc.h"
23
#include "tdatablock.h"
24
#include "tmisce.h"
25

26
void qDestroyBoundColInfo(void* pInfo) {
44,204✔
27
  if (NULL == pInfo) {
44,204✔
28
    return;
23,340✔
29
  }
30

31
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
20,864✔
32

33
  taosMemoryFreeClear(pBoundInfo->pColIndex);
20,864!
34
}
35

36
/**
 * Find the first occurrence of `target` in a token that is neither inside an
 * escape-character pair (TS_ESCAPE_CHAR) nor inside a single/double quoted section.
 *
 * @param pToken  Token to scan; exactly pToken->n bytes starting at pToken->z are examined.
 * @param target  Character to look for.
 * @return Pointer to the matching character inside the token, or NULL if none is found.
 */
static char* tableNameGetPosition(SToken* pToken, char target) {
  bool escaped = false;  // between a pair of TS_ESCAPE_CHAR
  bool quoted = false;   // between a pair of matching ' or "
  char openQuote = 0;    // the quote character that opened the current quoted section

  uint32_t pos = 0;
  while (pos < pToken->n) {
    const char ch = pToken->z[pos];

    if (ch == target && !escaped && !quoted) {
      return pToken->z + pos;
    }

    // The escape character toggles the escaped state only outside quotes.
    if (ch == TS_ESCAPE_CHAR && !quoted) {
      escaped = !escaped;
    }

    // Quote characters are only significant outside an escaped section, and a
    // quoted section is closed only by the same character that opened it.
    if ((ch == '\'' || ch == '"') && !escaped) {
      if (!quoted) {
        openQuote = ch;
        quoted = true;
      } else if (openQuote == ch) {
        quoted = false;
      }
    }

    ++pos;
  }

  return NULL;
}
66

67
/**
 * Build an SName from a table-name token of the form `tb` or `db.tb`.
 *
 * When the token contains an unquoted/unescaped path delimiter, the part before it is
 * used as the database name and dbName is ignored; otherwise dbName supplies the database.
 *
 * @param pName       Output name; its db part and table part are filled in.
 * @param pTableName  Token holding the (possibly quoted) table name.
 * @param acctId      Account id forwarded to tNameSetDbName.
 * @param dbName      Current database; may be NULL, which is an error when the token
 *                    carries no database part.
 * @param pMsgBuf     Buffer that receives the error message on failure.
 * @return TSDB_CODE_SUCCESS, or the code produced by the message builders on failure.
 */
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
  const char* msg1 = "name too long";
  const char* msg2 = "invalid database name";
  const char* msg3 = "db is not specified";
  const char* msg4 = "invalid table name";

  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);

  if (p != NULL) {  // db has been specified in sql string so we ignore current db path
    int32_t dbLen = p - pTableName->z;
    if (dbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg2);
    }

    char name[TSDB_DB_FNAME_LEN] = {0};
    // Reject before copying: strncpy would otherwise write past the stack buffer
    // (and leave it unterminated) for an over-long database part.
    if (dbLen >= (int32_t)sizeof(name)) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    strncpy(name, pTableName->z, dbLen);
    int32_t actualDbLen = strdequote(name);

    code = tNameSetDbName(pName, acctId, name, actualDbLen);
    if (code != TSDB_CODE_SUCCESS) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }

    int32_t tbLen = pTableName->n - dbLen - 1;
    if (tbLen <= 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg4);
    }

    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    // Same bound check for the table part.
    if (tbLen >= (int32_t)sizeof(tbname)) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    strncpy(tbname, p + 1, tbLen);
    (void)strdequote(tbname);

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  } else {  // get current DB name first, and then set it into path
    char tbname[TSDB_TABLE_FNAME_LEN] = {0};
    // The length check on the dequoted name below happens after the copy, so the raw
    // token length has to be bounded first to keep the copy inside the buffer.
    if (pTableName->n >= sizeof(tbname)) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    strncpy(tbname, pTableName->z, pTableName->n);
    int32_t tbLen = strdequote(tbname);
    if (tbLen >= TSDB_TABLE_NAME_LEN) {
      return buildInvalidOperationMsg(pMsgBuf, msg1);
    }
    if (tbLen == 0) {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }

    if (dbName == NULL) {
      return buildInvalidOperationMsg(pMsgBuf, msg3);
    }
    // tbname already holds the dequoted token; a second copy + dequote of the same
    // bytes would yield the same string, so it is reused here.
    if (tbname[0] == '\0') {
      return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
    }

    code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
    if (code != TSDB_CODE_SUCCESS) {
      code = buildInvalidOperationMsg(pMsgBuf, msg2);
      return code;
    }

    code = tNameFromString(pName, tbname, T_NAME_TABLE);
    if (code != 0) {
      code = buildInvalidOperationMsg(pMsgBuf, msg1);
    }
  }

  // A '.' that survived into the table part (e.g. from inside quotes) is rejected.
  if (NULL != strchr(pName->tname, '.')) {
    code = generateSyntaxErrMsgExt(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "The table name cannot contain '.'");
  }

  return code;
}
141

142
/**
 * Linear search for a column by name in the half-open schema range [start, end).
 *
 * @param pColname  Token holding the column name (pColname->n bytes at pColname->z).
 * @param start     First schema index to examine.
 * @param end       One past the last schema index to examine.
 * @param pSchema   Schema array indexed by column position.
 * @return Index of the first entry whose name equals the token exactly, or -1 if none.
 */
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
  for (; start < end; ++start) {
    const char* colName = pSchema[start].name;
    // Lengths must agree first, so a token that is only a prefix of the name does not match.
    if (strlen(colName) != pColname->n) {
      continue;
    }
    if (0 == strncmp(pColname->z, colName, pColname->n)) {
      return start;
    }
  }
  return -1;
}
151

152
int32_t insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
68,735✔
153
                            SArray* tagName, uint8_t tagNum, int32_t ttl) {
154
  pTbReq->type = TD_CHILD_TABLE;
68,735✔
155
  pTbReq->ctb.pTag = (uint8_t*)pTag;
68,735✔
156
  pTbReq->name = taosStrdup(tname);
68,735!
157
  if (!pTbReq->name) return terrno;
68,746!
158
  pTbReq->ctb.suid = suid;
68,746✔
159
  pTbReq->ctb.tagNum = tagNum;
68,746✔
160
  if (sname) {
68,746✔
161
    pTbReq->ctb.stbName = taosStrdup(sname);
66,236!
162
    if (!pTbReq->ctb.stbName) return terrno;
66,250!
163
  }
164
  pTbReq->ctb.tagName = taosArrayDup(tagName, NULL);
68,760✔
165
  if (!pTbReq->ctb.tagName) return terrno;
68,760!
166
  pTbReq->ttl = ttl;
68,768✔
167
  pTbReq->commentLen = -1;
68,768✔
168

169
  return TSDB_CODE_SUCCESS;
68,768✔
170
}
171

172
/**
 * Fill pBoundCols[0 .. ncols-1] with the identity mapping (entry i holds i).
 */
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
  int32_t col = 0;
  while (col < ncols) {
    pBoundCols[col] = col;
    ++col;
  }
}
×
177

178
/**
 * Append one "none" column value per table column to pValues, in schema order.
 *
 * @param pTableMeta  Table meta supplying the column schema and column count.
 * @param pValues     Array of SColVal that receives the values.
 * @return 0 on success, or terrno if an append fails (values pushed so far stay in the array).
 */
static int32_t initColValues(STableMeta* pTableMeta, SArray* pValues) {
  SSchema* pSchemas = getTableColumnSchema(pTableMeta);

  for (int32_t col = 0; col < pTableMeta->tableInfo.numOfColumns; ++col) {
    SColVal noneVal = COL_VAL_NONE(pSchemas[col].colId, pSchemas[col].type);
    if (NULL == taosArrayPush(pValues, &noneVal)) {
      return terrno;
    }
  }

  return 0;
}
190

191
/**
 * Public wrapper around initColValues: appends one "none" SColVal per table column
 * to aColValues and returns that function's result.
 */
int32_t insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
  int32_t code = initColValues(pTableMeta, aColValues);
  return code;
}
10,588✔
192

193
/**
 * Initialise a bound-column descriptor for numOfBound columns with an identity
 * index mapping and both flags cleared.
 *
 * @param numOfBound  Number of columns; used for both numOfCols and numOfBound.
 * @param pInfo       Descriptor to initialise; pColIndex is newly allocated here.
 * @return TSDB_CODE_SUCCESS, or terrno if the index array cannot be allocated.
 */
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
  pInfo->numOfCols = numOfBound;
  pInfo->numOfBound = numOfBound;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
  if (NULL == pInfo->pColIndex) {
    return terrno;
  }

  int32_t col = 0;
  while (col < numOfBound) {
    pInfo->pColIndex[col] = col;
    ++col;
  }
  return TSDB_CODE_SUCCESS;
}
207

208
/**
 * Restore a bound-column descriptor to its initial state: every column bound,
 * identity index mapping, both flags cleared. The existing pColIndex array is reused.
 */
void insResetBoundColsInfo(SBoundColInfo* pInfo) {
  pInfo->numOfBound = pInfo->numOfCols;
  pInfo->hasBoundCols = false;
  pInfo->mixTagsCols = false;

  int32_t col = 0;
  while (col < pInfo->numOfCols) {
    pInfo->pColIndex[col] = col;
    ++col;
  }
}
22✔
216

217
/**
 * Track whether the rows appended to a table context arrive in ascending key order
 * and whether two consecutive rows share the same key.
 *
 * @param pTableCxt  Table context whose ordered / duplicateTs / lastKey fields are updated.
 * @param rowKey     Key of the row being appended.
 */
void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
  // once the data block is disordered, we do NOT keep last timestamp any more
  if (!pTableCxt->ordered) {
    return;
  }

  // Compare once: both outcomes are derived from the same comparison of the same
  // two keys, so the second call in the previous version was redundant work.
  int32_t cmp = tRowKeyCompare(rowKey, &pTableCxt->lastKey);
  if (cmp < 0) {
    pTableCxt->ordered = false;
  } else if (cmp == 0) {
    pTableCxt->duplicateTs = true;
  }

  // TODO: for variable length data type, we need to copy it out
  pTableCxt->lastKey = *rowKey;
}
235

236
/**
 * Free and clear the column-index array of a bound-column descriptor.
 * pInfo itself is not freed and is dereferenced without a NULL check.
 */
void insDestroyBoundColInfo(SBoundColInfo* pInfo) {
  taosMemoryFreeClear(pInfo->pColIndex);
}
31,358,392!
237

238
/**
 * Allocate and initialise a table data context for inserting into pTableMeta's table.
 *
 * @param pTableMeta     Table meta; duplicated into the context.
 * @param pCreateTbReq   Optional in/out create-table request. When non-NULL and the
 *                       context's submit data is created, the request is moved into it
 *                       and *pCreateTbReq is set to NULL.
 * @param pOutput        Receives the new context on success and NULL on every failure.
 * @param colMode        true to use the column data format, false for the row format.
 * @param ignoreColVals  true to skip creating the per-column value array.
 * @return TSDB_CODE_SUCCESS or an error code.
 */
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
                                  bool colMode, bool ignoreColVals) {
  STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
  if (NULL == pTableCxt) {
    *pOutput = NULL;
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pTableCxt->lastKey = (SRowKey){0};
  pTableCxt->ordered = true;
  pTableCxt->duplicateTs = false;

  pTableCxt->pMeta = tableMetaDup(pTableMeta);
  if (NULL == pTableCxt->pMeta) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pSchema =
        tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
    if (NULL == pTableCxt->pSchema) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
  }
  if (TSDB_CODE_SUCCESS == code && !ignoreColVals) {
    pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
    if (NULL == pTableCxt->pValues) {
      code = terrno;
    } else {
      code = initColValues(pTableMeta, pTableCxt->pValues);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
    if (NULL == pTableCxt->pData) {
      code = terrno;
    } else {
      pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
      pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
      pTableCxt->pData->suid = pTableMeta->suid;
      pTableCxt->pData->uid = pTableMeta->uid;
      pTableCxt->pData->sver = pTableMeta->sversion;
      // Move the create-table request into the submit data and clear the caller's pointer.
      pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
      if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
      if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTableCxt->pData->aCol) {
          code = terrno;
        }
      } else {
        pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTableCxt->pData->aRowP) {
          code = terrno;
        }
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pTableCxt;
    parserDebug("uid:%" PRId64 ", create table data context, code:%d, vgId:%d", pTableMeta->uid, code, pTableMeta->vgId);
  } else {
    insDestroyTableDataCxt(pTableCxt);
    // The context is gone; never leave the out-parameter holding a stale or
    // indeterminate value that a caller's error path might try to destroy.
    *pOutput = NULL;
  }

  return code;
}
308

309
/**
 * Replace a submitted SSubmitTbData with a fresh, empty one carrying the same
 * identity (flags, suid, uid, sver) so the table context can collect the next batch.
 *
 * pSrc itself is always freed; the arrays it pointed to are not released here.
 *
 * @param pSrc  Block to replace; freed by this function in all cases.
 * @param pDst  Receives the new block on success and NULL on failure. It may alias the
 *              location that held pSrc, so it is never left pointing at freed memory.
 * @return TSDB_CODE_SUCCESS or an error code.
 */
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
  if (NULL == pTmp) {
    code = terrno;
  } else {
    pTmp->flags = pSrc->flags;
    pTmp->suid = pSrc->suid;
    pTmp->uid = pSrc->uid;
    pTmp->sver = pSrc->sver;
    pTmp->pCreateTbReq = NULL;
    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
      if (pSrc->pCreateTbReq) {
        code = cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
      } else {
        // No request to carry over, so the new block must not claim auto-create.
        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
        pTmp->aCol = taosArrayInit(128, sizeof(SColData));
        if (NULL == pTmp->aCol) {
          code = terrno;
        }
      } else {
        pTmp->aRowP = taosArrayInit(128, POINTER_BYTES);
        if (NULL == pTmp->aRowP) {
          code = terrno;
        }
      }

      if (TSDB_CODE_SUCCESS != code) {
        // The clone (if any) succeeded above and is owned by pTmp; release it too,
        // otherwise it leaks together with the discarded block.
        if (NULL != pTmp->pCreateTbReq) {
          tdDestroySVCreateTbReq(pTmp->pCreateTbReq);
          taosMemoryFree(pTmp->pCreateTbReq);
        }
        taosMemoryFree(pTmp);
        pTmp = NULL;
      }
    } else {
      // Clone failed. NOTE(review): assumes cloneSVreateTbReq releases its own partial
      // result on failure - confirm against its implementation.
      taosMemoryFree(pTmp);
      pTmp = NULL;
    }
  }

  taosMemoryFree(pSrc);
  // On failure pTmp is NULL here, which keeps *pDst from dangling at the freed pSrc.
  *pDst = pTmp;

  return code;
}
353

354
/**
 * Mark every SColVal in pValues as CV_FLAG_NONE. Only the flag is touched.
 */
static void resetColValues(SArray* pValues) {
  const int32_t total = taosArrayGetSize(pValues);
  int32_t       idx = 0;
  while (idx < total) {
    SColVal* pColVal = taosArrayGet(pValues, idx);
    pColVal->flag = CV_FLAG_NONE;
    ++idx;
  }
}
15,573✔
361

362
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
11,858,116✔
363
                           SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
364
  STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
11,858,116✔
365
  if (NULL != tmp) {
11,831,969✔
366
    *pTableCxt = *tmp;
2,029,677✔
367
    if (!ignoreColVals) {
2,029,677✔
368
      resetColValues((*pTableCxt)->pValues);
15,573✔
369
    }
370
    return TSDB_CODE_SUCCESS;
2,029,677✔
371
  }
372
  int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
9,802,292✔
373
  if (TSDB_CODE_SUCCESS == code) {
9,837,002!
374
    void* pData = *pTableCxt;  // deal scan coverity
9,837,230✔
375
    code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
9,837,230✔
376
  }
377

378
  if (TSDB_CODE_SUCCESS != code) {
9,840,215!
379
    insDestroyTableDataCxt(*pTableCxt);
×
380
  }
381
  return code;
9,839,273✔
382
}
383

384
static void destroyColVal(void* p) {
109,693,902✔
385
  SColVal* pVal = p;
109,693,902✔
386
  if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
109,693,902✔
387
      TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
104,160,992✔
388
    taosMemoryFreeClear(pVal->value.pData);
5,725,170!
389
  }
390
}
109,693,902✔
391

392
/**
 * Destroy a table data context together with everything it owns: the duplicated
 * meta, the schema, the bound-column index, the column values, the pending submit
 * data (if any) and finally the context itself. NULL is accepted and ignored.
 */
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
  if (NULL != pTableCxt) {
    taosMemoryFreeClear(pTableCxt->pMeta);
    tDestroyTSchema(pTableCxt->pSchema);
    insDestroyBoundColInfo(&pTableCxt->boundColsInfo);
    taosArrayDestroyEx(pTableCxt->pValues, destroyColVal);

    // pData is absent once it has been handed over to a vgroup context.
    if (NULL != pTableCxt->pData) {
      tDestroySubmitTbData(pTableCxt->pData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pTableCxt->pData);
    }

    taosMemoryFree(pTableCxt);
  }
}
407

408
/**
 * Destroy a vgroup data context: the submit request contents, the request struct
 * and the context itself. NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
  if (NULL != pVgCxt) {
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pVgCxt->pData);
    taosMemoryFree(pVgCxt);
  }
}
417

418
/**
 * Destroy every vgroup data context stored (by pointer) in pVgCxtList and then the
 * array itself. NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
  if (NULL == pVgCxtList) {
    return;
  }

  size_t  total = taosArrayGetSize(pVgCxtList);
  int32_t idx = 0;
  while (idx < total) {
    void* pVgCxt = taosArrayGetP(pVgCxtList, idx);
    insDestroyVgroupDataCxt(pVgCxt);
    idx++;
  }

  taosArrayDestroy(pVgCxtList);
}
431

432
/**
 * Destroy every vgroup data context referenced from pVgCxtHash (values are
 * SVgroupDataCxt pointers) and then the hash itself. NULL is accepted and ignored.
 */
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
  if (NULL == pVgCxtHash) {
    return;
  }

  for (void** ppCxt = taosHashIterate(pVgCxtHash, NULL); ppCxt != NULL; ppCxt = taosHashIterate(pVgCxtHash, ppCxt)) {
    insDestroyVgroupDataCxt(*(SVgroupDataCxt**)ppCxt);
  }

  taosHashCleanup(pVgCxtHash);
}
446

447
/**
 * Destroy every table data context referenced from pTableCxtHash (values are
 * STableDataCxt pointers) and then the hash itself. NULL is accepted and ignored.
 */
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
  if (NULL == pTableCxtHash) {
    return;
  }

  for (void** ppCxt = taosHashIterate(pTableCxtHash, NULL); ppCxt != NULL;
       ppCxt = taosHashIterate(pTableCxtHash, ppCxt)) {
    insDestroyTableDataCxt(*(STableDataCxt**)ppCxt);
  }

  taosHashCleanup(pTableCxtHash);
}
461

462
/**
 * Append a table's pending submit data to its vgroup's submit request.
 *
 * The SSubmitTbData struct is copied by value into the vgroup's array. Afterwards the
 * table context's own struct is either rebuilt empty (isRebuild), freed and cleared
 * (clear), or left as it is.
 *
 * @param pTableCxt  Table context providing pData.
 * @param pVgCxt     Vgroup context receiving the data; its array is created on first use.
 * @param isRebuild  Replace pTableCxt->pData with a fresh empty block.
 * @param clear      When not rebuilding, free pTableCxt->pData and set it to NULL.
 * @return 0 on success, terrno or the rebuild error code on failure.
 */
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
  SSubmitReq2* pReq = pVgCxt->pData;

  if (NULL == pReq->aSubmitTbData) {
    pReq->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pReq->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  if (NULL == taosArrayPush(pReq->aSubmitTbData, pTableCxt->pData)) {
    return terrno;
  }

  int32_t code = 0;
  if (isRebuild) {
    code = rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
  } else if (clear) {
    taosMemoryFreeClear(pTableCxt->pData);
  }

  parserDebug("uid:%" PRId64 ", add table data context to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);

  return code;
}
485

486
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList,
9,773,978✔
487
                                   SVgroupDataCxt** pOutput) {
488
  SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
9,773,978!
489
  if (NULL == pVgCxt) {
9,817,001!
490
    return terrno;
×
491
  }
492
  pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
9,817,001!
493
  if (NULL == pVgCxt->pData) {
9,822,709!
494
    insDestroyVgroupDataCxt(pVgCxt);
×
495
    return terrno;
×
496
  }
497

498
  pVgCxt->vgId = vgId;
9,822,709✔
499
  int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
9,822,709✔
500
  if (TSDB_CODE_SUCCESS == code) {
9,803,179!
501
    if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
9,793,207!
502
      code = terrno;
×
503
      insDestroyVgroupDataCxt(pVgCxt);
×
UNCOV
504
      return code;
×
505
    }
506
    //    uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
507
    *pOutput = pVgCxt;
9,793,207✔
508
  } else {
509
    insDestroyVgroupDataCxt(pVgCxt);
×
510
  }
511
  return code;
9,795,393✔
512
}
513

514
int insColDataComp(const void* lp, const void* rp) {
90,623✔
515
  SColData* pLeft = (SColData*)lp;
90,623✔
516
  SColData* pRight = (SColData*)rp;
90,623✔
517
  if (pLeft->cid < pRight->cid) {
90,623✔
518
    return -1;
89,509✔
519
  } else if (pLeft->cid > pRight->cid) {
1,114!
520
    return 1;
1,122✔
521
  }
522

523
  return 0;
×
524
}
525

526
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId,
12,358✔
527
                                 STableColsData* pTbData, SName* sname) {
528
  if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
12,358!
529
    return TSDB_CODE_SUCCESS;
12,304✔
530
  }
531

532
  SVgroupInfo      vgInfo = {0};
56✔
533
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
56✔
534
                           .requestId = pBuildInfo->requestId,
56✔
535
                           .requestObjRefId = pBuildInfo->requestSelf,
56✔
536
                           .mgmtEps = pBuildInfo->mgmtEpSet};
537

538
  int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
56✔
539
  if (TSDB_CODE_SUCCESS != code) {
56!
540
    return code;
×
541
  }
542

543
  code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
56✔
544
  if (TSDB_CODE_SUCCESS != code) {
56!
545
    return code;
×
546
  }
547

548
  return TSDB_CODE_SUCCESS;
56✔
549
}
550

551
/**
 * Resolve the uid and vgroup id of the table named in pTbData.
 *
 * When pTbData->getFromHash is set, the per-build table hash is consulted first. On a
 * miss the table meta is fetched from the catalog, the result is stored in the table
 * hash, and the vgroup info is added to pAllVgHash if needed.
 *
 * @param pAllVgHash  Hash from vgId to SVgroupInfo, updated on a cache miss.
 * @param pBuildInfo  Supplies the table hash, catalog handle and connection parameters.
 * @param pTbData     Table whose tbName is resolved.
 * @param uid         Receives the table uid on success.
 * @param vgId        Receives the vgroup id on success.
 * @return 0 on success; TSDB_CODE_PAR_TABLE_NOT_EXIST if the catalog reports the table
 *         missing; any other error code from the name, catalog or hash calls.
 */
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData,
                             uint64_t* uid, int32_t* vgId) {
  int32_t      code = 0;
  STableVgUid* pCached = NULL;

  if (pTbData->getFromHash) {
    pCached = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
  }

  // Fast path: the table was resolved earlier in this build.
  if (NULL != pCached) {
    *uid = pCached->uid;
    *vgId = pCached->vgid;
    return code;
  }

  SName sname;
  code = qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
                           .requestId = pBuildInfo->requestId,
                           .requestObjRefId = pBuildInfo->requestSelf,
                           .mgmtEps = pBuildInfo->mgmtEpSet};
  code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    parserDebug("tb:%s.%s not exist", sname.dbname, sname.tname);
    return code;
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *uid = pTableMeta->uid;
  *vgId = pTableMeta->vgId;

  // Remember the result so later rows for the same table can take the fast path.
  STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
  code = tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
  if (TSDB_CODE_SUCCESS == code) {
    code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
  }

  taosMemoryFree(pTableMeta);
  return code;
}
600

601
/**
 * Build the vgroup data blocks of a statement in append mode.
 *
 * Treats pQuery->pRoot as an SVnodeModifyOpStmt and forwards to insBuildVgDataBlocks
 * with append = true, so blocks are added to the statement's existing pDataBlocks.
 *
 * @return The result of insBuildVgDataBlocks.
 */
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
  SVnodeModifyOpStmt* pModifyStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
  return insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pModifyStmt->pDataBlocks, true);
}
611

612
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
56,268✔
613
                                  SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
614
  int32_t  code = TSDB_CODE_SUCCESS;
56,268✔
615
  uint64_t uid;
616
  int32_t  vgId;
617

618
  pTbCtx->pData->aRowP = pTbData->aCol;
56,268✔
619

620
  code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
56,268✔
621
  if (ctbReq !=NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
56,265✔
622
    pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
28✔
623
    vgId = (int32_t)ctbReq->uid;
28✔
624
    uid = 0;
28✔
625
    pTbCtx->pMeta->vgId = (int32_t)ctbReq->uid;
28✔
626
    ctbReq->uid = 0;
28✔
627
    pTbCtx->pData->pCreateTbReq = ctbReq;
28✔
628
    code = TSDB_CODE_SUCCESS;
28✔
629
  } else {
630
    if (TSDB_CODE_SUCCESS != code) {
56,237!
631
      return code;
×
632
    }
633
    pTbCtx->pMeta->vgId = vgId;
56,237✔
634
    pTbCtx->pMeta->uid = uid;
56,237✔
635
    pTbCtx->pData->uid = uid;
56,237✔
636
    pTbCtx->pData->pCreateTbReq = NULL;
56,237✔
637

638
    if (ctbReq != NULL) {
56,237✔
639
      tdDestroySVCreateTbReq(ctbReq);
640
      taosMemoryFree(ctbReq);
35!
641
      ctbReq = NULL;
35✔
642
    }
643
  }
644

645
  if (!pTbData->isOrdered) {
56,265✔
646
    code = tRowSort(pTbCtx->pData->aRowP);
50,980✔
647
  }
648
  if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
56,271!
649
    code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
50,988✔
650
  }
651

652
  if (TSDB_CODE_SUCCESS != code) {
56,208!
UNCOV
653
    return code;
×
654
  }
655

656
  SVgroupDataCxt* pVgCxt = NULL;
56,208✔
657
  void**          pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
56,208✔
658
  if (NULL == pp) {
56,278✔
659
    pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
23,218✔
660
    if (NULL == pp) {
23,207!
661
      code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
23,207✔
662
    } else {
UNCOV
663
      pVgCxt = *(SVgroupDataCxt**)pp;
×
664
    }
665
  } else {
666
    pVgCxt = *(SVgroupDataCxt**)pp;
33,060✔
667
  }
668

669
  if (TSDB_CODE_SUCCESS == code) {
56,258✔
670
    code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
56,224✔
671
  }
672

673
  if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
56,211!
UNCOV
674
    code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
×
675
    // taosArrayClear(pVgCxt->pData->aSubmitTbData);
UNCOV
676
    tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
×
677
    // insDestroyVgroupDataCxt(pVgCxt);
678
  }
679

680
  return code;
56,165✔
681
}
682

683
/*
684
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild,
685
int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
686
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
687
  if (NULL == pVgroupHash || NULL == pVgroupList) {
688
    taosHashCleanup(pVgroupHash);
689
    taosArrayDestroy(pVgroupList);
690
    return TSDB_CODE_OUT_OF_MEMORY;
691
  }
692

693
  int32_t code = TSDB_CODE_SUCCESS;
694

695
  for (int32_t i = 0; i < tbNum; ++i) {
696
    STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
697
    pTableCxt->pMeta->vgId = pTableCols->vgId;
698
    pTableCxt->pMeta->uid = pTableCols->uid;
699
    pTableCxt->pData->uid = pTableCols->uid;
700
    pTableCxt->pData->aCol = pTableCols->aCol;
701

702
    SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
703
    if (pCol->nVal <= 0) {
704
      continue;
705
    }
706

707
    if (pTableCxt->pData->pCreateTbReq) {
708
      pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
709
    }
710

711
    taosArraySort(pTableCxt->pData->aCol, insColDataComp);
712

713
    tColDataSortMerge(pTableCxt->pData->aCol);
714

715
    if (TSDB_CODE_SUCCESS == code) {
716
      SVgroupDataCxt* pVgCxt = NULL;
717
      int32_t         vgId = pTableCxt->pMeta->vgId;
718
      void**          pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
719
      if (NULL == pp) {
720
        code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
721
      } else {
722
        pVgCxt = *(SVgroupDataCxt**)pp;
723
      }
724
      if (TSDB_CODE_SUCCESS == code) {
725
        code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
726
      }
727
    }
728
  }
729

730
  taosHashCleanup(pVgroupHash);
731
  if (TSDB_CODE_SUCCESS == code) {
732
    *pVgDataBlocks = pVgroupList;
733
  } else {
734
    insDestroyVgroupDataCxtList(pVgroupList);
735
  }
736

737
  return code;
738
}
739
*/
740

741
/**
 * Group the pending data of every table context in pTableHash by vgroup.
 *
 * Each table's data is first normalised (column format: columns sorted by id and
 * sort-merged; row format: rows sorted and/or merged when out of order or duplicated)
 * and then appended to the submit request of its vgroup.
 *
 * @param pTableHash     Hash whose values are STableDataCxt pointers.
 * @param pVgDataBlocks  Receives the list of SVgroupDataCxt* on success; untouched on failure.
 * @param isRebuild      Forwarded to fillVgroupDataCxt: rebuild empty table data for reuse.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
  SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
  if (NULL == pVgroupHash || NULL == pVgroupList) {
    taosHashCleanup(pVgroupHash);
    taosArrayDestroy(pVgroupList);
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void*   pIter = taosHashIterate(pTableHash, NULL);

  // The data format is taken from the first table context and applied to all of them.
  bool colFormat = false;
  if (pIter) {
    STableDataCxt* pFirstCxt = *(STableDataCxt**)pIter;
    colFormat = (0 != (pFirstCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
  }

  while (TSDB_CODE_SUCCESS == code && NULL != pIter) {
    STableDataCxt* pCxt = *(STableDataCxt**)pIter;
    SSubmitTbData* pTbData = pCxt->pData;

    if (colFormat) {
      // A table whose first column holds no values has nothing to submit.
      SColData* pFirstCol = taosArrayGet(pTbData->aCol, 0);
      if (pFirstCol && pFirstCol->nVal <= 0) {
        pIter = taosHashIterate(pTableHash, pIter);
        continue;
      }

      if (pTbData->pCreateTbReq) {
        pTbData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
      }

      taosArraySort(pTbData->aCol, insColDataComp);

      code = tColDataSortMerge(&pTbData->aCol);
    } else {
      // Row format. Tables with no rows are deliberately not skipped here.
      if (!pCxt->ordered) {
        code = tRowSort(pTbData->aRowP);
      }
      if (code == TSDB_CODE_SUCCESS && (!pCxt->ordered || pCxt->duplicateTs)) {
        code = tRowMerge(pTbData->aRowP, pCxt->pSchema, 0);
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      SVgroupDataCxt* pVgCxt = NULL;
      int32_t         vgId = pCxt->pMeta->vgId;
      void**          ppVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
      if (NULL != ppVgCxt) {
        pVgCxt = *(SVgroupDataCxt**)ppVgCxt;
      } else {
        code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = fillVgroupDataCxt(pCxt, pVgCxt, isRebuild, true);
      }
    }

    // Advance only on success; on error the loop condition ends the iteration.
    if (TSDB_CODE_SUCCESS == code) {
      pIter = taosHashIterate(pTableHash, pIter);
    }
  }

  // The hash was only a lookup aid; the contexts themselves live in pVgroupList.
  taosHashCleanup(pVgroupHash);
  if (TSDB_CODE_SUCCESS != code) {
    insDestroyVgroupDataCxtList(pVgroupList);
    return code;
  }

  *pVgDataBlocks = pVgroupList;
  return code;
}
818

819
/*
 * Serialize one submit request into a newly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length in
 * network byte order, version 1 in big-endian) followed by the encoded request.
 *
 * vgId   id written into the message header
 * pReq   request to encode
 * pData  receives the allocated buffer on success (left untouched on failure)
 * pLen   receives the total buffer length on success (left untouched on failure)
 *
 * Returns TSDB_CODE_SUCCESS, terrno if the buffer cannot be allocated, or the
 * code reported by the encoder.
 */
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint32_t totalLen = 0;
  void*    pMsgBuf = NULL;

  // first pass: only measures how many bytes the encoded request needs
  tEncodeSize(tEncodeSubmitReq, pReq, totalLen, code);

  if (TSDB_CODE_SUCCESS == code) {
    SEncoder encoder;

    totalLen += sizeof(SSubmitReq2Msg);
    pMsgBuf = taosMemoryMalloc(totalLen);
    if (NULL == pMsgBuf) {
      return terrno;
    }

    SSubmitReq2Msg* pMsg = (SSubmitReq2Msg*)pMsgBuf;
    pMsg->header.vgId = htonl(vgId);
    pMsg->header.contLen = htonl(totalLen);
    pMsg->version = htobe64(1);

    // second pass: encode the request right behind the header
    tEncoderInit(&encoder, POINTER_SHIFT(pMsgBuf, sizeof(SSubmitReq2Msg)), totalLen - sizeof(SSubmitReq2Msg));
    code = tEncodeSubmitReq(&encoder, pReq);
    tEncoderClear(&encoder);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pMsgBuf);
    return code;
  }

  *pData = pMsgBuf;
  *pLen = totalLen;
  return code;
}
847

848
static void destroyVgDataBlocks(void* p) {
×
849
  if (p == NULL) return;
×
UNCOV
850
  SVgDataBlocks* pVg = p;
×
UNCOV
851
  taosMemoryFree(pVg->pData);
×
UNCOV
852
  taosMemoryFree(pVg);
×
853
}
854

855
/*
 * Turn every vgroup data context in pVgDataCxtList into an SVgDataBlocks entry
 * holding the serialized submit request for that vgroup.
 *
 * pVgroupsHashObj  hash looked up by vgId to fill the vg member of each entry
 * pVgDataCxtList   array of SVgroupDataCxt pointers; contexts without table data are skipped
 * pVgDataBlocks    in/out result array
 * append           true: add to *pVgDataBlocks when it already exists, otherwise create it;
 *                  false: always create a new array
 *
 * In append mode a newly created array is stored into *pVgDataBlocks even when
 * an error is returned. In non-append mode the array is stored only on success
 * and destroyed (with its entries) on failure.
 */
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
  size_t  numOfVg = taosArrayGetSize(pVgDataCxtList);
  SArray* pDataBlocks = NULL;

  if (append && *pVgDataBlocks) {
    pDataBlocks = *pVgDataBlocks;
  } else {
    pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
  }
  if (NULL == pDataBlocks) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t idx = 0; idx < numOfVg && TSDB_CODE_SUCCESS == code; ++idx) {
    SVgroupDataCxt* pSrcCxt = taosArrayGetP(pVgDataCxtList, idx);
    if (taosArrayGetSize(pSrcCxt->pData->aSubmitTbData) <= 0) {
      continue;  // nothing to submit for this vgroup
    }

    // code is known to be TSDB_CODE_SUCCESS here, so it only changes when the allocation fails
    SVgDataBlocks* pDstBlocks = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    code = (NULL == pDstBlocks) ? terrno : TSDB_CODE_SUCCESS;

    if (TSDB_CODE_SUCCESS == code) {
      pDstBlocks->numOfTables = taosArrayGetSize(pSrcCxt->pData->aSubmitTbData);
      code = taosHashGetDup(pVgroupsHashObj, (const char*)&pSrcCxt->vgId, sizeof(pSrcCxt->vgId), &pDstBlocks->vg);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = buildSubmitReq(pSrcCxt->vgId, pSrcCxt->pData, &pDstBlocks->pData, &pDstBlocks->size);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (NULL == taosArrayPush(pDataBlocks, &pDstBlocks)) {
        code = terrno;
      } else {
        code = TSDB_CODE_SUCCESS;
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      // the entry never made it into the array, so release it here
      destroyVgDataBlocks(pDstBlocks);
    }
  }

  if (append) {
    if (NULL == *pVgDataBlocks) {
      *pVgDataBlocks = pDataBlocks;
    }
  } else if (TSDB_CODE_SUCCESS == code) {
    *pVgDataBlocks = pDataBlocks;
  } else {
    taosArrayDestroyP(pDataBlocks, destroyVgDataBlocks);
  }

  return code;
}
902

903
/* Return true when one of the first numFields entries of fields has the same name as pSchema. */
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
  int idx = 0;
  while (idx < numFields) {
    if (0 == strcmp(pSchema->name, fields[idx].name)) {
      return true;
    }
    ++idx;
  }

  return false;
}
912

913
/*
 * Check that one column descriptor taken from a data block matches the table's column schema.
 *
 * pColSchema     schema of the table column
 * pColExtSchema  extended schema of the same column (its typeMod is read for decimal types)
 * fields         points at the descriptor inside the block: one int8_t type code
 *                immediately followed by an int32_t byte count
 * errstr         optional buffer receiving a human readable reason on mismatch (may be NULL)
 * errstrLen      capacity of errstr
 *
 * Returns 0 when the descriptor is compatible, TSDB_CODE_INVALID_PARA otherwise.
 */
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen) {
  int8_t dataType = *fields;
  if (dataType != pColSchema->type) {
    if (errstr != NULL) {
      // The type code comes straight from the block; only use it as an index into
      // tDataTypes when it is inside the table's range.
      const char* dataTypeName =
          (dataType >= 0 && dataType < TSDB_DATA_TYPE_MAX) ? tDataTypes[dataType].name : "unknown";
      snprintf(errstr, errstrLen, "column type not equal, name:%s, schema type:%s, data type:%s", pColSchema->name,
               tDataTypes[pColSchema->type].name, dataTypeName);
    }
    return TSDB_CODE_INVALID_PARA;
  }

  // The byte count sits right after a single type byte, so it is generally not
  // 4-byte aligned: copy it out instead of dereferencing a casted pointer.
  int32_t dataBytes = 0;
  memcpy(&dataBytes, fields + sizeof(int8_t), sizeof(dataBytes));

  if (IS_DECIMAL_TYPE(pColSchema->type)) {
    uint8_t precision = 0, scale = 0;
    decimalFromTypeMod(pColExtSchema->typeMod, &precision, &scale);

    uint8_t precisionData = 0, scaleData = 0;
    int32_t bytes = dataBytes;
    extractDecimalTypeInfoFromBytes(&bytes, &precisionData, &scaleData);

    if (precision != precisionData || scale != scaleData) {
      if (errstr != NULL) {
        snprintf(errstr, errstrLen,
                 "column decimal type not equal, name:%s, schema type:%s, precision:%d, scale:%d, data type:%s, "
                 "precision:%d, scale:%d",
                 pColSchema->name, tDataTypes[pColSchema->type].name, precision, scale, tDataTypes[dataType].name,
                 precisionData, scaleData);
      }
      return TSDB_CODE_INVALID_PARA;
    }
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColSchema->type)) {
    // variable-length data may be narrower than the schema, never wider
    if (dataBytes > pColSchema->bytes) {
      if (errstr != NULL) {
        snprintf(errstr, errstrLen,
                 "column var data bytes error, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
                 pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
                 dataBytes);
      }
      return TSDB_CODE_INVALID_PARA;
    }
  } else if (dataBytes != pColSchema->bytes) {
    // fixed-length data must have exactly the schema width
    if (errstr != NULL) {
      snprintf(errstr, errstrLen,
               "column normal data bytes not equal, name:%s, schema type:%s, bytes:%d, data type:%s, bytes:%d",
               pColSchema->name, tDataTypes[pColSchema->type].name, pColSchema->bytes, tDataTypes[dataType].name,
               dataBytes);
    }
    return TSDB_CODE_INVALID_PARA;
  }

  return 0;
}
958

959
/*
 * PRCESS_DATA(i, j): bind block column i to bound table column j.
 *
 * This is a statement sequence, not an expression. It relies on the following
 * names being in scope at the expansion site: pColSchema, pColExtSchema,
 * fields, errstr, errstrLen, ret, hasTs, pStart, numOfRows, pTableCxt,
 * needChangeLength, version, colLength, boundInfo and the label `end`.
 *
 * Steps:
 *   1. checkSchema() on the descriptor at `fields`; on failure jump to `end`.
 *   2. Set hasTs when the column id is PRIMARYKEY_TIMESTAMP_COL_ID.
 *   3. Take `offset` at the current pStart, then advance pStart by
 *      numOfRows * sizeof(int32_t) for var-length types or by
 *      BitmapLen(numOfRows) otherwise; `pData` is the position after that.
 *   4. Hand offset/pData to tColDataAddValueByDataBlock() for element j of
 *      pTableCxt->pData->aCol; on failure jump to `end`.
 *   5. Advance `fields` to the next descriptor and pStart by colLength[i]
 *      (passed through htonl() when needChangeLength is set and version is
 *      BLOCK_VERSION_1).
 *   6. Store -1 into boundInfo->pColIndex[j] to mark column j as processed.
 *
 * The expansion declares the locals offset, pData and pCol, so it can be used
 * only once per enclosing scope. The last statement already ends with ';',
 * which is why call sites do not append one.
 */
#define PRCESS_DATA(i, j)                                                                                 \
  ret = checkSchema(pColSchema, pColExtSchema, fields, errstr, errstrLen);                                               \
  if (ret != 0) {                                                                                         \
    goto end;                                                                                             \
  }                                                                                                       \
                                                                                                          \
  if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {                                                 \
    hasTs = true;                                                                                         \
  }                                                                                                       \
                                                                                                          \
  int8_t* offset = pStart;                                                                                \
  if (IS_VAR_DATA_TYPE(pColSchema->type)) {                                                               \
    pStart += numOfRows * sizeof(int32_t);                                                                \
  } else {                                                                                                \
    pStart += BitmapLen(numOfRows);                                                                       \
  }                                                                                                       \
  char* pData = pStart;                                                                                   \
                                                                                                          \
  SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);                                               \
  ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); \
  if (ret != 0) {                                                                                         \
    goto end;                                                                                             \
  }                                                                                                       \
  fields += sizeof(int8_t) + sizeof(int32_t);                                                             \
  if (needChangeLength && version == BLOCK_VERSION_1) {                                                   \
    pStart += htonl(colLength[i]);                                                                        \
  } else {                                                                                                \
    pStart += colLength[i];                                                                               \
  }                                                                                                       \
  boundInfo->pColIndex[j] = -1;
989

990
/*
 * Bind one serialized data block to the column-format submit data of a table.
 *
 * query             query whose root is an SVnodeModifyOpStmt; its pTableBlockHashObj holds the table contexts
 * pTableMeta        meta of the target table (its uid is the hash key)
 * data              serialized block; must not be NULL
 * pCreateTb         optional create-table request, cloned when the table has no context yet
 * tFields           optional field list used to match block columns to table columns by name;
 *                   NULL means block column k maps to table column k
 * numFields         number of entries in tFields; must equal the block's column count when tFields is given
 * needChangeLength  when set and the block version is BLOCK_VERSION_1, column lengths go through htonl()
 * errstr/errstrLen  optional buffer for a human readable error reason
 * raw               true: tFields is an SSchemaWrapper*; false: tFields is a TAOS_FIELD array
 *
 * Returns 0 on success, TSDB_CODE_APP_ERROR for NULL data, TSDB_CODE_INVALID_PARA
 * for malformed or mismatching blocks, or the code of the failing helper.
 */
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
                     int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
  int ret = 0;
  if (data == NULL) {
    uError("rawBlockBindData, data is NULL");
    return TSDB_CODE_APP_ERROR;
  }
  // remember whether the table already had a context before insGetTableDataCxt() runs
  void* tmp =
      taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
  SVCreateTbReq* pCreateReqTmp = NULL;
  if (tmp == NULL && pCreateTb != NULL) {
    ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("cloneSVreateTbReq error");
      goto end;
    }
  }

  STableDataCxt* pTableCxt = NULL;
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
  // whatever is still reachable through pCreateReqTmp after the call is released here
  if (pCreateReqTmp != NULL) {
    tdDestroySVCreateTbReq(pCreateReqTmp);
    taosMemoryFree(pCreateReqTmp);
  }

  if (ret != TSDB_CODE_SUCCESS) {
    uError("insGetTableDataCxt error");
    goto end;
  }

  pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
  if (tmp == NULL) {
    ret = initTableColSubmitData(pTableCxt);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("initTableColSubmitData error");
      goto end;
    }
  }

  char* p = (char*)data;
  // Header as consumed by the reads below:
  // | version | 4 bytes skipped (total length) | total rows | total columns | 4 bytes skipped (flag seg) |
  // | 8 bytes skipped (block group id) | column schema | each column length |
  // NOTE(review): the previous layout note also listed a "blankFill" slot between rows and columns;
  // the code never skipped one - confirm against the block encoder.
  int32_t version = *(int32_t*)data;
  p += sizeof(int32_t);
  p += sizeof(int32_t);

  int32_t numOfRows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t numOfCols = *(int32_t*)p;
  p += sizeof(int32_t);

  // negative counts would move the parse pointer backwards further down
  if (numOfRows < 0 || numOfCols < 0) {
    uError("rawBlockBindData, invalid block header, rows:%d, cols:%d", numOfRows, numOfCols);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  int8_t* fields = (int8_t*)p;
  // every descriptor is one type byte followed by a 4-byte length; validate the type code of
  // each column (not only the first) before it can be used as an index into tDataTypes
  for (int32_t k = 0; k < numOfCols; ++k) {
    int8_t fieldType = fields[k * (sizeof(int8_t) + sizeof(int32_t))];
    if (fieldType >= TSDB_DATA_TYPE_MAX || fieldType < 0) {
      uError("fields type error:%d", fieldType);
      ret = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }
  p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;

  SSchema*       pSchema = getTableColumnSchema(pTableCxt->pMeta);
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pTableCxt->pMeta);
  SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;

  if (tFields != NULL && numFields != numOfCols) {
    if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to data cols:%d", numFields, numOfCols);
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  bool hasTs = false;
  if (tFields == NULL) {
    // positional mapping: block column j goes to table column j
    int32_t len = TMIN(numOfCols, boundInfo->numOfBound);
    for (int j = 0; j < len; j++) {
      SSchema*    pColSchema = &pSchema[j];
      SSchemaExt* pColExtSchema = &pExtSchemas[j];
      PRCESS_DATA(j, j)
    }
  } else {
    // name mapping: block column i goes to the table column carrying the same name
    // NOTE(review): when no table column matches field i, neither `fields` nor `pStart` is advanced,
    // so later columns are parsed from the unmatched column's position - confirm callers never pass
    // fields that are missing from the table.
    for (int i = 0; i < numFields; i++) {
      for (int j = 0; j < boundInfo->numOfBound; j++) {
        SSchema*    pColSchema = &pSchema[j];
        SSchemaExt* pColExtSchema = &pExtSchemas[j];
        char*       fieldName = NULL;
        if (raw) {
          fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
        } else {
          fieldName = ((TAOS_FIELD*)tFields)[i].name;
        }
        if (strcmp(pColSchema->name, fieldName) == 0) {
          PRCESS_DATA(i, j)
          break;
        }
      }
    }
  }

  if (!hasTs) {
    if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
    ret = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // process NULL data: PRCESS_DATA marks every column it filled with -1, so any other
  // value means the block carried nothing for that column
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
    if (boundInfo->pColIndex[c] != -1) {
      SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
      ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
      if (ret != 0) {
        goto end;
      }
    } else {
      boundInfo->pColIndex[c] = c;  // restore for next block
    }
  }

end:
  return ret;
}
1118

1119
/*
 * Queue an already serialized table submit payload on the data context of the table's vgroup.
 *
 * pVgroupHash  hash of vgroup data contexts, keyed by vgId
 * pVgroupList  list handed to createVgroupDataCxt() when the vgroup has no context yet
 * pTableMeta   meta of the target table (suid, uid, sversion and vgId are read)
 * data         raw payload; it is first passed to transformRawSSubmitTbData() and then the
 *              pointer itself is pushed onto the context's aSubmitTbData array
 *
 * Returns 0 on success, the code of the failing helper, or terrno when an array operation fails.
 */
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
  int code = transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
  if (0 != code) {
    return code;
  }

  // reuse the vgroup's context when it exists, otherwise create one
  SVgroupDataCxt* pVgCxt = NULL;
  void**          ppCxt = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
  if (NULL != ppCxt) {
    pVgCxt = *(SVgroupDataCxt**)ppCxt;
  } else {
    code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
    if (0 != code) {
      return code;
    }
  }

  if (NULL == pVgCxt->pData->aSubmitTbData) {
    pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
    pVgCxt->pData->raw = true;
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return terrno;
    }
  }

  // push data to submit, rebuild empty data for next submit
  void* pPushed = taosArrayPush(pVgCxt->pData->aSubmitTbData, &data);
  if (NULL == pPushed) {
    return terrno;
  }

  uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc