• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.8
/source/libs/parser/src/parInsertSql.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "geosWrapper.h"
17
#include "parInsertUtil.h"
18
#include "parToken.h"
19
#include "scalar.h"
20
#include "tglobal.h"
21
#include "ttime.h"
22
#include "decimal.h"
23

24
// CSV delimiter and quote character definitions
25
#define CSV_DEFAULT_DELIMITER ','
26
#define CSV_QUOTE_SINGLE      '\''
27
#define CSV_QUOTE_DOUBLE      '"'
28
#define CSV_ESCAPE_CHAR       '\\'
29
#define CSV_QUOTE_NONE        '\0'
30

31
typedef struct SCsvParser {
32
  char      delimiter;            // Field delimiter (default: ',')
33
  char      quote;                // Quote character (default: '"')
34
  char      escape;               // Escape character (default: '\')
35
  bool      allowNewlineInField;  // Allow newlines in quoted fields
36
  char*     buffer;               // Read buffer
37
  size_t    bufferSize;           // Buffer size
38
  size_t    bufferPos;            // Current position in buffer
39
  size_t    bufferLen;            // Valid data length in buffer
40
  bool      eof;                  // End of file reached
41
  TdFilePtr pFile;                // File pointer
42
  // Line buffer for reuse to avoid frequent memory allocation
43
  char*  lineBuffer;          // Reusable line buffer
44
  size_t lineBufferCapacity;  // Line buffer capacity
45
} SCsvParser;
46

47
typedef struct SInsertParseContext {
48
  SParseContext* pComCxt;
49
  SMsgBuf        msg;
50
  char           tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
51
  SBoundColInfo  tags;  // for stmt
52
  bool           missCache;
53
  bool           usingDuplicateTable;
54
  bool           forceUpdate;
55
  bool           needTableTagVal;
56
  bool           needRequest;  // whether or not request server
57
  bool           isStmtBind;   // whether is stmt bind
58
  uint8_t        stmtTbNameFlag;
59
} SInsertParseContext;
60

61
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
62
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt);
63
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate);
64

65
// CSV parser function declarations
66
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile);
67
static void    csvParserDestroy(SCsvParser* parser);
68
static int32_t csvParserFillBuffer(SCsvParser* parser);
69
static int32_t csvParserReadLine(SCsvParser* parser);
70
static void    destroySavedCsvParser(SVnodeModifyOpStmt* pStmt);
71
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen);
72

73
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
74
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
75

76
static FORCE_INLINE bool isNullValue(int8_t dataType, SToken* pToken) {
77
  return TK_NULL == pToken->type ||
1,129,140,625✔
78
         (TK_NK_STRING == pToken->type && !IS_STR_DATA_TYPE(dataType) && IS_NULL_STR(pToken->z, pToken->n));
1,121,368,388!
79
}
80

81
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
82
  SET_ERRNO(0);
83
  *value = taosStr2Double(pToken->z, endPtr);
84

85
  // not a valid integer number, return error
86
  if ((*endPtr - pToken->z) != pToken->n) {
87
    return TK_NK_ILLEGAL;
88
  }
89

90
  return pToken->type;
91
}
92

93
static int32_t skipInsertInto(const char** pSql, SMsgBuf* pMsg) {
1,853,014✔
94
  SToken token;
95
  NEXT_TOKEN(*pSql, token);
1,853,014✔
96
  if (TK_INSERT != token.type && TK_IMPORT != token.type) {
1,853,089!
97
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", token.z);
×
98
  }
99
  NEXT_TOKEN(*pSql, token);
1,853,089✔
100
  if (TK_INTO != token.type) {
1,853,069!
101
    return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", token.z);
×
102
  }
103
  return TSDB_CODE_SUCCESS;
1,853,095✔
104
}
105

106
static int32_t skipParentheses(SInsertParseContext* pCxt, const char** pSql) {
464,338✔
107
  SToken  token;
108
  int32_t expectRightParenthesis = 1;
464,338✔
109
  while (1) {
110
    NEXT_TOKEN(*pSql, token);
9,861,602✔
111
    if (TK_NK_LP == token.type) {
9,861,602!
112
      ++expectRightParenthesis;
×
113
    } else if (TK_NK_RP == token.type && 0 == --expectRightParenthesis) {
9,861,602!
114
      break;
464,338✔
115
    }
116
    if (0 == token.n) {
9,397,264!
117
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
×
118
    }
119
  }
120
  return TSDB_CODE_SUCCESS;
464,338✔
121
}
122

123
static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) {
18,614✔
124
  do {
×
125
    int32_t index = 0;
18,614✔
126
    SToken  token;
127
    NEXT_TOKEN_KEEP_SQL(*pSql, token, index);
18,614✔
128
    if (TK_TTL == token.type || TK_COMMENT == token.type) {
18,614!
129
      *pSql += index;
×
130
      NEXT_TOKEN_WITH_PREV(*pSql, token);
×
131
    } else {
132
      break;
133
    }
134
  } while (1);
135
  return TSDB_CODE_SUCCESS;
18,614✔
136
}
137

138
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
139
static int32_t ignoreUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
5✔
140
  const char** pSql = &pStmt->pSql;
5✔
141
  int32_t code = TSDB_CODE_SUCCESS;
5✔
142
  SToken  token;
143
  NEXT_TOKEN(*pSql, token);
5✔
144

145
  NEXT_TOKEN(*pSql, token);
5✔
146
  if (TK_NK_LP == token.type) {
5!
147
    code = skipParentheses(pCxt, pSql);
×
148
    if (TSDB_CODE_SUCCESS == code) {
×
149
      NEXT_TOKEN(*pSql, token);
×
150
    }
151
  }
152

153
  // pSql -> TAGS (tag1_value, ...)
154
  if (TSDB_CODE_SUCCESS == code) {
5!
155
    if (TK_TAGS != token.type) {
5!
156
      code = buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
×
157
    } else {
158
      NEXT_TOKEN(*pSql, token);
5✔
159
    }
160
  }
161
  if (TSDB_CODE_SUCCESS == code) {
5!
162
    if (TK_NK_LP != token.type) {
5!
163
      code = buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
×
164
    } else {
165
      code = skipParentheses(pCxt, pSql);
5✔
166
    }
167
  }
168

169
  if (TSDB_CODE_SUCCESS == code) {
5!
170
    code = skipTableOptions(pCxt, pSql);
5✔
171
  }
172

173
  return code;
5✔
174
}
175

176
static int32_t ignoreUsingClauseAndCheckTagValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
18,618✔
177
  const char** pSql = &pStmt->pSql;
18,618✔
178
  int32_t      code = TSDB_CODE_SUCCESS;
18,618✔
179
  code = parseBoundTagsClause(pCxt, pStmt);
18,618✔
180
  if (TSDB_CODE_SUCCESS != code) {
18,618!
181
    return code;
×
182
  }
183
  // pSql -> TAGS (tag1_value, ...)
184
  code = parseTagsClause(pCxt, pStmt, true);
18,618✔
185
  if (TSDB_CODE_SUCCESS != code) {
18,618✔
186
    return code;
9✔
187
  }
188

189
  if (TSDB_CODE_SUCCESS == code) {
18,609!
190
    code = skipTableOptions(pCxt, pSql);
18,609✔
191
  }
192

193
  return code;
18,609✔
194
}
195

196
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
79,639✔
197
  int32_t code = TSDB_CODE_SUCCESS;
79,639✔
198
  *pDuplicate = false;
79,639✔
199

200
  char tbFName[TSDB_TABLE_FNAME_LEN];
201
  code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
79,639✔
202
  if (TSDB_CODE_SUCCESS != code) {
79,645!
203
    return code;
×
204
  }
205
  STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName));
79,645✔
206
  if (NULL != pMeta) {
79,651✔
207
    *pDuplicate = true;
5✔
208
    pCxt->missCache = false;
5✔
209
    code = cloneTableMeta(*pMeta, &pStmt->pTableMeta);
5✔
210
    if (TSDB_CODE_SUCCESS != code) {
5!
211
      return code;
×
212
    }
213
    return ignoreUsingClause(pCxt, pStmt);
5✔
214
  }
215

216
  return code;
79,646✔
217
}
218

219
typedef enum { BOUND_TAGS, BOUND_COLUMNS, BOUND_ALL_AND_TBNAME } EBoundColumnsType;
220

221
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
2,565,998✔
222
  return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
2,565,998✔
223
}
224

225
// pStmt->pSql -> field1_name, ...)
226
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType,
464,761✔
227
                                 STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
228
  SSchema* pSchema = NULL;
464,761✔
229
  if (boundColsType == BOUND_TAGS) {
464,761✔
230
    pSchema = getTableTagSchema(pTableMeta);
346✔
231
  } else if (boundColsType == BOUND_COLUMNS) {
464,415✔
232
    pSchema = getTableColumnSchema(pTableMeta);
453,820✔
233
  } else {
234
    pSchema = pTableMeta->schema;
10,595✔
235
    if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
10,595!
236
      return TSDB_CODE_PAR_INTERNAL_ERROR;
×
237
    }
238
  }
239
  int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta);
464,761✔
240

241
  bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
464,761!
242
  if (NULL == pUseCols) {
464,761!
243
    return terrno;
×
244
  }
245

246
  pBoundInfo->numOfBound = 0;
464,761✔
247
  pBoundInfo->hasBoundCols = true;
464,761✔
248

249
  bool    hasPK = pTableMeta->tableInfo.numOfPKs;
464,761✔
250
  int16_t numOfBoundPKs = 0;
464,761✔
251
  int16_t lastColIdx = -1;  // last column found
464,761✔
252
  int32_t code = TSDB_CODE_SUCCESS;
464,761✔
253
  while (TSDB_CODE_SUCCESS == code) {
9,862,687✔
254
    SToken token;
255
    NEXT_TOKEN(*pSql, token);
9,862,683✔
256

257
    if (TK_NK_RP == token.type) {
9,862,683✔
258
      break;
464,757✔
259
    }
260

261
    char tmpTokenBuf[TSDB_COL_NAME_LEN + 2] = {0};  // used for deleting Escape character backstick(`)
9,397,926✔
262
    strncpy(tmpTokenBuf, token.z, token.n);
9,397,926✔
263
    token.z = tmpTokenBuf;
9,397,926✔
264
    token.n = strdequote(token.z);
9,397,926✔
265

266
    if (boundColsType == BOUND_ALL_AND_TBNAME && token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
9,397,926✔
267
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
10,593✔
268
      pUseCols[tbnameSchemaIndex] = true;
10,593✔
269
      ++pBoundInfo->numOfBound;
10,593✔
270
      continue;
10,593✔
271
    }
272
    int16_t t = lastColIdx + 1;
9,387,333✔
273
    int16_t end = (boundColsType == BOUND_ALL_AND_TBNAME) ? (pBoundInfo->numOfCols - 1) : pBoundInfo->numOfCols;
9,387,333✔
274
    int16_t index = insFindCol(&token, t, end, pSchema);
9,387,333✔
275
    if (index < 0 && t > 0) {
9,387,333!
276
      index = insFindCol(&token, 0, t, pSchema);
27,812✔
277
    }
278
    if (index < 0) {
9,387,333✔
279
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
4✔
280
    } else if (pUseCols[index]) {
9,387,329!
281
      code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
×
282
    } else {
283
      lastColIdx = index;
9,387,329✔
284
      pUseCols[index] = true;
9,387,329✔
285
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
9,387,329✔
286
      ++pBoundInfo->numOfBound;
9,387,329✔
287
      if (hasPK && (pSchema[index].flags & COL_IS_KEY)) ++numOfBoundPKs;
9,387,329✔
288
    }
289
  }
290

291
  if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) {
464,761✔
292
    if (!pUseCols[0]) {
464,411✔
293
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary timestamp column should not be null");
26✔
294
    }
295
    if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) {
464,411✔
296
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary key column should not be none");
18✔
297
    }
298
  }
299
  if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
464,761✔
300
    code = buildInvalidOperationMsg(&pCxt->msg, "tbname column should not be null");
2✔
301
  }
302
  taosMemoryFree(pUseCols);
464,761!
303

304
  return code;
464,760✔
305
}
306

307
static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts, int64_t* interval,
179,253,543✔
308
                                    SMsgBuf* pMsgBuf, bool* isTs, timezone_t tz) {
309
  if (pToken->type == TK_NOW) {
179,253,543✔
310
    *isTs = true;
23,415,847✔
311
    *ts = taosGetTimestamp(timePrec);
46,831,694✔
312
  } else if (pToken->type == TK_TODAY) {
155,837,696✔
313
    *isTs = true;
568✔
314
    *ts = taosGetTimestampToday(timePrec, tz);
568✔
315
  } else if (pToken->type == TK_NK_INTEGER) {
155,837,128✔
316
    *isTs = true;
144,439,377✔
317
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, ts)) {
144,439,377✔
318
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
8✔
319
    }
320
  } else if (pToken->type == TK_NK_VARIABLE) {
11,397,751✔
321
    char unit = 0;
10,373,484✔
322
    *isTs = false;
10,373,484✔
323
    if (parseAbsoluteDuration(pToken->z, pToken->n, interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
10,373,484✔
324
      return TSDB_CODE_TSC_INVALID_OPERATION;
73✔
325
    }
326
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
327
    *isTs = true;
1,024,267✔
328
    if (taosParseTime(pToken->z, ts, pToken->n, timePrec, tz) != TSDB_CODE_SUCCESS) {
1,024,267✔
329
      if ((pToken->n == 0) ||
183,398✔
330
          (pToken->type != TK_NK_STRING && pToken->type != TK_NK_HEX && pToken->type != TK_NK_BIN)) {
146,805✔
331
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
36,839✔
332
      }
333
      if (IS_NOW_STR(pToken->z, pToken->n)) {
146,559!
334
        *isTs = true;
73✔
335
        *ts = taosGetTimestamp(timePrec);
146!
336
      } else if (IS_TODAY_STR(pToken->z, pToken->n)) {
146,486!
337
        *isTs = true;
73✔
338
        *ts = taosGetTimestampToday(timePrec, tz);
73✔
339
      } else if (TSDB_CODE_SUCCESS == toIntegerPure(pToken->z, pToken->n, 10, ts)) {
146,413✔
340
        *isTs = true;
145,192✔
341
      } else {
342
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
1,221✔
343
      }
344
    }
345
  }
346

347
  return TSDB_CODE_SUCCESS;
179,390,165✔
348
}
349

350
static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf, timezone_t tz) {
168,753,396✔
351
  int32_t     index = 0, i = 0;
168,753,396✔
352
  int64_t     interval = 0, tempInterval = 0;
168,753,396✔
353
  int64_t     ts = 0, tempTs = 0;
168,753,396✔
354
  bool        firstIsTS = false, secondIsTs = false;
168,753,396✔
355
  const char* pTokenEnd = *end;
168,753,396✔
356

357
  if (TSDB_CODE_SUCCESS !=
169,007,468!
358
      parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS, tz)) {
168,753,396✔
359
    return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
360
  }
361

362
  if (firstIsTS) {
169,042,901✔
363
    *time = ts;
169,016,068✔
364
  }
365

366
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
169,571,955✔
367
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
168,584,241!
368
    if (pToken->z[k] == '(') {  // for insert NOW()/TODAY()
168,088,627✔
369
      if (pToken->z[k + 1] == ')') {
33,440✔
370
        *end = pTokenEnd = &pToken->z[k + 2];
33,289✔
371
        ++k;
33,289✔
372
        continue;
33,289✔
373
      } else {
374
        char nc = pToken->z[k + 1];
151✔
375
        while (nc == ' ' || nc == '\t' || nc == '\n' || nc == '\r' || nc == '\f') {
302!
376
          nc = pToken->z[(++k) + 1];
151✔
377
        }
378
        if (nc == ')') {
151!
379
          *end = pTokenEnd = &pToken->z[k + 2];
151✔
380
          ++k;
151✔
381
          continue;
151✔
382
        }
383
      }
384
    }
385
    if (pToken->z[k] == ',') {
168,055,187✔
386
      *end = pTokenEnd;
141,953,331✔
387
      if (!firstIsTS) {
141,953,331!
388
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
389
      }
390
      *time = ts;
141,953,331✔
391
      return TSDB_CODE_SUCCESS;
141,953,331✔
392
    }
393
    break;
26,101,856✔
394
  }
395

396
  while (pTokenEnd[i] != '\0') {
27,151,555✔
397
    if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
27,060,721!
398
      i++;
61,985✔
399
      continue;
61,985✔
400
    } else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
26,998,736✔
401
      *end = pTokenEnd + i;
16,624,543✔
402
      if (!firstIsTS) {
16,624,543!
403
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
404
      }
405
      *time = ts;
16,624,543✔
406
      return TSDB_CODE_SUCCESS;
16,624,543✔
407
    } else {
408
      break;
409
    }
410
  }
411
  pTokenEnd = pTokenEnd + i;
10,465,027✔
412

413
  index = 0;
10,465,027✔
414
  SToken token = tStrGetToken(pTokenEnd, &index, false, NULL);
10,465,027✔
415

416
  if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) {
10,384,115✔
417
    pTokenEnd += index;
10,373,647✔
418
    index = 0;
10,373,647✔
419
    SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL);
10,373,647✔
420
    pTokenEnd += index;
10,373,647✔
421
    char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
422
    if (TK_NK_STRING == valueToken.type) {
10,373,647✔
423
      if (valueToken.n >= TSDB_MAX_BYTES_PER_ROW) {
1!
424
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", valueToken.z);
10,373,567✔
425
      }
426
      int32_t len = trimString(valueToken.z, valueToken.n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
1✔
427
      valueToken.z = tmpTokenBuf;
1✔
428
      valueToken.n = len;
1✔
429
    }
430

431
    if (TSDB_CODE_SUCCESS !=
10,373,647✔
432
        parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs, tz)) {
10,373,647✔
433
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
73✔
434
    }
435

436
    if (valueToken.n < 2) {
10,373,574✔
437
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", token.z);
154✔
438
    }
439

440
    if (secondIsTs) {
10,373,420✔
441
      // not support operator between tow timestamp, such as today() + now()
442
      if (firstIsTS) {
15✔
443
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
11✔
444
      }
445
      ts = tempTs;
4✔
446
    } else {
447
      // not support operator between tow interval, such as 2h + 3s
448
      if (!firstIsTS) {
10,373,405✔
449
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
2✔
450
      }
451
      interval = tempInterval;
10,373,403✔
452
    }
453
    if (token.type == TK_NK_MINUS) {
10,373,407✔
454
      // not support interval - ts,such as 2h - today()
455
      if (secondIsTs) {
15,438✔
456
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
2✔
457
      }
458
      *time = ts - interval;
15,436✔
459
    } else {
460
      *time = ts + interval;
10,357,969✔
461
    }
462

463
    for (int k = valueToken.n; valueToken.z[k] != '\0'; k++) {
10,426,173✔
464
      if (valueToken.z[k] == ' ' || valueToken.z[k] == '\t') continue;
10,426,093!
465
      if (valueToken.z[k] == '(' && valueToken.z[k + 1] == ')') {  // for insert NOW()/TODAY()
10,373,325!
466
        *end = pTokenEnd = &valueToken.z[k + 2];
×
467
        k++;
×
468
        continue;
×
469
      }
470
      if (valueToken.z[k] == ',' || valueToken.z[k] == ')') {
10,373,325✔
471
        *end = pTokenEnd;
10,373,322✔
472
        return TSDB_CODE_SUCCESS;
10,373,322✔
473
      }
474
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
3✔
475
    }
476
  }
477

478
  *end = pTokenEnd;
10,548✔
479
  return TSDB_CODE_SUCCESS;
10,548✔
480
}
481

482
// need to call geosFreeBuffer(*output) later
483
static int parseGeometry(SToken* pToken, unsigned char** output, size_t* size) {
16,151✔
484
#ifdef USE_GEOS
485
  int32_t code = TSDB_CODE_FAILED;
16,151✔
486

487
  //[ToDo] support to parse WKB as well as WKT
488
  if (pToken->type == TK_NK_STRING) {
16,151✔
489
    code = initCtxGeomFromText();
15,210✔
490
    if (code != TSDB_CODE_SUCCESS) {
15,210!
491
      return code;
×
492
    }
493

494
    code = doGeomFromText(pToken->z, output, size);
15,210✔
495
    if (code != TSDB_CODE_SUCCESS) {
15,210✔
496
      return code;
1,507✔
497
    }
498
  }
499

500
  return code;
14,644✔
501
#else
502
  TAOS_RETURN(TSDB_CODE_OPS_NOT_SUPPORT);
503
#endif
504
}
505

506
static int32_t parseVarbinary(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
10,670,937✔
507
  if (pToken->type != TK_NK_STRING) {
10,670,937✔
508
    return TSDB_CODE_PAR_INVALID_VARBINARY;
959✔
509
  }
510

511
  if (isHex(pToken->z + 1, pToken->n - 2)) {
10,669,978✔
512
    if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
4,989✔
513
      return TSDB_CODE_PAR_INVALID_VARBINARY;
35✔
514
    }
515

516
    void*    data = NULL;
4,956✔
517
    uint32_t size = 0;
4,956✔
518
    if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
4,956!
519
      return TSDB_CODE_OUT_OF_MEMORY;
×
520
    }
521

522
    if (size + VARSTR_HEADER_SIZE > bytes) {
4,956✔
523
      taosMemoryFree(data);
2!
524
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
2✔
525
    }
526
    *pData = data;
4,954✔
527
    *nData = size;
4,954✔
528
  } else {
529
    *pData = taosMemoryCalloc(1, pToken->n);
10,664,989!
530
    if (!pData) return terrno;
10,664,989!
531
    int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
10,664,989✔
532
    *nData = len;
10,664,989✔
533

534
    if (*nData + VARSTR_HEADER_SIZE > bytes) {
10,664,989!
535
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
×
536
    }
537
  }
538
  return TSDB_CODE_SUCCESS;
10,669,943✔
539
}
540

541
static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val,
500,547✔
542
                             SMsgBuf* pMsgBuf, timezone_t tz, void *charsetCxt) {
543
  int64_t  iv;
544
  uint64_t uv;
545
  char*    endptr = NULL;
500,547✔
546
  int32_t  code = TSDB_CODE_SUCCESS;
500,547✔
547

548
#if 0
549
  if (isNullValue(pSchema->type, pToken)) {
550
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
551
      return buildSyntaxErrMsg(pMsgBuf, "Primary timestamp column can not be null", pToken->z);
552
    }
553

554
    return TSDB_CODE_SUCCESS;
555
  }
556
#endif
557

558
  //  strcpy(val->colName, pSchema->name);
559
  val->cid = pSchema->colId;
500,547✔
560
  val->type = pSchema->type;
500,547✔
561

562
  switch (pSchema->type) {
500,547!
563
    case TSDB_DATA_TYPE_BOOL: {
21,325✔
564
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
21,325✔
565
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
13,279!
566
          *(int8_t*)(&val->i64) = TRUE_VALUE;
11,238✔
567
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
2,041!
568
          *(int8_t*)(&val->i64) = FALSE_VALUE;
1,172✔
569
        } else if (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv)) {
869✔
570
          *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
355✔
571
        } else {
572
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
514✔
573
        }
574
      } else if (pToken->type == TK_NK_INTEGER) {
8,046✔
575
        *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
7,608✔
576
      } else if (pToken->type == TK_NK_FLOAT) {
438✔
577
        *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
140✔
578
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
436!
579
                 (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv))) {
138✔
580
        *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
138✔
581
      } else {
582
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
160✔
583
      }
584
      break;
20,652✔
585
    }
586

587
    case TSDB_DATA_TYPE_TINYINT: {
17,482✔
588
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
17,482✔
589
      if (TSDB_CODE_SUCCESS != code) {
17,482✔
590
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
767✔
591
      } else if (!IS_VALID_TINYINT(iv)) {
16,715✔
592
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
28✔
593
      }
594

595
      *(int8_t*)(&val->i64) = iv;
16,687✔
596
      break;
16,687✔
597
    }
598

599
    case TSDB_DATA_TYPE_UTINYINT: {
6,734✔
600
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
6,734✔
601
      if (TSDB_CODE_SUCCESS != code) {
6,734✔
602
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
854✔
603
      } else if (uv > UINT8_MAX) {
5,880✔
604
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
10✔
605
      }
606
      *(uint8_t*)(&val->i64) = uv;
5,870✔
607
      break;
5,870✔
608
    }
609

610
    case TSDB_DATA_TYPE_SMALLINT: {
17,325✔
611
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
17,325✔
612
      if (TSDB_CODE_SUCCESS != code) {
17,325✔
613
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
767✔
614
      } else if (!IS_VALID_SMALLINT(iv)) {
16,558✔
615
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
28✔
616
      }
617
      *(int16_t*)(&val->i64) = iv;
16,530✔
618
      break;
16,530✔
619
    }
620

621
    case TSDB_DATA_TYPE_USMALLINT: {
6,730✔
622
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
6,730✔
623
      if (TSDB_CODE_SUCCESS != code) {
6,730✔
624
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
854✔
625
      } else if (uv > UINT16_MAX) {
5,876✔
626
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
10✔
627
      }
628
      *(uint16_t*)(&val->i64) = uv;
5,866✔
629
      break;
5,866✔
630
    }
631

632
    case TSDB_DATA_TYPE_INT: {
130,012✔
633
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
130,012✔
634
      if (TSDB_CODE_SUCCESS != code) {
130,043✔
635
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
771✔
636
      } else if (!IS_VALID_INT(iv)) {
129,272!
637
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
×
638
      }
639
      *(int32_t*)(&val->i64) = iv;
129,272✔
640
      break;
129,272✔
641
    }
642

643
    case TSDB_DATA_TYPE_UINT: {
6,444✔
644
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
6,444✔
645
      if (TSDB_CODE_SUCCESS != code) {
6,444✔
646
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
874✔
647
      } else if (uv > UINT32_MAX) {
5,570✔
648
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
11✔
649
      }
650
      *(uint32_t*)(&val->i64) = uv;
5,559✔
651
      break;
5,559✔
652
    }
653

654
    case TSDB_DATA_TYPE_BIGINT: {
38,337✔
655
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
38,337✔
656
      if (TSDB_CODE_SUCCESS != code) {
38,338✔
657
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
798✔
658
      }
659
      val->i64 = iv;
37,540✔
660
      break;
37,540✔
661
    }
662

663
    case TSDB_DATA_TYPE_UBIGINT: {
6,241✔
664
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
6,241✔
665
      if (TSDB_CODE_SUCCESS != code) {
6,241✔
666
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
868✔
667
      }
668
      *(uint64_t*)(&val->i64) = uv;
5,373✔
669
      break;
5,373✔
670
    }
671

672
    case TSDB_DATA_TYPE_FLOAT: {
13,612✔
673
      double dv;
674
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
13,612✔
675
      if (TSDB_CODE_SUCCESS != code) {
13,610✔
676
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
784✔
677
      }
678
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
12,840!
679
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
14✔
680
      }
681
      *(float*)(&val->i64) = dv;
12,826✔
682
      break;
12,826✔
683
    }
684

685
    case TSDB_DATA_TYPE_DOUBLE: {
42,037✔
686
      double dv;
687
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
42,037✔
688
      if (TSDB_CODE_SUCCESS != code) {
42,038✔
689
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
781✔
690
      }
691
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && ERRNO == ERANGE) || isinf(dv) || isnan(dv)) {
41,257!
692
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
×
693
      }
694

695
      *(double*)(&val->i64) = dv;
41,257✔
696
      break;
41,257✔
697
    }
698

699
    case TSDB_DATA_TYPE_BINARY: {
128,168✔
700
      // Too long values will raise the invalid sql error message
701
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
128,168✔
702
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
243✔
703
      }
704
      val->pData = taosStrdup(pToken->z);
127,925!
705
      if (!val->pData) {
127,912!
706
        return terrno;
×
707
      }
708
      val->nData = pToken->n;
127,912✔
709
      break;
127,912✔
710
    }
711
    case TSDB_DATA_TYPE_VARBINARY: {
3,375✔
712
      code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
3,375✔
713
      if (code != TSDB_CODE_SUCCESS) {
3,375✔
714
        return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
454✔
715
      }
716
      break;
2,921✔
717
    }
718
    case TSDB_DATA_TYPE_GEOMETRY: {
3,002✔
719
      unsigned char* output = NULL;
3,002✔
720
      size_t         size = 0;
3,002✔
721

722
      code = parseGeometry(pToken, &output, &size);
3,002✔
723
      if (code != TSDB_CODE_SUCCESS) {
3,002✔
724
        code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
1,182✔
725
      } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
1,820!
726
        // Too long values will raise the invalid sql error message
727
        code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
728
      } else {
729
        val->pData = taosMemoryMalloc(size);
1,820!
730
        if (NULL == val->pData) {
1,820!
731
          code = terrno;
×
732
        } else {
733
          memcpy(val->pData, output, size);
1,820✔
734
          val->nData = size;
1,820✔
735
        }
736
      }
737

738
      geosFreeBuffer(output);
3,002✔
739
      break;
3,002✔
740
    }
741

742
    case TSDB_DATA_TYPE_NCHAR: {
46,488✔
743
      int32_t output = 0;
46,488✔
744
      int64_t realLen = pToken->n << 2;
46,488✔
745
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
46,488✔
746
      void* p = taosMemoryMalloc(realLen);
46,488!
747
      if (p == NULL) {
46,488!
748
        return terrno;
218✔
749
      }
750
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), realLen, &output, charsetCxt)) {
46,488✔
751
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
218✔
752
          taosMemoryFree(p);
217!
753
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
217✔
754
        }
755
        char buf[512] = {0};
1✔
756
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), ERRNO, EILSEQ);
1✔
757
        taosMemoryFree(p);
1!
758
        return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
1✔
759
      }
760
      val->pData = p;
46,270✔
761
      val->nData = output;
46,270✔
762
      break;
46,270✔
763
    }
764
    case TSDB_DATA_TYPE_TIMESTAMP: {
13,244✔
765
      if (parseTime(end, pToken, timePrec, &iv, pMsgBuf, tz) != TSDB_CODE_SUCCESS) {
13,244✔
766
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
924✔
767
      }
768

769
      val->i64 = iv;
12,322✔
770
      break;
12,322✔
771
    }
772
  }
773

774
  return code;
489,850✔
775
}
776

777
// input pStmt->pSql:  [(tag1_name, ...)] TAGS (tag1_value, ...) ...
778
// output pStmt->pSql: TAGS (tag1_value, ...) ...
779
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
79,605✔
780
  int32_t code = insInitBoundColsInfo(getNumOfTags(pStmt->pTableMeta), &pCxt->tags);
79,605✔
781
  if (TSDB_CODE_SUCCESS != code) {
79,626!
782
    return code;
×
783
  }
784

785
  SToken  token;
786
  int32_t index = 0;
79,626✔
787
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
79,626✔
788
  if (TK_NK_LP != token.type) {
79,624✔
789
    return TSDB_CODE_SUCCESS;
79,282✔
790
  }
791

792
  pStmt->pSql += index;
342✔
793
  return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
342✔
794
}
795

796
int32_t parseTagValue(SMsgBuf* pMsgBuf, const char** pSql, uint8_t precision, SSchema* pTagSchema, SToken* pToken,
503,672✔
797
                      SArray* pTagName, SArray* pTagVals, STag** pTag, timezone_t tz, void *charsetCxt) {
798
  bool isNull = isNullValue(pTagSchema->type, pToken);
503,672✔
799
  if (!isNull && pTagName) {
503,672✔
800
    if (NULL == taosArrayPush(pTagName, pTagSchema->name)) {
989,484!
801
      return terrno;
×
802
    }
803
  }
804

805
  if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
503,664✔
806
    if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
1,157✔
807
      return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
4✔
808
    }
809

810
    if (isNull) {
1,153✔
811
      return tTagNew(pTagVals, 1, true, pTag);
68✔
812
    } else {
813
      return parseJsontoTagData(pToken->z, pTagVals, pTag, pMsgBuf, charsetCxt);
1,085✔
814
    }
815
  }
816

817
  if (isNull) return 0;
502,507✔
818

819
  STagVal val = {0};
500,566✔
820
  int32_t code = parseTagToken(pSql, pToken, pTagSchema, precision, &val, pMsgBuf, tz, charsetCxt);
500,566✔
821
  if (TSDB_CODE_SUCCESS == code) {
500,594✔
822
    if (NULL == taosArrayPush(pTagVals, &val)) {
488,683!
823
      code = terrno;
×
824
    }
825
  }
826

827
  return code;
500,629✔
828
}
829

830
static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* pTagName) {
56,666✔
831
  if (pStmt->pCreateTblReq) {
56,666!
832
    tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
×
833
    taosMemoryFreeClear(pStmt->pCreateTblReq);
×
834
  }
835
  pStmt->pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
56,666!
836
  if (NULL == pStmt->pCreateTblReq) {
56,671!
837
    return terrno;
×
838
  }
839
  return insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
56,671✔
840
                             pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
56,671✔
841
                             TSDB_DEFAULT_TABLE_TTL);
842
}
843

844
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
1,138,298,514✔
845
  if (pToken->type == TK_NK_QUESTION) {
1,138,298,514!
846
    return buildInvalidOperationMsg(pMsgBuf, "insert into super table syntax is not supported for stmt");
×
847
  }
848
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
1,138,298,514!
849
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
355,310,974✔
850
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
12,018,632!
851
       pToken->type != TK_NK_VARIABLE) ||
3,980✔
852
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
1,138,294,550!
853
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
×
854
  }
855

856
  // Remove quotation marks
857
  if (TK_NK_STRING == pToken->type && type != TSDB_DATA_TYPE_VARBINARY) {
1,139,485,759✔
858
    if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
152,569,815!
859
      return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
×
860
    }
861

862
    int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
152,569,815✔
863
    pToken->z = tmpTokenBuf;
153,341,679✔
864
    pToken->n = len;
153,341,679✔
865
  }
866

867
  return TSDB_CODE_SUCCESS;
1,140,257,623✔
868
}
869

870
typedef struct SRewriteTagCondCxt {
871
  SArray* pTagVals;
872
  SArray* pTagName;
873
  int32_t code;
874
} SRewriteTagCondCxt;
875

876
static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
6✔
877
  SValueNode* pValue = NULL;
6✔
878
  int32_t     code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pValue);
6✔
879
  if (NULL == pValue) {
6!
880
    return code;
×
881
  }
882

883
  pValue->node.resType = ((SColumnNode*)*pNode)->node.resType;
6✔
884
  nodesDestroyNode(*pNode);
6✔
885
  *pNode = (SNode*)pValue;
6✔
886

887
  switch (pVal->type) {
6!
888
    case TSDB_DATA_TYPE_BOOL:
×
889
      pValue->datum.b = *(int8_t*)(&pVal->i64);
×
890
      *(bool*)&pValue->typeData = pValue->datum.b;
×
891
      break;
×
892
    case TSDB_DATA_TYPE_TINYINT:
×
893
      pValue->datum.i = *(int8_t*)(&pVal->i64);
×
894
      *(int8_t*)&pValue->typeData = pValue->datum.i;
×
895
      break;
×
896
    case TSDB_DATA_TYPE_SMALLINT:
×
897
      pValue->datum.i = *(int16_t*)(&pVal->i64);
×
898
      *(int16_t*)&pValue->typeData = pValue->datum.i;
×
899
      break;
×
900
    case TSDB_DATA_TYPE_INT:
6✔
901
      pValue->datum.i = *(int32_t*)(&pVal->i64);
6✔
902
      *(int32_t*)&pValue->typeData = pValue->datum.i;
6✔
903
      break;
6✔
904
    case TSDB_DATA_TYPE_BIGINT:
×
905
      pValue->datum.i = pVal->i64;
×
906
      pValue->typeData = pValue->datum.i;
×
907
      break;
×
908
    case TSDB_DATA_TYPE_FLOAT:
×
909
      pValue->datum.d = *(float*)(&pVal->i64);
×
910
      *(float*)&pValue->typeData = pValue->datum.d;
×
911
      break;
×
912
    case TSDB_DATA_TYPE_DOUBLE:
×
913
      pValue->datum.d = *(double*)(&pVal->i64);
×
914
      *(double*)&pValue->typeData = pValue->datum.d;
×
915
      break;
×
916
    case TSDB_DATA_TYPE_VARCHAR:
×
917
    case TSDB_DATA_TYPE_VARBINARY:
918
    case TSDB_DATA_TYPE_NCHAR:
919
      pValue->datum.p = taosMemoryCalloc(1, pVal->nData + VARSTR_HEADER_SIZE);
×
920
      if (NULL == pValue->datum.p) {
×
921
        return terrno;
×
922
      }
923
      varDataSetLen(pValue->datum.p, pVal->nData);
×
924
      memcpy(varDataVal(pValue->datum.p), pVal->pData, pVal->nData);
×
925
      break;
×
926
    case TSDB_DATA_TYPE_TIMESTAMP:
×
927
      pValue->datum.i = pVal->i64;
×
928
      pValue->typeData = pValue->datum.i;
×
929
      break;
×
930
    case TSDB_DATA_TYPE_UTINYINT:
×
931
      pValue->datum.i = *(uint8_t*)(&pVal->i64);
×
932
      *(uint8_t*)&pValue->typeData = pValue->datum.i;
×
933
      break;
×
934
    case TSDB_DATA_TYPE_USMALLINT:
×
935
      pValue->datum.i = *(uint16_t*)(&pVal->i64);
×
936
      *(uint16_t*)&pValue->typeData = pValue->datum.i;
×
937
      break;
×
938
    case TSDB_DATA_TYPE_UINT:
×
939
      pValue->datum.i = *(uint32_t*)(&pVal->i64);
×
940
      *(uint32_t*)&pValue->typeData = pValue->datum.i;
×
941
      break;
×
942
    case TSDB_DATA_TYPE_UBIGINT:
×
943
      pValue->datum.i = *(uint64_t*)(&pVal->i64);
×
944
      *(uint64_t*)&pValue->typeData = pValue->datum.i;
×
945
      break;
×
946
    case TSDB_DATA_TYPE_JSON:
×
947
    case TSDB_DATA_TYPE_DECIMAL:
948
    case TSDB_DATA_TYPE_BLOB:
949
    case TSDB_DATA_TYPE_MEDIUMBLOB:
950
    default:
951
      return TSDB_CODE_FAILED;
×
952
  }
953
  return TSDB_CODE_SUCCESS;
6✔
954
}
955

956
static int32_t rewriteTagCondColumn(SArray* pTagVals, SArray* pTagName, SNode** pNode) {
6✔
957
  SColumnNode* pCol = (SColumnNode*)*pNode;
6✔
958
  int32_t      ntags = taosArrayGetSize(pTagName);
6✔
959
  for (int32_t i = 0; i < ntags; ++i) {
6!
960
    char* pTagColName = taosArrayGet(pTagName, i);
6✔
961
    if (0 == strcmp(pTagColName, pCol->colName)) {
6!
962
      return rewriteTagCondColumnImpl(taosArrayGet(pTagVals, i), pNode);
6✔
963
    }
964
  }
965
  return TSDB_CODE_PAR_PERMISSION_DENIED;
×
966
}
967

968
static EDealRes rewriteTagCond(SNode** pNode, void* pContext) {
18✔
969
  if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
18✔
970
    SRewriteTagCondCxt* pCxt = pContext;
6✔
971
    pCxt->code = rewriteTagCondColumn(pCxt->pTagVals, pCxt->pTagName, pNode);
6✔
972
    return (TSDB_CODE_SUCCESS == pCxt->code ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
6!
973
  }
974
  return DEAL_RES_CONTINUE;
12✔
975
}
976

977
static int32_t setTagVal(SArray* pTagVals, SArray* pTagName, SNode* pCond) {
6✔
978
  SRewriteTagCondCxt cxt = {.code = TSDB_CODE_SUCCESS, .pTagVals = pTagVals, .pTagName = pTagName};
6✔
979
  nodesRewriteExpr(&pCond, rewriteTagCond, &cxt);
6✔
980
  return cxt.code;
6✔
981
}
982

983
static int32_t checkTagCondResult(SNode* pResult) {
6✔
984
  return (QUERY_NODE_VALUE == nodeType(pResult) && ((SValueNode*)pResult)->datum.b) ? TSDB_CODE_SUCCESS
6✔
985
                                                                                    : TSDB_CODE_PAR_PERMISSION_DENIED;
12!
986
}
987

988
static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode** pCond) {
6✔
989
  int32_t code = setTagVal(pTagVals, pTagName, *pCond);
6✔
990
  if (TSDB_CODE_SUCCESS == code) {
6!
991
    code = scalarCalculateConstants(*pCond, pCond);
6✔
992
  }
993
  if (TSDB_CODE_SUCCESS == code) {
6!
994
    code = checkTagCondResult(*pCond);
6✔
995
  }
996
  NODES_DESTORY_NODE(*pCond);
6✔
997
  return code;
6✔
998
}
999

1000
// pSql -> tag1_value, ...)
1001
static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
79,597✔
1002
  int32_t  code = TSDB_CODE_SUCCESS;
79,597✔
1003
  SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta);
79,597✔
1004
  SArray*  pTagVals = NULL;
79,620✔
1005
  SArray*  pTagName = NULL;
79,620✔
1006
  uint8_t  precision = pStmt->pTableMeta->tableInfo.precision;
79,620✔
1007
  SToken   token;
1008
  bool     isParseBindParam = false;
79,620✔
1009
  bool     isJson = false;
79,620✔
1010
  STag*    pTag = NULL;
79,620✔
1011

1012
  if (!(pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal))) ||
159,241!
1013
      !(pTagName = taosArrayInit(pCxt->tags.numOfBound, TSDB_COL_NAME_LEN))) {
79,625✔
1014
    code = terrno;
×
1015
    goto _exit;
×
1016
  }
1017

1018
  for (int i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->tags.numOfBound; ++i) {
200,618✔
1019
    NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);
120,981✔
1020

1021
    if (token.type == TK_NK_QUESTION) {
121,009✔
1022
      isParseBindParam = true;
127✔
1023
      if (NULL == pCxt->pComCxt->pStmtCb) {
127!
1024
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
×
1025
        break;
×
1026
      }
1027
      if (pTagVals->size != 0) {
127✔
1028
        code = buildSyntaxErrMsg(&pCxt->msg, "no mix usage for ? and tag values", token.z);
1✔
1029
        break;
1✔
1030
      }
1031

1032
      continue;
126✔
1033
    }
1034

1035
    if (isParseBindParam) {
120,882!
1036
      code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
×
1037
      break;
×
1038
    }
1039

1040
    SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
120,882✔
1041
    isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
120,882✔
1042
    code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
120,882✔
1043
    if (TSDB_CODE_SUCCESS == code && TK_NK_VARIABLE == token.type) {
120,870!
1044
      code = buildSyntaxErrMsg(&pCxt->msg, "not expected tags values ", token.z);
×
1045
    }
1046
    if (TSDB_CODE_SUCCESS == code) {
120,870✔
1047
      code = parseTagValue(&pCxt->msg, &pStmt->pSql, precision, pTagSchema, &token, pTagName, pTagVals, &pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
120,029✔
1048
    }
1049
  }
1050

1051
  if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pTagCond) {
79,638✔
1052
    code = checkSubtablePrivilege(pTagVals, pTagName, &pStmt->pTagCond);
2✔
1053
  }
1054

1055
  if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !isJson) {
79,638✔
1056
    code = tTagNew(pTagVals, 1, false, &pTag);
75,008✔
1057
  }
1058

1059
  if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !autoCreate) {
79,634✔
1060
    code = buildCreateTbReq(pStmt, pTag, pTagName);
56,672✔
1061
    pTag = NULL;
56,662✔
1062
  }
1063

1064
  if (code == TSDB_CODE_SUCCESS && !isParseBindParam) {
79,624✔
1065
    pCxt->stmtTbNameFlag |= IS_FIXED_TAG;
75,277✔
1066
  }
1067

1068
_exit:
4,347✔
1069
  for (int32_t i = 0; i < taosArrayGetSize(pTagVals); ++i) {
195,927✔
1070
    STagVal* p = (STagVal*)TARRAY_GET_ELEM(pTagVals, i);
116,303✔
1071
    if (IS_VAR_DATA_TYPE(p->type)) {
116,303✔
1072
      taosMemoryFreeClear(p->pData);
30,115!
1073
    }
1074
  }
1075
  taosArrayDestroy(pTagVals);
79,617✔
1076
  taosArrayDestroy(pTagName);
79,621✔
1077
  tTagFree(pTag);
79,629✔
1078
  return code;
79,605✔
1079
}
1080

1081
// input pStmt->pSql:  TAGS (tag1_value, ...) [table_options] ...
1082
// output pStmt->pSql: [table_options] ...
1083
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
79,621✔
1084
  SToken token;
1085
  NEXT_TOKEN(pStmt->pSql, token);
79,621✔
1086
  if (TK_TAGS != token.type) {
79,635✔
1087
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
5✔
1088
  }
1089

1090
  NEXT_TOKEN(pStmt->pSql, token);
79,630✔
1091
  if (TK_NK_LP != token.type) {
79,621!
1092
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
×
1093
  }
1094

1095
  int32_t code = parseTagsClauseImpl(pCxt, pStmt, autoCreate);
79,621✔
1096
  if (TSDB_CODE_SUCCESS == code) {
79,603✔
1097
    NEXT_VALID_TOKEN(pStmt->pSql, token);
104,169✔
1098
    if (TK_NK_COMMA == token.type) {
75,303✔
1099
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
31✔
1100
    } else if (TK_NK_RP != token.type) {
75,272✔
1101
      code = buildSyntaxErrMsg(&pCxt->msg, ") is expected", token.z);
247✔
1102
    }
1103
  }
1104
  return code;
79,612✔
1105
}
1106

1107
static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
60,995✔
1108
  pStmt->pTableMeta->suid = pStmt->pTableMeta->uid;
60,995✔
1109
  pStmt->pTableMeta->uid = pStmt->totalTbNum;
60,995✔
1110
  pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;
60,995✔
1111

1112
  STableMeta* pBackup = NULL;
60,995✔
1113
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pStmt->pTableMeta, &pBackup)) {
60,995!
1114
    return TSDB_CODE_OUT_OF_MEMORY;
×
1115
  }
1116

1117
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1118
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
61,012✔
1119
  if (TSDB_CODE_SUCCESS != code) {
61,014!
1120
    taosMemoryFree(pBackup);
×
1121
    return code;
×
1122
  }
1123
  code = taosHashPut(pStmt->pSubTableHashObj, tbFName, strlen(tbFName), &pBackup, POINTER_BYTES);
61,014✔
1124
  if (TSDB_CODE_SUCCESS != code) {
61,013!
1125
    taosMemoryFree(pBackup);
×
1126
  }
1127
  return code;
61,013✔
1128
}
1129

1130
static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
56,438✔
1131
  do {
18✔
1132
    int32_t index = 0;
56,438✔
1133
    SToken  token;
1134
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
56,438✔
1135
    if (TK_TTL == token.type) {
56,452✔
1136
      pStmt->pSql += index;
51✔
1137
      NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);
51✔
1138
      if (TK_NK_INTEGER != token.type) {
18!
1139
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
4✔
1140
      }
1141
      pStmt->pCreateTblReq->ttl = taosStr2Int32(token.z, NULL, 10);
18✔
1142
      if (pStmt->pCreateTblReq->ttl < 0) {
18✔
1143
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
4✔
1144
      }
1145
    } else if (TK_COMMENT == token.type) {
56,401✔
1146
      pStmt->pSql += index;
4✔
1147
      NEXT_TOKEN(pStmt->pSql, token);
4✔
1148
      if (TK_NK_STRING != token.type) {
4!
1149
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option comment", token.z);
×
1150
      }
1151
      if (token.n >= TSDB_TB_COMMENT_LEN) {
4!
1152
        return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z);
×
1153
      }
1154
      int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN);
4✔
1155
      pStmt->pCreateTblReq->comment = taosStrndup(pCxt->tmpTokenBuf, len);
4!
1156
      if (NULL == pStmt->pCreateTblReq->comment) {
4!
1157
        return terrno;
×
1158
      }
1159
      pStmt->pCreateTblReq->commentLen = len;
4✔
1160
    } else {
1161
      break;
56,397✔
1162
    }
1163
  } while (1);
1164
  return TSDB_CODE_SUCCESS;
56,397✔
1165
}
1166

1167
// input pStmt->pSql:
1168
//   1. [(tag1_name, ...)] ...
1169
//   2. VALUES ... | FILE ...
1170
// output pStmt->pSql:
1171
//   1. [(field1_name, ...)]
1172
//   2. VALUES ... | FILE ...
1173
static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,948,733✔
1174
  if (!pStmt->usingTableProcessing || pCxt->usingDuplicateTable) {
1,948,733!
1175
    return TSDB_CODE_SUCCESS;
1,887,731✔
1176
  }
1177

1178
  int32_t code = parseBoundTagsClause(pCxt, pStmt);
61,002✔
1179
  if (TSDB_CODE_SUCCESS == code) {
61,015!
1180
    code = parseTagsClause(pCxt, pStmt, false);
61,015✔
1181
  }
1182
  if (TSDB_CODE_SUCCESS == code) {
61,003✔
1183
    code = parseTableOptions(pCxt, pStmt);
56,424✔
1184
  }
1185

1186
  return code;
60,999✔
1187
}
1188

1189
static void setUserAuthInfo(SParseContext* pCxt, SName* pTbName, SUserAuthInfo* pInfo) {
1,880,441✔
1190
  snprintf(pInfo->user, sizeof(pInfo->user), "%s", pCxt->pUser);
1,880,441✔
1191
  memcpy(&pInfo->tbName, pTbName, sizeof(SName));
1,880,441✔
1192
  pInfo->type = AUTH_TYPE_WRITE;
1,880,441✔
1193
}
1,880,441✔
1194

1195
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
1,880,477✔
1196
  int32_t       code = TSDB_CODE_SUCCESS;
1,880,477✔
1197
  SUserAuthInfo authInfo = {0};
1,880,477✔
1198
  setUserAuthInfo(pCxt, pTbName, &authInfo);
1,880,477✔
1199
  SUserAuthRes authRes = {0};
1,880,530✔
1200
  bool         exists = true;
1,880,530✔
1201
  if (pCxt->async) {
1,880,530✔
1202
    code = catalogChkAuthFromCache(pCxt->pCatalog, &authInfo, &authRes, &exists);
1,880,392✔
1203
  } else {
1204
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
138✔
1205
                             .requestId = pCxt->requestId,
138✔
1206
                             .requestObjRefId = pCxt->requestRid,
138✔
1207
                             .mgmtEps = pCxt->mgmtEpSet};
1208
    code = catalogChkAuth(pCxt->pCatalog, &conn, &authInfo, &authRes);
138✔
1209
  }
1210
  if (TSDB_CODE_SUCCESS == code) {
1,880,545!
1211
    if (!exists) {
1,880,552✔
1212
      *pMissCache = true;
522✔
1213
    } else if (!authRes.pass[AUTH_RES_BASIC]) {
1,880,030✔
1214
      code = TSDB_CODE_PAR_PERMISSION_DENIED;
38✔
1215
    } else if (NULL != authRes.pCond[AUTH_RES_BASIC]) {
1,879,992✔
1216
      *pTagCond = authRes.pCond[AUTH_RES_BASIC];
4✔
1217
    }
1218
  }
1219
  return code;
1,880,545✔
1220
}
1221

1222
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, bool* pMissCache,
139,260✔
1223
                            bool bUsingTable) {
1224
  SParseContext* pComCxt = pCxt->pComCxt;
139,260✔
1225
  int32_t        code = TSDB_CODE_SUCCESS;
139,260✔
1226
  if (pComCxt->async) {
139,260✔
1227
    if (bUsingTable) {
118,996✔
1228
      code = catalogGetCachedSTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
59,461✔
1229
    } else {
1230
      code = catalogGetCachedTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
59,535✔
1231
    }
1232
  } else {
1233
    SRequestConnInfo conn = {.pTrans = pComCxt->pTransporter,
20,264✔
1234
                             .requestId = pComCxt->requestId,
20,264✔
1235
                             .requestObjRefId = pComCxt->requestRid,
20,264✔
1236
                             .mgmtEps = pComCxt->mgmtEpSet};
1237
    if (bUsingTable) {
20,264✔
1238
      code = catalogGetSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta);
20,085✔
1239
    } else {
1240
      code = catalogGetTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta);
179✔
1241
    }
1242
  }
1243
  if (TSDB_CODE_SUCCESS == code) {
139,278!
1244
    if (NULL == *pTableMeta) {
139,287✔
1245
      *pMissCache = true;
40,944✔
1246
    } else if (bUsingTable && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) {
98,343!
1247
      code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
×
1248
    } else if (((*pTableMeta)->virtualStb) ||
98,343✔
1249
               TSDB_VIRTUAL_CHILD_TABLE == (*pTableMeta)->tableType ||
98,335✔
1250
               TSDB_VIRTUAL_NORMAL_TABLE == (*pTableMeta)->tableType) {
98,334!
1251
      code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT;
2✔
1252
    }
1253
  }
1254
  return code;
139,290✔
1255
}
1256

1257
static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
38,828✔
1258
  int32_t     code = TSDB_CODE_SUCCESS;
38,828✔
1259
  SVgroupInfo vg;
1260
  bool        exists = true;
38,828✔
1261
  if (pCxt->async) {
38,828✔
1262
    code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
18,603✔
1263
  } else {
1264
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
20,225✔
1265
                             .requestId = pCxt->requestId,
20,225✔
1266
                             .requestObjRefId = pCxt->requestRid,
20,225✔
1267
                             .mgmtEps = pCxt->mgmtEpSet};
1268
    code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
20,225✔
1269
  }
1270
  if (TSDB_CODE_SUCCESS == code) {
38,841✔
1271
    if (exists) {
38,840!
1272
      if (isStb) {
38,840✔
1273
        pStmt->pTableMeta->vgId = vg.vgId;
38,694✔
1274
      }
1275
      code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
38,840✔
1276
    }
1277
    *pMissCache = !exists;
38,848✔
1278
  }
1279
  return code;
38,849✔
1280
}
1281

1282
static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) {
1,879,844✔
1283
  SParseContext* pComCxt = pCxt->pComCxt;
1,879,844✔
1284
  int32_t        code = TSDB_CODE_SUCCESS;
1,879,844✔
1285
  if (pComCxt->async) {
1,879,844✔
1286
    {
1287
      SVgroupInfo vg;
1288
      code = catalogGetCachedTableVgMeta(pComCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
1,879,747✔
1289
      if (TSDB_CODE_SUCCESS == code) {
1,879,761✔
1290
        if (NULL != pStmt->pTableMeta) {
1,879,754✔
1291
          if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
1,876,947✔
1292
            pStmt->stbSyntax = true;
10,610✔
1293
          } else {
1294
            code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
1,866,337✔
1295
          }
1296
        }
1297
        *pMissCache = (NULL == pStmt->pTableMeta);
1,879,838✔
1298
      }
1299
    }
1300
  } else {
1301
    bool bUsingTable = false;
97✔
1302
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pStmt->pTableMeta, pMissCache, bUsingTable);
97✔
1303
    if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
182!
1304
      if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
181✔
1305
        pStmt->stbSyntax = true;
35✔
1306
      }
1307
      if (!pStmt->stbSyntax) {
181✔
1308
        code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
146✔
1309
      }
1310
    }
1311
  }
1312
  return code;
1,879,979✔
1313
}
1314

1315
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
40,233✔
1316
  char    fullName[TSDB_TABLE_FNAME_LEN];
1317
  int32_t code = tNameExtractFullName(pName, fullName);
40,233✔
1318
  if (TSDB_CODE_SUCCESS != code) {
40,249!
1319
    return code;
×
1320
  }
1321
  return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
40,249✔
1322
}
1323

1324
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
30,855✔
1325
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
30,855✔
1326
  (void)tNameGetFullDbName(pName, dbFName);
30,855✔
1327
  return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
30,870✔
1328
}
1329

1330
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,880,750✔
1331
  if (pCxt->forceUpdate) {
1,880,750✔
1332
    pCxt->missCache = true;
237✔
1333
    return TSDB_CODE_SUCCESS;
237✔
1334
  }
1335
  SNode*  pTagCond = NULL;
1,880,513✔
1336
  int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pTagCond);
1,880,513✔
1337
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
1,880,547!
1338
    code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
1,879,934✔
1339
  }
1340
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
1,880,600✔
1341
    if (TSDB_SUPER_TABLE != pStmt->pTableMeta->tableType) {
1,877,139✔
1342
      pCxt->needTableTagVal = (NULL != pTagCond);
1,866,500✔
1343
      pCxt->missCache = (NULL != pTagCond);
1,866,500✔
1344
    } else {
1345
      pStmt->pTagCond = NULL;
10,639✔
1346
      code = nodesCloneNode(pTagCond, &pStmt->pTagCond);
10,639✔
1347
    }
1348
  }
1349
  nodesDestroyNode(pTagCond);
1,880,606✔
1350

1351
  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
1,880,546✔
1352
    code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
180✔
1353
    if (TSDB_CODE_SUCCESS == code) {
181!
1354
      code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
181✔
1355
    }
1356
  }
1357
  return code;
1,880,549✔
1358
}
1359

1360
static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
79,621✔
1361
  return insCreateSName(&pStmt->usingTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
79,621✔
1362
}
1363

1364
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* ctbCacheHit) {
79,634✔
1365
  int32_t code = TSDB_CODE_SUCCESS;
79,634✔
1366
  STableMeta* pStableMeta = NULL;
79,634✔
1367
  STableMeta* pCtableMeta = NULL;
79,634✔
1368
  if (pCxt->forceUpdate) {
79,634!
1369
    pCxt->missCache = true;
×
1370
    return TSDB_CODE_SUCCESS;
×
1371
  }
1372
  if (!pCxt->missCache) {
79,634!
1373
    char tbFName[TSDB_TABLE_FNAME_LEN];
1374
    code = tNameExtractFullName(&pStmt->usingTableName, tbFName);
79,643✔
1375
    if (TSDB_CODE_SUCCESS != code) {
79,647!
1376
      return code;
×
1377
    }
1378
    STableMeta** ppStableMeta = taosHashGet(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName));
79,647✔
1379
    if (NULL != ppStableMeta) {
79,642✔
1380
      pStableMeta = *ppStableMeta;
91✔
1381
    }
1382
    if (NULL == pStableMeta) {
79,642✔
1383
      bool bUsingTable = true;
79,553✔
1384
      code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable);
79,553✔
1385
      if (TSDB_CODE_SUCCESS == code) {
79,544!
1386
        code = taosHashPut(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName), &pStableMeta, POINTER_BYTES);
79,548✔
1387
      } else {
1388
        taosMemoryFreeClear(pStableMeta);
×
1389
      }
1390
    }
1391
  }
1392
  if (pCxt->isStmtBind) {
79,624✔
1393
    goto _no_ctb_cache;
20,062✔
1394
  }
1395

1396
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
59,562!
1397
    bool bUsingTable = false;
59,550✔
1398
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pCtableMeta, &pCxt->missCache, bUsingTable);
59,550✔
1399
  }
1400

1401
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
59,564!
1402
    code = (pStableMeta->suid == pCtableMeta->suid) ? TSDB_CODE_SUCCESS : TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
18,620✔
1403
    *ctbCacheHit = true;
18,620✔
1404
  }
1405
_no_ctb_cache:
40,944✔
1406
  if (TSDB_CODE_SUCCESS == code) {
79,626✔
1407
    if (*ctbCacheHit) {
79,620✔
1408
      code = cloneTableMeta(pCtableMeta, &pStmt->pTableMeta);
18,618✔
1409
    } else {
1410
      code = cloneTableMeta(pStableMeta, &pStmt->pTableMeta);
61,002✔
1411
    }
1412
  }
1413
  taosMemoryFree(pCtableMeta);
79,646!
1414
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
79,631✔
1415
    code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
38,685✔
1416
  }
1417
  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
79,647✔
1418
    code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
20,099✔
1419
    if (TSDB_CODE_SUCCESS == code) {
20,099!
1420
      code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj);
20,099✔
1421
    }
1422
  }
1423
  return code;
79,650✔
1424
}
1425

1426
static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
79,633✔
1427
  SToken token;
1428
  NEXT_TOKEN(pStmt->pSql, token);
79,633✔
1429
  bool    ctbCacheHit = false;
79,651✔
1430
  int32_t code = preParseUsingTableName(pCxt, pStmt, &token);
79,651✔
1431
  if (TSDB_CODE_SUCCESS == code) {
79,650✔
1432
    code = getUsingTableSchema(pCxt, pStmt, &ctbCacheHit);
79,649✔
1433
    if (TSDB_CODE_SUCCESS == code && ctbCacheHit && !pCxt->missCache) {
79,644!
1434
      pStmt->usingTableProcessing = false;
18,618✔
1435
      return ignoreUsingClauseAndCheckTagValues(pCxt, pStmt);
18,618✔
1436
    }
1437
  }
1438
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
61,027✔
1439
    code = storeChildTableMeta(pCxt, pStmt);
20,082✔
1440
  }
1441
  return code;
61,031✔
1442
}
1443

1444
// input pStmt->pSql:
1445
//   1(care). [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ...
1446
//   2. VALUES ... | FILE ...
1447
// output pStmt->pSql:
1448
//   1. [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ...
1449
//   2. VALUES ... | FILE ...
1450
static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,960,379✔
1451
  SToken  token;
1452
  int32_t index = 0;
1,960,379✔
1453
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
1,960,379✔
1454
  if (TK_USING != token.type) {
1,960,482✔
1455
    return getTargetTableSchema(pCxt, pStmt);
1,880,829✔
1456
  }
1457
  pStmt->usingTableProcessing = true;
79,653✔
1458
  pCxt->stmtTbNameFlag |= USING_CLAUSE;
79,653✔
1459
  // pStmt->pSql -> stb_name [(tag1_name, ...)
1460
  pStmt->pSql += index;
79,653✔
1461
  int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable);
79,653✔
1462
  if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) {
79,649!
1463
    return parseUsingTableNameImpl(pCxt, pStmt);
79,646✔
1464
  }
1465
  return code;
3✔
1466
}
1467

1468
static int32_t preParseTargetTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
1,960,664✔
1469
  int32_t code = insCreateSName(&pStmt->targetTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
1,960,664✔
1470
  if (TSDB_CODE_SUCCESS == code) {
1,960,639!
1471
    if (IS_SYS_DBNAME(pStmt->targetTableName.dbname)) {
1,960,666✔
1472
      return TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED;
248✔
1473
    }
1474
  }
1475

1476
  return code;
1,960,391✔
1477
}
1478

1479
// input pStmt->pSql:
1480
//   1(care). [(field1_name, ...)] ...
1481
//   2. [ USING ... ] ...
1482
//   3. VALUES ... | FILE ...
1483
// output pStmt->pSql:
1484
//   1. [ USING ... ] ...
1485
//   2. VALUES ... | FILE ...
1486
static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,960,436✔
1487
  SToken  token;
1488
  int32_t index = 0;
1,960,436✔
1489
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
1,960,436✔
1490
  if (TK_NK_LP != token.type) {
1,960,441✔
1491
    return TSDB_CODE_SUCCESS;
1,496,114✔
1492
  }
1493

1494
  // pStmt->pSql -> field1_name, ...)
1495
  pStmt->pSql += index;
464,327✔
1496
  pStmt->pBoundCols = pStmt->pSql;
464,327✔
1497
  return skipParentheses(pCxt, &pStmt->pSql);
464,327✔
1498
}
1499

1500
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt** pTableCxt) {
1,944,171✔
1501
  if (pCxt->pComCxt->async) {
1,944,171✔
1502
    return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
1,924,082✔
1503
                              pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
1,924,008✔
1504
  }
1505

1506
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1507
  int32_t code = 0;
20,163✔
1508
  if ((pCxt->stmtTbNameFlag & NO_DATA_USING_CLAUSE) == USING_CLAUSE) {
20,163✔
1509
    tstrncpy(pStmt->targetTableName.tname, pStmt->usingTableName.tname, sizeof(pStmt->targetTableName.tname));
23✔
1510
    tstrncpy(pStmt->targetTableName.dbname, pStmt->usingTableName.dbname, sizeof(pStmt->targetTableName.dbname));
23✔
1511
    pStmt->targetTableName.type = TSDB_SUPER_TABLE;
23✔
1512
    pStmt->pTableMeta->tableType = TSDB_SUPER_TABLE;
23✔
1513
  }
1514

1515
  code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
20,163✔
1516

1517
  if (TSDB_CODE_SUCCESS != code) {
20,229!
1518
    return code;
×
1519
  }
1520
  if (pStmt->usingTableProcessing) {
20,229✔
1521
    pStmt->pTableMeta->uid = 0;
20,075✔
1522
  }
1523
  return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
20,229✔
1524
                            &pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false);
20,229✔
1525
}
1526

1527
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
1,944,171✔
1528
  SToken  token;
1529
  int32_t index = 0;
1,944,171✔
1530
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
1,944,171✔
1531
  if (TK_NK_LP == token.type) {
1,944,328✔
1532
    pStmt->pSql += index;
92✔
1533
    if (NULL != pStmt->pBoundCols) {
92✔
1534
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
3✔
1535
    }
1536
    // pStmt->pSql -> field1_name, ...)
1537
    return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
89✔
1538
  }
1539

1540
  if (NULL != pStmt->pBoundCols) {
1,944,236✔
1541
    return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
453,731✔
1542
  } else if (pTableCxt->boundColsInfo.hasBoundCols) {
1,490,505✔
1543
    insResetBoundColsInfo(&pTableCxt->boundColsInfo);
22✔
1544
  }
1545

1546
  return TSDB_CODE_SUCCESS;
1,490,424✔
1547
}
1548

1549
int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
3,970,038✔
1550
  if (0 == (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) {
3,970,038✔
1551
    return TSDB_CODE_SUCCESS;
3,949,828✔
1552
  }
1553

1554
  for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) {
81,735✔
1555
    SSchema*  pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
61,508✔
1556
    SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
61,508✔
1557
    if (NULL == pCol) {
61,530!
1558
      return terrno;
×
1559
    }
1560
    tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
61,530✔
1561
  }
1562

1563
  return TSDB_CODE_SUCCESS;
20,227✔
1564
}
1565

1566
int32_t initTableColSubmitDataWithBoundInfo(STableDataCxt* pTableCxt, SBoundColInfo pBoundColsInfo) {
19✔
1567
  insDestroyBoundColInfo(&(pTableCxt->boundColsInfo));
19✔
1568
  pTableCxt->boundColsInfo = pBoundColsInfo;
19✔
1569
  for (int32_t i = 0; i < pBoundColsInfo.numOfBound; ++i) {
48✔
1570
    SSchema*  pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
29✔
1571
    SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
29✔
1572
    if (NULL == pCol) {
29!
1573
      return terrno;
×
1574
    }
1575
    tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
29✔
1576
  }
1577

1578
  return TSDB_CODE_SUCCESS;
19✔
1579
}
1580

1581
// input pStmt->pSql:
1582
//   1. [(tag1_name, ...)] ...
1583
//   2. VALUES ... | FILE ...
1584
// output pStmt->pSql: VALUES ... | FILE ...
1585
static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
1,948,842✔
1586
                                       STableDataCxt** pTableCxt) {
1587
  int32_t code = parseUsingClauseBottom(pCxt, pStmt);
1,948,842✔
1588
  if (TSDB_CODE_SUCCESS == code) {
1,948,794✔
1589
    code = getTableDataCxt(pCxt, pStmt, pTableCxt);
1,944,274✔
1590
  }
1591
  if (TSDB_CODE_SUCCESS == code) {
1,948,820✔
1592
    code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt);
1,944,200✔
1593
  }
1594
  if (TSDB_CODE_SUCCESS == code) {
1,948,899✔
1595
    code = initTableColSubmitData(*pTableCxt);
1,944,276✔
1596
  }
1597
  return code;
1,948,806✔
1598
}
1599

1600
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
1601
// output pStmt->pSql:
1602
//   1. [(tag1_name, ...)] ...
1603
//   2. VALUES ... | FILE ...
1604
static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
1,960,648✔
1605
  int32_t code = preParseTargetTableName(pCxt, pStmt, pTbName);
1,960,648✔
1606
  if (TSDB_CODE_SUCCESS == code) {
1,960,683✔
1607
    // option: [(field1_name, ...)]
1608
    code = preParseBoundColumnsClause(pCxt, pStmt);
1,960,443✔
1609
  }
1610
  if (TSDB_CODE_SUCCESS == code) {
1,960,696✔
1611
    // option: [USING stb_name]
1612
    code = parseUsingTableName(pCxt, pStmt);
1,960,390✔
1613
  }
1614
  return code;
1,960,598✔
1615
}
1616

1617
static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
1,113,650,894✔
1618
                                   const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1619
  switch (pSchema->type) {
1,113,650,894!
1620
    case TSDB_DATA_TYPE_BOOL: {
43,047,989✔
1621
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
43,047,989!
1622
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
37,246,890!
1623
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, TRUE_VALUE);
22,559,764✔
1624
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
14,687,126!
1625
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, FALSE_VALUE);
15,047,465✔
1626
        } else if (TSDB_CODE_SUCCESS ==
899✔
1627
                   toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
×
1628
          int8_t v = (*(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE);
389✔
1629
          valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
389✔
1630
        } else {
1631
          return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
510✔
1632
        }
1633
      } else if (pToken->type == TK_NK_INTEGER) {
5,801,099!
1634
        int8_t v = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
5,802,777✔
1635
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
5,802,470✔
1636
      } else if (pToken->type == TK_NK_FLOAT) {
×
1637
        int8_t v = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
154✔
1638
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
154✔
1639
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
×
1640
                 (TSDB_CODE_SUCCESS ==
1641
                  toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value)))) {
×
1642
        int8_t v = *(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE;
152✔
1643
        valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
152✔
1644
      } else {
1645
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
215✔
1646
      }
1647
      break;
43,410,394✔
1648
    }
1649
    case TSDB_DATA_TYPE_TINYINT: {
48,399,419✔
1650
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
48,399,419✔
1651
      if (TSDB_CODE_SUCCESS != code) {
48,449,040✔
1652
        return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
812✔
1653
      } else if (!IS_VALID_TINYINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
48,448,228!
1654
        return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z);
14,075✔
1655
      }
1656
      break;
48,434,153✔
1657
    }
1658
    case TSDB_DATA_TYPE_UTINYINT: {
41,289,447✔
1659
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, (uint64_t*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value));
41,289,447✔
1660
      if (TSDB_CODE_SUCCESS != code) {
41,283,098!
1661
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
×
1662
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT8_MAX) {
41,283,618✔
1663
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
11✔
1664
      }
1665
      break;
41,283,607✔
1666
    }
1667
    case TSDB_DATA_TYPE_SMALLINT: {
48,073,149✔
1668
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
48,073,149✔
1669
      if (TSDB_CODE_SUCCESS != code) {
48,159,469✔
1670
        return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
812✔
1671
      } else if (!IS_VALID_SMALLINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
48,158,657!
1672
        return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
×
1673
      }
1674
      break;
48,161,374✔
1675
    }
1676
    case TSDB_DATA_TYPE_USMALLINT: {
41,229,609✔
1677
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
41,229,609✔
1678
      if (TSDB_CODE_SUCCESS != code) {
41,277,383✔
1679
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
23,202✔
1680
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT16_MAX) {
41,254,181✔
1681
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
11✔
1682
      }
1683
      break;
41,254,170✔
1684
    }
1685
    case TSDB_DATA_TYPE_INT: {
129,313,424✔
1686
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
129,313,424✔
1687
      if (TSDB_CODE_SUCCESS != code) {
131,529,750✔
1688
        return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
813✔
1689
      } else if (!IS_VALID_INT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
131,528,937!
1690
        return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
×
1691
      }
1692
      break;
131,575,288✔
1693
    }
1694
    case TSDB_DATA_TYPE_UINT: {
41,267,353✔
1695
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
41,267,353✔
1696
      if (TSDB_CODE_SUCCESS != code) {
41,324,078!
1697
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
×
1698
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT32_MAX) {
41,328,237✔
1699
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
13✔
1700
      }
1701
      break;
41,328,224✔
1702
    }
1703
    case TSDB_DATA_TYPE_BIGINT: {
237,171,870✔
1704
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
237,171,870✔
1705
      if (TSDB_CODE_SUCCESS != code) {
264,240,419✔
1706
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
336,629✔
1707
      }
1708
      break;
263,903,790✔
1709
    }
1710
    case TSDB_DATA_TYPE_UBIGINT: {
41,311,166✔
1711
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
41,311,166✔
1712
      if (TSDB_CODE_SUCCESS != code) {
41,355,900!
1713
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
×
1714
      }
1715
      break;
41,360,262✔
1716
    }
1717
    case TSDB_DATA_TYPE_FLOAT: {
105,947,335✔
1718
      double  dv;
1719
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
105,947,335✔
1720
      if (TSDB_CODE_SUCCESS != code) {
106,824,737✔
1721
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
825✔
1722
      }
1723
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
106,823,927!
1724
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
18,235✔
1725
      }
1726
      float f = dv;
106,805,692✔
1727
      valueSetDatum(&pVal->value, TSDB_DATA_TYPE_FLOAT, &f, sizeof(f));
106,805,692✔
1728
      break;
106,478,101✔
1729
    }
1730
    case TSDB_DATA_TYPE_DOUBLE: {
68,120,790✔
1731
      double  dv;
1732
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
68,120,790✔
1733
      if (TSDB_CODE_SUCCESS != code) {
68,520,757✔
1734
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
43,430✔
1735
      }
1736
      if (isinf(dv) || isnan(dv)) {
68,477,327!
1737
        return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
×
1738
      }
1739
      VALUE_SET_TRIVIAL_DATUM(&pVal->value, (*(int64_t*)&dv));
68,484,953✔
1740
      break;
68,484,953✔
1741
    }
1742
    case TSDB_DATA_TYPE_BINARY: {
72,718,750✔
1743
      // Too long values will raise the invalid sql error message
1744
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
72,718,750✔
1745
        return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
34✔
1746
      }
1747
      pVal->value.pData = taosMemoryMalloc(pToken->n);
72,718,716!
1748
      if (NULL == pVal->value.pData) {
72,635,960!
1749
        return terrno;
×
1750
      }
1751
      memcpy(pVal->value.pData, pToken->z, pToken->n);
72,635,960✔
1752
      pVal->value.nData = pToken->n;
72,635,960✔
1753
      break;
72,635,960✔
1754
    }
1755
    case TSDB_DATA_TYPE_VARBINARY: {
10,667,562✔
1756
      int32_t code = parseVarbinary(pToken, &pVal->value.pData, &pVal->value.nData, pSchema->bytes);
10,667,562✔
1757
      if (code != TSDB_CODE_SUCCESS) {
10,667,562✔
1758
        return generateSyntaxErrMsg(&pCxt->msg, code, pSchema->name);
540✔
1759
      }
1760
      break;
10,667,022✔
1761
    }
1762
    case TSDB_DATA_TYPE_NCHAR: {
64,087,215✔
1763
      // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
1764
      int32_t len = 0;
64,087,215✔
1765
      int64_t realLen = pToken->n << 2;
64,087,215✔
1766
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
64,087,215✔
1767
      char* pUcs4 = taosMemoryMalloc(realLen);
64,087,215!
1768
      if (NULL == pUcs4) {
64,030,628!
1769
        return terrno;
18✔
1770
      }
1771
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, realLen, &len, pCxt->pComCxt->charsetCxt)) {
64,030,628✔
1772
        taosMemoryFree(pUcs4);
45,042!
1773
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
18✔
1774
          return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
17✔
1775
        }
1776
        char buf[512] = {0};
1✔
1777
        snprintf(buf, tListLen(buf), "%s", strerror(terrno));
1✔
1778
        return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
1✔
1779
      }
1780
      pVal->value.pData = pUcs4;
64,316,509✔
1781
      pVal->value.nData = len;
64,316,509✔
1782
      break;
64,316,509✔
1783
    }
1784
    case TSDB_DATA_TYPE_JSON: {
×
1785
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
×
1786
        return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z);
×
1787
      }
1788
      pVal->value.pData = taosMemoryMalloc(pToken->n);
×
1789
      if (NULL == pVal->value.pData) {
×
1790
        return terrno;
×
1791
      }
1792
      memcpy(pVal->value.pData, pToken->z, pToken->n);
×
1793
      pVal->value.nData = pToken->n;
×
1794
      break;
×
1795
    }
1796
    case TSDB_DATA_TYPE_GEOMETRY: {
13,149✔
1797
      int32_t        code = TSDB_CODE_FAILED;
13,149✔
1798
      unsigned char* output = NULL;
13,149✔
1799
      size_t         size = 0;
13,149✔
1800

1801
      code = parseGeometry(pToken, &output, &size);
13,149✔
1802
      if (code != TSDB_CODE_SUCCESS) {
13,149✔
1803
        code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
1,266✔
1804
      }
1805
      // Too long values will raise the invalid sql error message
1806
      else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
11,883!
1807
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1808
      } else {
1809
        pVal->value.pData = taosMemoryMalloc(size);
11,883!
1810
        if (NULL == pVal->value.pData) {
11,883!
1811
          code = terrno;
×
1812
        } else {
1813
          memcpy(pVal->value.pData, output, size);
11,883✔
1814
          pVal->value.nData = size;
11,883✔
1815
        }
1816
      }
1817

1818
      geosFreeBuffer(output);
13,149✔
1819
      if (code != TSDB_CODE_SUCCESS) {
13,149✔
1820
        return code;
1,266✔
1821
      }
1822

1823
      break;
11,883✔
1824
    }
1825
    case TSDB_DATA_TYPE_TIMESTAMP: {
168,406,397✔
1826
      if (parseTime(pSql, pToken, timePrec, &VALUE_GET_TRIVIAL_DATUM(&pVal->value), &pCxt->msg,
168,882,966✔
1827
                    pCxt->pComCxt->timezone) != TSDB_CODE_SUCCESS) {
168,406,397✔
1828
        return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
16,676✔
1829
      }
1830
      break;
168,866,290✔
1831
    }
1832
    case TSDB_DATA_TYPE_DECIMAL: {
908,215✔
1833
      if (!pExtSchema) {
908,215!
1834
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1835
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1836
      }
1837
      uint8_t precision = 0, scale = 0;
908,215✔
1838
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
908,215✔
1839
      Decimal128 dec = {0};
908,215✔
1840
      int32_t code = decimal128FromStr(pToken->z, pToken->n, precision, scale, &dec);
908,215✔
1841
      if (TSDB_CODE_SUCCESS != code) {
908,213!
1842
        return code;
×
1843
      }
1844

1845
      // precision check
1846
      // scale auto fit
1847

1848
      code = decimal128ToDataVal(&dec, &pVal->value);
908,213✔
1849
      if (TSDB_CODE_SUCCESS != code) {
908,215!
1850
        return code;
×
1851
      }
1852
      break;
908,215✔
1853
    }
1854
    case TSDB_DATA_TYPE_DECIMAL64: {
198,835✔
1855
      if (!pExtSchema) {
198,835!
1856
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1857
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1858
      }
1859
      uint8_t precision = 0, scale = 0;
198,835✔
1860
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
198,835✔
1861
      Decimal64 dec = {0};
198,834✔
1862
      int32_t code = decimal64FromStr(pToken->z, pToken->n, precision, scale, &dec);
198,834✔
1863
      if (TSDB_CODE_SUCCESS != code) {
198,835!
1864
        return code;
×
1865
      }
1866
      code = decimal64ToDataVal(&dec, &pVal->value);
198,835✔
1867
      if (TSDB_CODE_SUCCESS != code) {
198,834!
1868
        return code;
×
1869
      }
1870
      break;
198,834✔
1871
    }
1872
    default:
×
1873
      return TSDB_CODE_FAILED;
×
1874
  }
1875

1876
  pVal->flag = CV_FLAG_VALUE;
1,193,279,029✔
1877
  return TSDB_CODE_SUCCESS;
1,193,279,029✔
1878
}
1879

1880
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
1,138,819,254✔
1881
                               const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1882
  int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
1,138,819,254✔
1883
  if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
2,147,483,647✔
1884
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
14,192,665✔
1885
      return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
1✔
1886
    }
1887

1888
    pVal->flag = CV_FLAG_NULL;
14,192,664✔
1889
    return TSDB_CODE_SUCCESS;
14,192,664✔
1890
  }
1891

1892
  if (TSDB_CODE_SUCCESS == code) {
1,115,766,030✔
1893
    if (pToken->n == 0 && IS_NUMERIC_TYPE(pSchema->type)) {
1,111,767,562!
1894
      return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
126✔
1895
    }
1896
    code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, pExtSchema, timePrec, pVal);
1,111,767,436✔
1897
  }
1898

1899
  return code;
1,136,742,805✔
1900
}
1901

1902
static void clearColValArray(SArray* pCols) {
147,236,205✔
1903
  int32_t num = taosArrayGetSize(pCols);
147,236,205✔
1904
  for (int32_t i = 0; i < num; ++i) {
1,394,790,713✔
1905
    SColVal* pCol = taosArrayGet(pCols, i);
1,247,810,760✔
1906
    if (IS_VAR_DATA_TYPE(pCol->value.type) || pCol->value.type == TSDB_DATA_TYPE_DECIMAL) {
1,247,287,089!
1907
      taosMemoryFreeClear(pCol->value.pData);
155,009,283!
1908
    }
1909
  }
1910
}
146,979,953✔
1911

1912
typedef struct SStbRowsDataContext {
1913
  SName stbName;
1914

1915
  STableMeta*   pStbMeta;
1916
  SNode*        pTagCond;
1917
  SBoundColInfo boundColsInfo;
1918

1919
  // the following fields are for each stb row
1920
  SArray*        aTagVals;
1921
  SArray*        aColVals;
1922
  SArray*        aTagNames;
1923
  SName          ctbName;
1924
  STag*          pTag;
1925
  STableMeta*    pCtbMeta;
1926
  SVCreateTbReq* pCreateCtbReq;
1927
  bool           hasTimestampTag;
1928
  bool           isJsonTag;
1929
} SStbRowsDataContext;
1930

1931
typedef union SRowsDataContext {
1932
  STableDataCxt*       pTableDataCxt;
1933
  SStbRowsDataContext* pStbRowsCxt;
1934
} SRowsDataContext;
1935

1936
int32_t parseTbnameToken(SMsgBuf* pMsgBuf, char* tname, SToken* pToken, bool* pFoundCtbName) {
2,029,754✔
1937
  *pFoundCtbName = false;
2,029,754!
1938

1939
  if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
2,029,754!
1940
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be null value");
×
1941
  }
1942

1943
  if (pToken->n > 0) {
2,029,754✔
1944
    if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
2,029,753!
1945
      for (int i = 0; i < pToken->n; ++i) {
19,291,922✔
1946
        if (pToken->z[i] == '.') {
17,262,170✔
1947
          return buildInvalidOperationMsg(pMsgBuf, "tbname can not contain '.'");
1✔
1948
        } else {
1949
          tname[i] = pToken->z[i];
17,262,169✔
1950
        }
1951
      }
1952
      tname[pToken->n] = '\0';
2,029,752✔
1953
      *pFoundCtbName = true;
2,029,752✔
1954
    } else {
1955
      return buildInvalidOperationMsg(pMsgBuf, "tbname is too long");
×
1956
    }
1957
  } else {
1958
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be empty");
1✔
1959
  }
1960
  return TSDB_CODE_SUCCESS;
2,029,752✔
1961
}
1962

1963
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
2,024,812✔
1964
                                          SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, const SToken* tagTokens,
1965
                                          SSchema* const* tagSchemas, int numOfTagTokens) {
1966
  int32_t code = TSDB_CODE_SUCCESS;
2,024,812✔
1967
  uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
2,024,812✔
1968

1969
  if (code == TSDB_CODE_SUCCESS && ctbFirst) {
2,024,812!
1970
    for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
19,001✔
1971
      SToken*  pTagToken = (SToken*)(tagTokens + i);
8,293✔
1972
      SSchema* pTagSchema = tagSchemas[i];
8,293✔
1973
      code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
8,293✔
1974
      if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pTagToken->type) {
8,293!
1975
        code = buildInvalidOperationMsg(&pCxt->msg, "not expected tag");
×
1976
      }
1977

1978
      if (code == TSDB_CODE_SUCCESS) {
8,293✔
1979
        code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames,
8,052✔
1980
                             pStbRowsCxt->aTagVals, &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
8,052✔
1981
      }
1982
    }
1983
    if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
10,708!
1984
      code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
9,380✔
1985
    }
1986
  }
1987

1988
  if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
2,024,812!
1989
    code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
×
1990
  }
1991
  return code;
2,024,812✔
1992
}
1993

1994
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
2,029,556✔
1995
                                 SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
1996
                                 const SSchema* pSchemas, const SSchemaExt* pExtSchemas, SToken* tagTokens,
1997
                                 SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName, bool* setCtbName,
1998
                                 SBoundColInfo* ctbCols) {
1999
  int32_t code = TSDB_CODE_SUCCESS;
2,029,556✔
2000
  SArray* pTagNames = pStbRowsCxt->aTagNames;
2,029,556✔
2001
  SArray* pTagVals = pStbRowsCxt->aTagVals;
2,029,556✔
2002
  bool    canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
2,029,556!
2003
  int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
2,029,556✔
2004
  int32_t numOfTags = getNumOfTags(pStbRowsCxt->pStbMeta);
2,029,556✔
2005
  int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
2,029,556✔
2006
  uint8_t precision = getTableInfo(pStbRowsCxt->pStbMeta).precision;
2,029,556✔
2007
  int     tag_index = 0;
2,029,556✔
2008
  int     col_index = 0;
2,029,556✔
2009
  for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
10,224,210✔
2010
    const char* pTmpSql = *ppSql;
8,194,657✔
2011
    bool        ignoreComma = false;
8,194,657✔
2012
    NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
8,194,657✔
2013

2014
    if (ignoreComma) {
8,194,657!
2015
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
×
2016
      break;
×
2017
    }
2018

2019
    if (TK_NK_RP == pToken->type) {
8,194,657!
2020
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
×
2021
      break;
×
2022
    }
2023

2024
    if (TK_NK_QUESTION == pToken->type) {
8,194,657✔
2025
      if (!pCxt->pComCxt->isStmtBind && i != 0) {
139!
2026
        return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values");
3✔
2027
      }
2028
      if (pCxt->pComCxt->pStmtCb == NULL) {
139✔
2029
        return buildInvalidOperationMsg(&pCxt->msg, "symbol ? only support in stmt mode");
1✔
2030
      }
2031
      pCxt->isStmtBind = true;
138✔
2032
      if (pCols->pColIndex[i] == tbnameIdx) {
138✔
2033
        *bFoundTbName = true;
28✔
2034
        char* tbName = NULL;
28✔
2035
        if ((*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName) == TSDB_CODE_SUCCESS) {
28✔
2036
          tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
19✔
2037
          tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
19✔
2038
          tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
19✔
2039
          tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
19✔
2040
          pStmt->usingTableName.type = 1;
19✔
2041
          pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;  // set the table type to child table for parse cache
19✔
2042
          *setCtbName = true;
19✔
2043
        }
2044
      } else if (pCols->pColIndex[i] < numOfCols) {
110✔
2045
        // bind column
2046
        if (ctbCols->pColIndex == NULL) {
60✔
2047
          ctbCols->pColIndex = taosMemoryCalloc(numOfCols, sizeof(int16_t));
28!
2048
          if (NULL == ctbCols->pColIndex) {
28!
2049
            return terrno;
×
2050
          }
2051
        }
2052
        ctbCols->pColIndex[col_index++] = pCols->pColIndex[i];
60✔
2053
        ctbCols->numOfBound++;
60✔
2054
        ctbCols->numOfCols++;
60✔
2055

2056
      } else if (pCols->pColIndex[i] < tbnameIdx) {
50!
2057
        if (pCxt->tags.pColIndex == NULL) {
50✔
2058
          pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
23!
2059
          if (NULL == pCxt->tags.pColIndex) {
23!
2060
            return terrno;
×
2061
          }
2062
        }
2063
        if (!(tag_index < numOfTags)) {
50!
2064
          return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
×
2065
        }
2066
        pStmt->usingTableProcessing = true;
50✔
2067
        pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
50✔
2068
        pCxt->tags.mixTagsCols = true;
50✔
2069
        pCxt->tags.numOfBound++;
50✔
2070
        pCxt->tags.numOfCols++;
50✔
2071
      } else {
2072
        return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound");
×
2073
      }
2074
    } else {
2075
      if (pCxt->pComCxt->isStmtBind) {
8,194,518✔
2076
        return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values");
2✔
2077
      }
2078
      if (pCols->pColIndex[i] < numOfCols) {
8,194,516✔
2079
        const SSchema*    pSchema = &pSchemas[pCols->pColIndex[i]];
6,147,031✔
2080
        const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
6,147,031✔
2081
        SColVal*       pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
6,147,031✔
2082
        code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, pExtSchema, precision, pVal);
6,147,031✔
2083
        if (TK_NK_VARIABLE == pToken->type) {
6,147,031!
2084
          code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2085
        }
2086
      } else if (pCols->pColIndex[i] < tbnameIdx) {
2,047,485✔
2087
        const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
17,961✔
2088
        if (canParseTagsAfter) {
17,961✔
2089
          tagTokens[(*pNumOfTagTokens)] = *pToken;
17,069✔
2090
          tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
17,069✔
2091
          ++(*pNumOfTagTokens);
17,069✔
2092
        } else {
2093
          code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
892✔
2094
          if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pToken->type) {
892!
2095
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2096
          }
2097
          if (code == TSDB_CODE_SUCCESS) {
892✔
2098
            code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals,
854✔
2099
                                 &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
854✔
2100
          }
2101
        }
2102
      } else if (pCols->pColIndex[i] == tbnameIdx) {
2,029,524!
2103
        code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
2,029,524✔
2104
        if (TK_NK_VARIABLE == pToken->type) {
2,029,524!
2105
          code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
×
2106
        }
2107

2108
        if (code == TSDB_CODE_SUCCESS) {
2,029,524✔
2109
          code = parseTbnameToken(&pCxt->msg, pStbRowsCxt->ctbName.tname, pToken, bFoundTbName);
2,029,500✔
2110
        }
2111
      }
2112
    }
2113

2114
    if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
8,194,654✔
2115
      NEXT_VALID_TOKEN(*ppSql, *pToken);
6,168,222✔
2116
      if (TK_NK_COMMA != pToken->type) {
6,166,338✔
2117
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
1,237✔
2118
      }
2119
    }
2120
  }
2121

2122
  return code;
2,029,553✔
2123
}
2124

2125
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
2,029,556✔
2126
                               SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken, bool* pCtbFirst,
2127
                               bool* setCtbName, SBoundColInfo* ctbCols) {
2128
  SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
2,029,556✔
2129
  SSchema*       pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
2,029,556✔
2130
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pStbRowsCxt->pStbMeta);
2,029,556✔
2131

2132
  bool        bFoundTbName = false;
2,029,556✔
2133
  const char* pOrigSql = *ppSql;
2,029,556✔
2134

2135
  int32_t  code = TSDB_CODE_SUCCESS;
2,029,556✔
2136
  SToken   tagTokens[TSDB_MAX_TAGS] = {0};
2,029,556✔
2137
  SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
2,029,556✔
2138
  int      numOfTagTokens = 0;
2,029,556✔
2139

2140
  code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, pExtSchemas, tagTokens, tagSchemas,
2,029,556✔
2141
                           &numOfTagTokens, &bFoundTbName, setCtbName, ctbCols);
2142

2143
  if (code != TSDB_CODE_SUCCESS) {
2,029,556✔
2144
    return code;
4,735✔
2145
  }
2146

2147
  if (!bFoundTbName) {
2,024,821!
2148
    code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
×
2149
  }
2150

2151
  bool ctbFirst = true;
2,024,821✔
2152
  char ctbFName[TSDB_TABLE_FNAME_LEN];
2153
  if (code == TSDB_CODE_SUCCESS) {
2,024,821!
2154
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
2,024,821✔
2155
  }
2156
  if (TSDB_CODE_SUCCESS == code) {
2,024,821✔
2157
    STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
2,024,812✔
2158
    ctbFirst = (pCtbMeta == NULL);
2,024,812✔
2159
    if (!ctbFirst) {
2,024,812✔
2160
      pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
2,014,104✔
2161
      pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
2,014,104✔
2162
    }
2163
    *pCtbFirst = ctbFirst;
2,024,812✔
2164
  }
2165

2166
  if (code == TSDB_CODE_SUCCESS) {
2,024,821✔
2167
    code = processCtbTagsAfterCtbName(pCxt, pStmt, pStbRowsCxt, ctbFirst, tagTokens, tagSchemas, numOfTagTokens);
2,024,812✔
2168
  }
2169

2170
  if (code == TSDB_CODE_SUCCESS) {
2,024,821✔
2171
    *pGotRow = true;
2,023,484✔
2172
  }
2173
  return code;
2,024,821✔
2174
}
2175

2176
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
9,380✔
2177
                                                SStbRowsDataContext* pStbRowsCxt) {
2178
  int32_t code = TSDB_CODE_SUCCESS;
9,380✔
2179

2180
  pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
9,380!
2181
  if (pStbRowsCxt->pCreateCtbReq == NULL) {
9,380!
2182
    code = terrno;
×
2183
  }
2184
  if (code == TSDB_CODE_SUCCESS) {
9,380!
2185
    code = insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
9,380✔
2186
                               pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
9,380✔
2187
                               getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
9,380✔
2188
    pStbRowsCxt->pTag = NULL;
9,380✔
2189
  }
2190

2191
  if (code == TSDB_CODE_SUCCESS) {
9,380!
2192
    char ctbFName[TSDB_TABLE_FNAME_LEN];
2193
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
9,380✔
2194
    SVgroupInfo      vg;
2195
    SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
9,380✔
2196
                             .requestId = pCxt->pComCxt->requestId,
9,380✔
2197
                             .requestObjRefId = pCxt->pComCxt->requestRid,
9,380✔
2198
                             .mgmtEps = pCxt->pComCxt->mgmtEpSet};
9,380✔
2199
    if (TSDB_CODE_SUCCESS == code) {
9,380!
2200
      code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
9,380✔
2201
    }
2202
    if (code == TSDB_CODE_SUCCESS) {
9,380!
2203
      code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
9,380✔
2204
    }
2205
    STableMeta* pBackup = NULL;
9,380✔
2206
    if (TSDB_CODE_SUCCESS == code) {
9,380!
2207
      pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
9,380✔
2208
      pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
9,380✔
2209

2210
      code = cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup);
9,380✔
2211
    }
2212
    if (TSDB_CODE_SUCCESS == code) {
9,380!
2213
      code = taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
9,380✔
2214
    }
2215
    if (TSDB_CODE_SUCCESS == code) {
9,380!
2216
      code = collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
9,380✔
2217
    }
2218
  }
2219
  return code;
9,380✔
2220
}
2221

2222
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
2,034,079✔
2223
  if (pStbRowsCxt == NULL) return;
2,034,079!
2224

2225
  taosArrayClear(pStbRowsCxt->aTagNames);
2,034,079✔
2226
  for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) {
2,041,278✔
2227
    STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i);
7,199✔
2228
    if (IS_VAR_DATA_TYPE(p->type)) {
7,199!
2229
      taosMemoryFreeClear(p->pData);
3,128!
2230
    }
2231
  }
2232
  taosArrayClear(pStbRowsCxt->aTagVals);
2,034,079✔
2233

2234
  clearColValArray(pStbRowsCxt->aColVals);
2,034,079✔
2235

2236
  tTagFree(pStbRowsCxt->pTag);
2,034,079✔
2237
  pStbRowsCxt->pTag = NULL;
2,034,079✔
2238
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
2,034,079!
2239
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
2,034,079!
2240
}
2241

2242
static int32_t parseStbBoundInfo(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt,
11✔
2243
                                 STableDataCxt** ppTableDataCxt) {
2244
  char    tbFName[TSDB_TABLE_FNAME_LEN];
2245
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
11✔
2246
  if (TSDB_CODE_SUCCESS != code) {
11!
2247
    return code;
×
2248
  }
2249
  if (pStmt->usingTableProcessing) {
11✔
2250
    pStmt->pTableMeta->uid = 0;
6✔
2251
  }
2252

2253
  code = insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
11✔
2254
                            &pStmt->pCreateTblReq, ppTableDataCxt, false, true);
2255
  if (code != TSDB_CODE_SUCCESS) {
11!
2256
    return code;
×
2257
  }
2258

2259
  insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo));
11✔
2260
  (*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo;
11✔
2261

2262
  (*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t));
11!
2263
  if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) {
11!
2264
    return terrno;
×
2265
  }
2266
  (void)memcpy((*ppTableDataCxt)->boundColsInfo.pColIndex, pStbRowsCxt->boundColsInfo.pColIndex,
11✔
2267
               sizeof(int16_t) * pStmt->pStbRowsCxt->boundColsInfo.numOfBound);
11✔
2268
  return TSDB_CODE_SUCCESS;
11✔
2269
}
2270

2271
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
2,029,556✔
2272
                              SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken,
2273
                              STableDataCxt** ppTableDataCxt) {
2274
  bool    bFirstTable = false;
2,029,556✔
2275
  bool    setCtbName = false;
2,029,556✔
2276
  SBoundColInfo ctbCols = {0};
2,029,556✔
2277
  int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable, &setCtbName, &ctbCols);
2,029,556✔
2278

2279
  if (!setCtbName && pCxt->isStmtBind) {
2,029,556✔
2280
    taosMemoryFreeClear(ctbCols.pColIndex);
11!
2281
    return parseStbBoundInfo(pStmt, pStbRowsCxt, ppTableDataCxt);
11✔
2282
  }
2283

2284
  if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
2,029,545!
2285
    return code;
6,061✔
2286
  }
2287

2288
  if (code == TSDB_CODE_SUCCESS && bFirstTable) {
2,023,484!
2289
    code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
9,380✔
2290
  }
2291
  if (code == TSDB_CODE_SUCCESS) {
2,023,484!
2292
    if (pCxt->isStmtBind) {
2,023,484✔
2293
      char ctbFName[TSDB_TABLE_FNAME_LEN];
2294
      code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
19✔
2295
      if (code != TSDB_CODE_SUCCESS) {
19!
2296
        return code;
×
2297
      }
2298
      code = insGetTableDataCxt(pStmt->pTableBlockHashObj, ctbFName, strlen(ctbFName), pStbRowsCxt->pCtbMeta,
19✔
2299
                                &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, true, true);
2300
    } else {
2301
      code =
2302
          insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
2,023,465✔
2303
                             pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
2304
    }
2305
  }
2306
  if (code == TSDB_CODE_SUCCESS) {
2,023,484!
2307
    if (pCxt->isStmtBind) {
2,023,484✔
2308
      int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
19✔
2309
      code = initTableColSubmitDataWithBoundInfo(*ppTableDataCxt, ctbCols);
19✔
2310
    } else {
2311
      code = initTableColSubmitData(*ppTableDataCxt);
2,023,465✔
2312
    }
2313
  }
2314
  if (code == TSDB_CODE_SUCCESS && !pCxt->isStmtBind) {
2,023,484!
2315
    SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
2,023,465✔
2316
    code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
2,023,465✔
2317
    if (TSDB_CODE_SUCCESS == code) {
2,023,465✔
2318
      SRowKey key;
2319
      tRowGetKey(*pRow, &key);
4,046,906✔
2320
      insCheckTableDataOrder(*ppTableDataCxt, &key);
2,023,453✔
2321
    }
2322
  }
2323

2324
  if (code == TSDB_CODE_SUCCESS) {
2,023,484✔
2325
    *pGotRow = true;
2,023,472✔
2326
  }
2327

2328
  clearStbRowsDataContext(pStbRowsCxt);
2,023,484✔
2329

2330
  return code;
2,023,484✔
2331
}
2332

2333
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
145,934,234✔
2334
                       SToken* pToken) {
2335
  SBoundColInfo*    pCols = &pTableCxt->boundColsInfo;
145,934,234✔
2336
  SSchema*          pSchemas = getTableColumnSchema(pTableCxt->pMeta);
145,934,234✔
2337
  const SSchemaExt* pExtSchemas = getTableColumnExtSchema(pTableCxt->pMeta);
145,506,754✔
2338

2339
  int32_t code = TSDB_CODE_SUCCESS;
145,212,568✔
2340
  // 1. set the parsed value from sql string
2341
  for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
1,239,286,439!
2342
    const char* pOrigSql = *pSql;
1,097,653,412✔
2343
    bool        ignoreComma = false;
1,097,653,412✔
2344
    NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma);
1,097,653,412✔
2345
    if (ignoreComma) {
1,222,033,794!
2346
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
×
2347
      break;
10✔
2348
    }
2349

2350
    SSchema*          pSchema = &pSchemas[pCols->pColIndex[i]];
1,222,033,794✔
2351
    const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
1,222,033,794✔
2352
    SColVal*          pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
1,222,033,794✔
2353

2354
    if (pToken->type == TK_NK_QUESTION) {
1,181,858,771!
2355
      pCxt->isStmtBind = true;
×
2356
      if (NULL == pCxt->pComCxt->pStmtCb) {
×
2357
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
4✔
2358
        break;
4✔
2359
      }
2360
    } else {
2361
      if (TK_NK_RP == pToken->type) {
1,182,668,341✔
2362
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
2✔
2363
        break;
2✔
2364
      }
2365

2366
      if (pCxt->isStmtBind) {
1,182,668,339✔
2367
        code = buildInvalidOperationMsg(&pCxt->msg, "stmt bind param does not support normal value in sql");
4✔
2368
        break;
4✔
2369
      }
2370

2371
      if (TSDB_CODE_SUCCESS == code) {
1,182,668,335✔
2372
        code = parseValueToken(pCxt, pSql, pToken, pSchema, pExtSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
1,165,389,960✔
2373
      }
2374
    }
2375

2376
    if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
1,115,258,246✔
2377
      NEXT_VALID_TOKEN(*pSql, *pToken);
989,948,367✔
2378
      if (TK_NK_COMMA != pToken->type) {
964,421,108✔
2379
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
770✔
2380
      }
2381
    }
2382
  }
2383

2384
  if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
141,633,037!
2385
    SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
144,880,298✔
2386
    code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
144,297,935✔
2387
    if (TSDB_CODE_SUCCESS == code) {
145,554,841!
2388
      SRowKey key;
2389
      tRowGetKey(*pRow, &key);
291,189,612✔
2390
      insCheckTableDataOrder(pTableCxt, &key);
145,594,806✔
2391
    }
2392
  }
2393

2394
  if (TSDB_CODE_SUCCESS == code && !pCxt->isStmtBind) {
141,979,070!
2395
    *pGotRow = true;
145,306,046✔
2396
  }
2397

2398
  clearColValArray(pTableCxt->pValues);
141,979,070✔
2399

2400
  return code;
143,163,089✔
2401
}
2402

2403
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
2404
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
1,954,575✔
2405
                           int32_t* pNumOfRows, SToken* pToken) {
2406
  int32_t code = TSDB_CODE_SUCCESS;
1,954,575✔
2407

2408
  (*pNumOfRows) = 0;
1,954,575✔
2409
  while (TSDB_CODE_SUCCESS == code) {
126,277,093!
2410
    int32_t index = 0;
126,289,881✔
2411
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, *pToken, index);
126,289,881✔
2412
    if (TK_NK_LP != pToken->type) {
129,469,059✔
2413
      break;
1,935,114✔
2414
    }
2415
    pStmt->pSql += index;
127,533,945✔
2416

2417
    bool gotRow = false;
127,533,945✔
2418
    if (TSDB_CODE_SUCCESS == code) {
127,533,945!
2419
      if (!pStmt->stbSyntax) {
127,549,859✔
2420
        code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
127,444,829✔
2421
      } else {
2422
        STableDataCxt* pTableDataCxt = NULL;
105,030✔
2423
        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken, &pTableDataCxt);
105,030✔
2424
      }
2425
    }
2426

2427
    if (TSDB_CODE_SUCCESS == code) {
124,408,687!
2428
      NEXT_VALID_TOKEN(pStmt->pSql, *pToken);
126,192,437✔
2429
      if (TK_NK_COMMA == pToken->type) {
124,315,596✔
2430
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
17✔
2431
      } else if (TK_NK_RP != pToken->type) {
124,315,579✔
2432
        code = buildSyntaxErrMsg(&pCxt->msg, ") expected", pToken->z);
604✔
2433
      }
2434
    }
2435

2436
    if (TSDB_CODE_SUCCESS == code && gotRow) {
124,322,518✔
2437
      (*pNumOfRows)++;
124,267,695✔
2438
    }
2439
  }
2440

2441
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
1,922,326!
2442
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
20,208!
2443
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
×
2444
  }
2445
  return code;
1,954,679✔
2446
}
2447

2448
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
2449
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext,
1,954,512✔
2450
                                 SToken* pToken) {
2451
  int32_t numOfRows = 0;
1,954,512✔
2452
  int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken);
1,954,512✔
2453
  if (TSDB_CODE_SUCCESS == code) {
1,954,680✔
2454
    pStmt->totalRowsNum += numOfRows;
1,935,104✔
2455
    pStmt->totalTbNum += 1;
1,935,104✔
2456
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
1,935,104✔
2457
  }
2458
  return code;
1,954,680✔
2459
}
2460

2461
// Simplified CSV parser - only handles newlines within quotes
2462
static int32_t csvParserReadLine(SCsvParser* parser) {
20,870,322✔
2463
  if (!parser) {
20,870,322!
2464
    return TSDB_CODE_INVALID_PARA;
×
2465
  }
2466

2467
  size_t  lineLen = 0;
20,870,322✔
2468
  bool    inQuotes = false;
20,870,322✔
2469
  char    currentQuote = '\0';  // Track which quote character we're inside
20,870,322✔
2470
  int32_t code = TSDB_CODE_SUCCESS;
20,870,322✔
2471

2472
  while (true) {
820,248,011✔
2473
    // Fill buffer if needed
2474
    if (parser->bufferPos >= parser->bufferLen) {
841,118,333✔
2475
      code = csvParserFillBuffer(parser);
12,970✔
2476
      if (code != TSDB_CODE_SUCCESS) {
12,970!
2477
        break;
×
2478
      }
2479
      if (parser->bufferPos >= parser->bufferLen && parser->eof) {
12,970!
2480
        // End of file
2481
        if (lineLen == 0) {
179!
2482
          code = TSDB_CODE_TSC_QUERY_CANCELLED;  // Use this to indicate EOF
179✔
2483
        }
2484
        break;
179✔
2485
      }
2486
    }
2487

2488
    char ch = parser->buffer[parser->bufferPos++];
841,118,154✔
2489

2490
    // Handle quotes - support both single and double quotes
2491
    if (!inQuotes && (ch == CSV_QUOTE_SINGLE || ch == CSV_QUOTE_DOUBLE)) {
841,118,154✔
2492
      // Starting a quoted section
2493
      inQuotes = true;
6,613,424✔
2494
      currentQuote = ch;
6,613,424✔
2495
    } else if (inQuotes && ch == currentQuote) {
834,504,730✔
2496
      // Check for escaped quote (double quote)
2497
      if (parser->bufferPos < parser->bufferLen && parser->buffer[parser->bufferPos] == currentQuote) {
6,613,424!
2498
        // Escaped quote - keep both quotes in line for subsequent processing
2499
        // Ensure enough space for both quote characters
2500
        code = csvParserExpandLineBuffer(parser, lineLen + 2);
×
2501
        if (code != TSDB_CODE_SUCCESS) {
×
2502
          break;
×
2503
        }
2504

2505
        // Add the first quote character to the line
2506
        parser->lineBuffer[lineLen++] = ch;
×
2507

2508
        // Consume and add the second quote character
2509
        parser->bufferPos++;
×
2510
        ch = parser->buffer[parser->bufferPos - 1];  // The second quote
×
2511
        parser->lineBuffer[lineLen++] = ch;
×
2512
        continue;
×
2513
      } else {
2514
        // End of quoted section
2515
        inQuotes = false;
6,613,424✔
2516
        currentQuote = '\0';
6,613,424✔
2517
      }
2518
    }
2519

2520
    // Handle newlines
2521
    if (ch == '\n' && !inQuotes) {
841,118,154!
2522
      // End of line (not inside quotes)
2523
      break;
20,870,143✔
2524
    }
2525

2526
    // Skip \r characters only when outside quotes
2527
    if (ch == '\r' && !inQuotes) {
820,248,011!
2528
      continue;
20,765,123✔
2529
    }
2530

2531
    // Expand buffer if needed
2532
    code = csvParserExpandLineBuffer(parser, lineLen + 1);
799,482,888✔
2533
    if (code != TSDB_CODE_SUCCESS) {
799,482,888!
2534
      break;
×
2535
    }
2536

2537
    // Add character to line
2538
    parser->lineBuffer[lineLen++] = ch;
799,482,888✔
2539
  }
2540

2541
  if (code == TSDB_CODE_SUCCESS) {
20,870,322✔
2542
    parser->lineBuffer[lineLen] = '\0';
20,870,143✔
2543
  }
2544

2545
  return code;
20,870,322✔
2546
}
2547

2548
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
241✔
2549
                            int32_t* pNumOfRows) {
2550
  int32_t code = TSDB_CODE_SUCCESS;
241✔
2551
  (*pNumOfRows) = 0;
241✔
2552

2553
  // Initialize or use existing CSV parser in pStmt
2554
  if (pStmt->pCsvParser == NULL) {
241✔
2555
    // First time - allocate and initialize CSV parser
2556
    pStmt->pCsvParser = taosMemoryMalloc(sizeof(SCsvParser));
221!
2557
    if (!pStmt->pCsvParser) {
221!
2558
      return terrno;
×
2559
    }
2560
    code = csvParserInit(pStmt->pCsvParser, pStmt->fp);
221✔
2561
    if (code != TSDB_CODE_SUCCESS) {
221!
2562
      taosMemoryFree(pStmt->pCsvParser);
×
2563
      pStmt->pCsvParser = NULL;
×
2564
      return code;
×
2565
    }
2566
  }
2567
  // If pStmt->pCsvParser exists, we continue from where we left off
2568

2569
  bool firstLine = (pStmt->fileProcessing == false);
241✔
2570
  pStmt->fileProcessing = false;
241✔
2571

2572
  while (TSDB_CODE_SUCCESS == code) {
20,870,364✔
2573
    // Read one line from CSV using the parser in pStmt
2574
    code = csvParserReadLine(pStmt->pCsvParser);
20,870,322✔
2575
    if (code == TSDB_CODE_TSC_QUERY_CANCELLED) {
20,870,322✔
2576
      // End of file
2577
      code = TSDB_CODE_SUCCESS;
179✔
2578
      break;
199✔
2579
    }
2580
    if (code != TSDB_CODE_SUCCESS) {
20,870,143!
2581
      break;
×
2582
    }
2583

2584
    // Skip empty lines
2585
    if (!pStmt->pCsvParser->lineBuffer || strlen(pStmt->pCsvParser->lineBuffer) == 0) {
20,870,143!
2586
      firstLine = false;
×
2587
      continue;
187✔
2588
    }
2589

2590
    bool   gotRow = false;
20,870,143✔
2591
    SToken token;
2592
    (void)strtolower(pStmt->pCsvParser->lineBuffer, pStmt->pCsvParser->lineBuffer);
20,870,143✔
2593
    const char* pRow = pStmt->pCsvParser->lineBuffer;
20,870,143✔
2594

2595
    if (!pStmt->stbSyntax) {
20,870,143✔
2596
      code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
18,851,207✔
2597
    } else {
2598
      STableDataCxt* pTableDataCxt = NULL;
2,018,936✔
2599
      code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
2,018,936✔
2600
      if (code == TSDB_CODE_SUCCESS) {
2,018,936✔
2601
        SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
2,018,912✔
2602
        void*                pData = pTableDataCxt;
2,018,912✔
2603
        code = taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
2,018,912✔
2604
                           &pData, POINTER_BYTES);
2605
        if (TSDB_CODE_SUCCESS != code) {
2,018,912!
2606
          break;
×
2607
        }
2608
      }
2609
    }
2610

2611
    if (code && firstLine) {
20,870,143✔
2612
      firstLine = false;
187✔
2613
      code = 0;
187✔
2614
      continue;
187✔
2615
    }
2616

2617
    if (TSDB_CODE_SUCCESS == code && gotRow) {
20,869,956!
2618
      (*pNumOfRows)++;
20,869,914✔
2619
    }
2620

2621
    if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) >= tsMaxInsertBatchRows) {
20,869,956✔
2622
      // Reached batch limit - keep the parser in pStmt for next batch
2623
      pStmt->fileProcessing = true;
20✔
2624
      break;
20✔
2625
    }
2626
    firstLine = false;
20,869,936✔
2627
  }
2628

2629
  // Don't destroy the parser here - it will be cleaned up when file processing is complete
2630

2631
  parserDebug("QID:0x%" PRIx64 ", %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);
241✔
2632

2633
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && 0 == pStmt->totalRowsNum &&
241!
2634
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
×
2635
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
×
2636
  }
2637
  return code;
241✔
2638
}
2639

2640
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
241✔
2641
                                     SRowsDataContext rowsDataCxt) {
2642
  // init only for file
2643
  if (NULL == pStmt->pTableCxtHashObj) {
241✔
2644
    pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
192✔
2645
    if (!pStmt->pTableCxtHashObj) {
192!
2646
      return terrno;
×
2647
    }
2648
  }
2649
  int32_t numOfRows = 0;
241✔
2650
  int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
241✔
2651
  if (TSDB_CODE_SUCCESS == code) {
241✔
2652
    pStmt->totalRowsNum += numOfRows;
199✔
2653
    pStmt->totalTbNum += 1;
199✔
2654
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
199✔
2655
    if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
199!
2656
      rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
170✔
2657
    }
2658
    if (!pStmt->fileProcessing) {
199✔
2659
      // File processing is complete, clean up saved CSV parser
2660
      destroySavedCsvParser(pStmt);
179✔
2661
      code = taosCloseFile(&pStmt->fp);
179✔
2662
      if (TSDB_CODE_SUCCESS != code) {
179!
2663
        parserWarn("QID:0x%" PRIx64 ", failed to close file.", pCxt->pComCxt->requestId);
×
2664
      }
2665
    } else {
2666
      parserDebug("QID:0x%" PRIx64 ", insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
20!
2667
    }
2668
    if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) {
199!
2669
      destroySavedCsvParser(pStmt);
×
2670
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL);
×
2671
    }
2672
  } else {
2673
    // On error, also clean up saved CSV parser
2674
    destroySavedCsvParser(pStmt);
42✔
2675
    return buildInvalidOperationMsg(&pCxt->msg, tstrerror(code));
42✔
2676
  }
2677

2678
  // just record pTableCxt whose data come from file
2679
  if (!pStmt->stbSyntax && numOfRows > 0) {
199✔
2680
    void* pData = rowsDataCxt.pTableDataCxt;
161✔
2681
    code = taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
161✔
2682
                       POINTER_BYTES);
2683
  }
2684

2685
  return code;
199✔
2686
}
2687

2688
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
221✔
2689
                                 SRowsDataContext rowsDataCxt) {
2690
  char filePathStr[PATH_MAX + 16] = {0};
221✔
2691
  if (TK_NK_STRING == pFilePath->type) {
221✔
2692
    (void)trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
220✔
2693
    if (strlen(filePathStr) >= PATH_MAX) {
220!
2694
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2695
    }
2696
  } else {
2697
    if (pFilePath->n >= PATH_MAX) {
1!
2698
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2699
    }
2700
    strncpy(filePathStr, pFilePath->z, pFilePath->n);
1✔
2701
  }
2702
  pStmt->fp = taosOpenFile(filePathStr, TD_FILE_READ);
221✔
2703
  if (NULL == pStmt->fp) {
221!
2704
    return terrno;
×
2705
  }
2706

2707
  return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
221✔
2708
}
2709

2710
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
221✔
2711
                               SToken* pToken) {
2712
  if (tsUseAdapter) {
221!
2713
    return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
×
2714
  }
2715

2716
  NEXT_TOKEN(pStmt->pSql, *pToken);
221✔
2717
  if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
221!
2718
    return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
×
2719
  }
2720
  return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt);
221✔
2721
}
2722

2723
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
2724
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
1,954,645✔
2725
  SToken token;
2726
  NEXT_TOKEN(pStmt->pSql, token);
1,954,645✔
2727
  switch (token.type) {
1,954,913!
2728
    case TK_VALUES:
1,954,698✔
2729
      if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
1,954,698✔
2730
        return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", token.z);
1✔
2731
      }
2732
      return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
1,954,697✔
2733
    case TK_FILE:
215✔
2734
      return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
215✔
2735
    default:
×
2736
      break;
×
2737
  }
2738
  return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
×
2739
}
2740

2741
static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
3,813,202✔
2742
  if (pStbRowsCxt == NULL) return;
3,813,202✔
2743
  clearStbRowsDataContext(pStbRowsCxt);
10,579✔
2744
  taosArrayDestroy(pStbRowsCxt->aColVals);
10,595✔
2745
  pStbRowsCxt->aColVals = NULL;
10,595✔
2746
  taosArrayDestroy(pStbRowsCxt->aTagVals);
10,595✔
2747
  pStbRowsCxt->aTagVals = NULL;
10,595✔
2748
  taosArrayDestroy(pStbRowsCxt->aTagNames);
10,595✔
2749
  pStbRowsCxt->aTagNames = NULL;
10,595✔
2750
  insDestroyBoundColInfo(&pStbRowsCxt->boundColsInfo);
10,595✔
2751
  tTagFree(pStbRowsCxt->pTag);
10,595✔
2752
  pStbRowsCxt->pTag = NULL;
10,595✔
2753
  taosMemoryFreeClear(pStbRowsCxt->pCtbMeta);
10,595!
2754
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
10,595!
2755
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
10,595!
2756
}
2757

2758
static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext** ppStbRowsCxt) {
10,595✔
2759
  SStbRowsDataContext* pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
10,595!
2760
  if (!pStbRowsCxt) {
10,595!
2761
    return terrno;
×
2762
  }
2763
  tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName);
10,595✔
2764
  int32_t code = collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
10,595✔
2765
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2766
    code = collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
10,595✔
2767
  }
2768
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2769
    pStbRowsCxt->ctbName.type = TSDB_TABLE_NAME_T;
10,595✔
2770
    pStbRowsCxt->ctbName.acctId = pStbRowsCxt->stbName.acctId;
10,595✔
2771
    memcpy(pStbRowsCxt->ctbName.dbname, pStbRowsCxt->stbName.dbname, sizeof(pStbRowsCxt->stbName.dbname));
10,595✔
2772

2773
    pStbRowsCxt->pTagCond = pStmt->pTagCond;
10,595✔
2774
    pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
10,595✔
2775

2776
    code = cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta);
10,595✔
2777
  }
2778
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2779
    pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
10,595✔
2780
    pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
10,595✔
2781

2782
    pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
10,595✔
2783
    if (!pStbRowsCxt->aTagNames) {
10,595!
2784
      code = terrno;
×
2785
    }
2786
  }
2787
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2788
    pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
10,595✔
2789
    if (!pStbRowsCxt->aTagVals) {
10,595!
2790
      code = terrno;
×
2791
    }
2792
  }
2793
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2794
    // col values and bound cols info of STableDataContext is not used
2795
    pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
10,595✔
2796
    if (!pStbRowsCxt->aColVals) code = terrno;
10,595!
2797
  }
2798
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2799
    code = insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
10,595✔
2800
  }
2801
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2802
    STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
10,595✔
2803
    code = insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo);
10,595✔
2804
  }
2805
  if (TSDB_CODE_SUCCESS == code) {
10,595!
2806
    *ppStbRowsCxt = pStbRowsCxt;
10,595✔
2807
  } else {
2808
    clearStbRowsDataContext(pStbRowsCxt);
×
2809
  }
2810
  return code;
10,595✔
2811
}
2812

2813
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
10,657✔
2814
  int32_t code = TSDB_CODE_SUCCESS;
10,657✔
2815
  if (!pStmt->pBoundCols) {
10,657✔
2816
    return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion",
62✔
2817
                             pStmt->pSql);
2818
  }
2819

2820
  SStbRowsDataContext* pStbRowsCxt = NULL;
10,595✔
2821
  code = constructStbRowsDataContext(pStmt, &pStbRowsCxt);
10,595✔
2822

2823
  if (code == TSDB_CODE_SUCCESS) {
10,595!
2824
    code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
10,595✔
2825
                             &pStbRowsCxt->boundColsInfo);
10,595✔
2826
    pStbRowsCxt->hasTimestampTag = false;
10,595✔
2827
    for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
71,662✔
2828
      int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
61,067✔
2829
      if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta)) {
61,067✔
2830
        if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
19,432✔
2831
          pStbRowsCxt->hasTimestampTag = true;
1,159✔
2832
        }
2833
        if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_JSON) {
19,432!
2834
          pStbRowsCxt->isJsonTag = true;
×
2835
        }
2836
      }
2837
    }
2838
    pStmt->pStbRowsCxt = pStbRowsCxt;
10,595✔
2839
  }
2840

2841
  if (code == TSDB_CODE_SUCCESS) {
10,595✔
2842
    SRowsDataContext rowsDataCxt;
2843
    rowsDataCxt.pStbRowsCxt = pStbRowsCxt;
10,577✔
2844
    code = parseDataClause(pCxt, pStmt, rowsDataCxt);
10,577✔
2845
  }
2846

2847
  return code;
10,595✔
2848
}
2849

2850
// input pStmt->pSql:
2851
//   1. [(tag1_name, ...)] ...
2852
//   2. VALUES ... | FILE ...
2853
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,959,435✔
2854
  if (!pStmt->stbSyntax) {
1,959,435✔
2855
    STableDataCxt*   pTableCxt = NULL;
1,948,859✔
2856
    int32_t          code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
1,948,859✔
2857
    SRowsDataContext rowsDataCxt;
2858
    rowsDataCxt.pTableDataCxt = pTableCxt;
1,948,804✔
2859
    if (TSDB_CODE_SUCCESS == code) {
1,948,804✔
2860
      code = parseDataClause(pCxt, pStmt, rowsDataCxt);
1,944,207✔
2861
    }
2862
    return code;
1,948,874✔
2863
  } else {
2864
    int32_t code = parseInsertStbClauseBottom(pCxt, pStmt);
10,576✔
2865
    return code;
10,657✔
2866
  }
2867
}
2868

2869
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,960,706✔
2870
  insDestroyBoundColInfo(&pCxt->tags);
1,960,706✔
2871
  taosMemoryFreeClear(pStmt->pTableMeta);
1,960,710!
2872
  nodesDestroyNode(pStmt->pTagCond);
1,960,710✔
2873
  taosArrayDestroy(pStmt->pTableTag);
1,960,717✔
2874
  tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
1,960,694!
2875
  taosMemoryFreeClear(pStmt->pCreateTblReq);
1,960,694!
2876
  pCxt->missCache = false;
1,960,694✔
2877
  pCxt->usingDuplicateTable = false;
1,960,694✔
2878
  pStmt->pBoundCols = NULL;
1,960,694✔
2879
  pStmt->usingTableProcessing = false;
1,960,694✔
2880
  pStmt->fileProcessing = false;
1,960,694✔
2881
  pStmt->usingTableName.type = 0;
1,960,694✔
2882

2883
  destroyStbRowsDataContext(pStmt->pStbRowsCxt);
1,960,694✔
2884
  taosMemoryFreeClear(pStmt->pStbRowsCxt);
1,960,681!
2885
  pStmt->stbSyntax = false;
1,960,681✔
2886
}
1,960,681✔
2887

2888
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
2889
static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
1,960,710✔
2890
  resetEnvPreTable(pCxt, pStmt);
1,960,710✔
2891
  int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName);
1,960,687✔
2892
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
1,960,585✔
2893
    code = parseInsertTableClauseBottom(pCxt, pStmt);
1,915,790✔
2894
  }
2895

2896
  return code;
1,960,599✔
2897
}
2898

2899
static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName,
3,788,274✔
2900
                                          bool* pHasData) {
2901
  // no data in the sql string anymore.
2902
  if (0 == pTbName->n) {
3,788,274✔
2903
    if (0 != pTbName->type && '\0' != pStmt->pSql[0]) {
1,827,622!
2904
      return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pTbName->z);
×
2905
    }
2906

2907
    if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
1,827,622!
2908
      return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
×
2909
    }
2910

2911
    *pHasData = false;
1,827,622✔
2912
    return TSDB_CODE_SUCCESS;
1,827,622✔
2913
  }
2914

2915
  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && pStmt->totalTbNum > 0) {
1,960,652!
2916
    return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
×
2917
  }
2918

2919
  if (TK_NK_QUESTION == pTbName->type) {
1,960,652✔
2920
    pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
20,138✔
2921
    pCxt->isStmtBind = true;
20,138✔
2922
    if (NULL == pCxt->pComCxt->pStmtCb) {
20,138!
2923
      return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
×
2924
    }
2925

2926
    char*   tbName = NULL;
20,138✔
2927
    int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
20,138✔
2928
    if (TSDB_CODE_SUCCESS == code) {
20,129✔
2929
      pCxt->stmtTbNameFlag |= HAS_BIND_VALUE;
20,124✔
2930
      pTbName->z = tbName;
20,124✔
2931
      pTbName->n = strlen(tbName);
20,124✔
2932
    }
2933
    if (code == TSDB_CODE_TSC_STMT_TBNAME_ERROR) {
20,129✔
2934
      pCxt->stmtTbNameFlag &= ~HAS_BIND_VALUE;
9✔
2935
      code = TSDB_CODE_SUCCESS;
9✔
2936
    }
2937
    return code;
20,129✔
2938
  }
2939

2940
  if (TK_NK_ID != pTbName->type && TK_NK_STRING != pTbName->type && TK_NK_QUESTION != pTbName->type) {
1,940,514!
2941
    return buildSyntaxErrMsg(&pCxt->msg, "table_name is expected", pTbName->z);
1✔
2942
  }
2943

2944
  // db.? situation,ensure that the only thing following the '.' mark is '?'
2945
  char* tbNameAfterDbName = strnchr(pTbName->z, '.', pTbName->n, true);
1,940,513✔
2946
  if (tbNameAfterDbName != NULL) {
1,940,534✔
2947
    if (*(tbNameAfterDbName + 1) == '?') {
941,682✔
2948
      pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
43✔
2949
      char* tbName = NULL;
43✔
2950
      if (NULL == pCxt->pComCxt->pStmtCb) {
43!
2951
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
×
2952
      }
2953
      int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
43✔
2954
      if (TSDB_CODE_SUCCESS == code) {
43✔
2955
        pCxt->stmtTbNameFlag |= HAS_BIND_VALUE;
26✔
2956
        pTbName->z = tbName;
26✔
2957
        pTbName->n = strlen(tbName);
26✔
2958
      }
2959
      if (code == TSDB_CODE_TSC_STMT_TBNAME_ERROR) {
43✔
2960
        pCxt->stmtTbNameFlag &= ~HAS_BIND_VALUE;
17✔
2961
        code = TSDB_CODE_SUCCESS;
17✔
2962
      }
2963
    } else {
2964
      pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
941,639✔
2965
      parserWarn("QID:0x%" PRIx64 ", table name is specified in sql, ignore the table name in bind param",
941,639!
2966
                 pCxt->pComCxt->requestId);
2967
      *pHasData = true;
941,691✔
2968
    }
2969
    return TSDB_CODE_SUCCESS;
941,734✔
2970
  }
2971

2972
  if (TK_NK_ID == pTbName->type) {
998,852✔
2973
    pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
986,822✔
2974
  }
2975

2976
  *pHasData = true;
998,852✔
2977
  return TSDB_CODE_SUCCESS;
998,852✔
2978
}
2979

2980
static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
20,201✔
2981
  SBoundColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
20,201!
2982
  if (NULL == tags) {
20,218!
2983
    return terrno;
×
2984
  }
2985
  memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
20,218✔
2986

2987
  SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
20,218✔
2988
  int32_t        code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
20,218✔
2989
                                       pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
20,218✔
2990
                                       pStmt->usingTableName.tname, pCxt->stmtTbNameFlag);
20,218✔
2991

2992
  memset(&pCxt->tags, 0, sizeof(pCxt->tags));
20,210✔
2993
  pStmt->pVgroupsHashObj = NULL;
20,210✔
2994
  pStmt->pTableBlockHashObj = NULL;
20,210✔
2995
  return code;
20,210✔
2996
}
2997

2998
static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,827,539✔
2999
  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
1,827,539✔
3000
    return setStmtInfo(pCxt, pStmt);
20,214✔
3001
  }
3002

3003
  // release old array alloced by merge
3004
  pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
1,807,325✔
3005
  pStmt->pVgDataBlocks = NULL;
1,807,326✔
3006

3007
  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
1,807,326✔
3008
  if (fileOnly) {
1,807,326✔
3009
    // none data, skip merge & buildvgdata
3010
    if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {
153✔
3011
      pCxt->needRequest = false;
10✔
3012
      return TSDB_CODE_SUCCESS;
10✔
3013
    }
3014
  }
3015

3016
  // merge according to vgId
3017
  int32_t code = insMergeTableDataCxt(fileOnly ? pStmt->pTableCxtHashObj : pStmt->pTableBlockHashObj,
1,807,332✔
3018
                                      &pStmt->pVgDataBlocks, pStmt->fileProcessing);
1,807,332✔
3019
  // clear tmp hashobj only
3020
  taosHashClear(pStmt->pTableCxtHashObj);
1,807,403✔
3021

3022
  if (TSDB_CODE_SUCCESS == code) {
1,807,425!
3023
    code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
1,807,439✔
3024
  }
3025

3026
  return code;
1,807,406✔
3027
}
3028

3029
// tb_name
3030
//     [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
3031
//     [(field1_name, ...)]
3032
//     VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3033
// [...];
3034
static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,885,838✔
3035
  SToken  token;
3036
  int32_t code = TSDB_CODE_SUCCESS;
1,885,838✔
3037
  bool    hasData = true;
1,885,838✔
3038
  // for each table
3039
  while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) {
5,673,952✔
3040
    // pStmt->pSql -> tb_name ...
3041
    NEXT_TOKEN(pStmt->pSql, token);
3,788,294✔
3042
    code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData);
3,788,383✔
3043
    if (TSDB_CODE_SUCCESS == code && hasData) {
3,788,265!
3044
      code = parseInsertTableClause(pCxt, pStmt, &token);
1,960,730✔
3045
    }
3046
  }
3047

3048
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
1,885,658✔
3049
    code = parseInsertBodyBottom(pCxt, pStmt);
1,827,673✔
3050
  }
3051
  return code;
1,885,767✔
3052
}
3053

3054
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
149,950!
3055

3056
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
1,852,931✔
3057
  SVnodeModifyOpStmt* pStmt = NULL;
1,852,931✔
3058
  int32_t             code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&pStmt);
1,852,931✔
3059
  if (NULL == pStmt) {
1,853,064!
3060
    return code;
×
3061
  }
3062

3063
  if (pCxt->pComCxt->pStmtCb) {
1,853,064✔
3064
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
20,245✔
3065
  }
3066
  pStmt->pSql = pCxt->pComCxt->pSql;
1,853,064✔
3067

3068
  pStmt->freeHashFunc = insDestroyTableDataCxtHashMap;
1,853,064✔
3069
  pStmt->freeArrayFunc = insDestroyVgroupDataCxtList;
1,853,064✔
3070
  pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
1,853,064✔
3071
  pStmt->pCsvParser = NULL;
1,853,064✔
3072

3073
  if (!reentry) {
1,853,064✔
3074
    pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,853,041✔
3075
    if (pCxt->pComCxt->pStmtCb) {
1,853,039✔
3076
      pStmt->pTableBlockHashObj =
20,220✔
3077
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
20,214✔
3078
    } else {
3079
      pStmt->pTableBlockHashObj =
1,832,848✔
3080
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
1,832,825✔
3081
    }
3082
  }
3083
  pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,853,091✔
3084
  pStmt->pSuperTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,853,089✔
3085
  pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,853,091✔
3086
  pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
1,853,083✔
3087
  if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) ||
1,853,096!
3088
      NULL == pStmt->pSubTableHashObj || NULL == pStmt->pTableNameHashObj || NULL == pStmt->pDbFNameHashObj) {
1,853,099!
3089
    nodesDestroyNode((SNode*)pStmt);
15✔
3090
    return TSDB_CODE_OUT_OF_MEMORY;
×
3091
  }
3092

3093
  taosHashSetFreeFp(pStmt->pSubTableHashObj, destroySubTableHashElem);
1,853,081✔
3094
  taosHashSetFreeFp(pStmt->pSuperTableHashObj, destroySubTableHashElem);
1,853,069✔
3095

3096
  *pOutput = (SNode*)pStmt;
1,853,085✔
3097
  return TSDB_CODE_SUCCESS;
1,853,085✔
3098
}
3099

3100
static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
1,853,012✔
3101
  SQuery* pQuery = NULL;
1,853,012✔
3102
  int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
1,853,012✔
3103
  if (NULL == pQuery) {
1,852,984!
3104
    return code;
×
3105
  }
3106

3107
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1,852,984✔
3108
  pQuery->haveResultSet = false;
1,852,984✔
3109
  pQuery->msgType = TDMT_VND_SUBMIT;
1,852,984✔
3110

3111
  code = createVnodeModifOpStmt(pCxt, false, &pQuery->pRoot);
1,852,984✔
3112
  if (TSDB_CODE_SUCCESS == code) {
1,853,054!
3113
    *pOutput = pQuery;
1,853,058✔
3114
  } else {
3115
    nodesDestroyNode((SNode*)pQuery);
×
3116
  }
3117
  return code;
1,853,060✔
3118
}
3119

3120
static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) {
44,386✔
3121
  if (1 != taosArrayGetSize(pUsers)) {
44,386!
3122
    return TSDB_CODE_FAILED;
×
3123
  }
3124

3125
  SMetaRes* pRes = taosArrayGet(pUsers, 0);
44,386✔
3126
  if (TSDB_CODE_SUCCESS == pRes->code) {
44,386✔
3127
    SUserAuthRes* pAuth = pRes->pRes;
44,384✔
3128
    pRes->code = nodesCloneNode(pAuth->pCond[AUTH_RES_BASIC], pTagCond);
44,384✔
3129
    if (TSDB_CODE_SUCCESS == pRes->code) {
44,384!
3130
      return pAuth->pass[AUTH_RES_BASIC] ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
44,384✔
3131
    }
3132
  }
3133
  return pRes->code;
2✔
3134
}
3135

3136
static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) {
44,380✔
3137
  if (1 != taosArrayGetSize(pTables) && 2 != taosArrayGetSize(pTables)) {
44,380!
3138
    return TSDB_CODE_FAILED;
×
3139
  }
3140

3141
  taosMemoryFreeClear(*pMeta);
44,380!
3142
  SMetaRes* pRes = taosArrayGet(pTables, 0);
44,380✔
3143
  if (TSDB_CODE_SUCCESS == pRes->code) {
44,380✔
3144
    *pMeta = tableMetaDup((const STableMeta*)pRes->pRes);
43,791✔
3145
    if (NULL == *pMeta) {
43,790!
3146
      return TSDB_CODE_OUT_OF_MEMORY;
×
3147
    }
3148
  }
3149
  return pRes->code;
44,379✔
3150
}
3151

3152
static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
43,778✔
3153
  if (1 != taosArrayGetSize(pTables)) {
43,778!
3154
    return TSDB_CODE_FAILED;
×
3155
  }
3156

3157
  SMetaRes* pRes = taosArrayGet(pTables, 0);
43,778✔
3158
  if (TSDB_CODE_SUCCESS != pRes->code) {
43,779!
3159
    return pRes->code;
×
3160
  }
3161

3162
  SVgroupInfo* pVg = pRes->pRes;
43,779✔
3163
  if (isStb) {
43,779✔
3164
    pStmt->pTableMeta->vgId = pVg->vgId;
40,932✔
3165
  }
3166
  return taosHashPut(pStmt->pVgroupsHashObj, (const char*)&pVg->vgId, sizeof(pVg->vgId), (char*)pVg,
43,779✔
3167
                     sizeof(SVgroupInfo));
3168
}
3169

3170
static int32_t buildTagNameFromMeta(STableMeta* pMeta, SArray** pTagName) {
4✔
3171
  *pTagName = taosArrayInit(pMeta->tableInfo.numOfTags, TSDB_COL_NAME_LEN);
4✔
3172
  if (NULL == *pTagName) {
4!
3173
    return terrno;
×
3174
  }
3175
  SSchema* pSchema = getTableTagSchema(pMeta);
4✔
3176
  int32_t  code = 0;
4✔
3177
  for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
12✔
3178
    if (NULL == taosArrayPush(*pTagName, pSchema[i].name)) {
16!
3179
      code = terrno;
×
3180
      taosArrayDestroy(*pTagName);
×
3181
      *pTagName = NULL;
×
3182
      break;
×
3183
    }
3184
  }
3185
  return code;
4✔
3186
}
3187

3188
static int32_t checkSubtablePrivilegeForTable(const SArray* pTables, SVnodeModifyOpStmt* pStmt) {
10✔
3189
  if (1 != taosArrayGetSize(pTables)) {
10✔
3190
    return TSDB_CODE_FAILED;
6✔
3191
  }
3192

3193
  SMetaRes* pRes = taosArrayGet(pTables, 0);
4✔
3194
  if (TSDB_CODE_SUCCESS != pRes->code) {
4!
3195
    return pRes->code;
×
3196
  }
3197

3198
  SArray* pTagName = NULL;
4✔
3199
  int32_t code = buildTagNameFromMeta(pStmt->pTableMeta, &pTagName);
4✔
3200
  if (TSDB_CODE_SUCCESS == code) {
4!
3201
    code = checkSubtablePrivilege((SArray*)pRes->pRes, pTagName, &pStmt->pTagCond);
4✔
3202
  }
3203
  taosArrayDestroy(pTagName);
4✔
3204
  return code;
4✔
3205
}
3206

3207
static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
43,778✔
3208
                                              SVnodeModifyOpStmt* pStmt, bool isStb) {
3209
  int32_t code = TSDB_CODE_SUCCESS;
43,778✔
3210
  if (!isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
43,778!
3211
    code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
×
3212
  }
3213
  if (TSDB_CODE_SUCCESS == code && isStb) {
43,778!
3214
    code = storeChildTableMeta(pCxt, pStmt);
40,932✔
3215
  }
3216
  if (TSDB_CODE_SUCCESS == code) {
43,778!
3217
    code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
43,779✔
3218
  }
3219
  if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
43,777!
3220
    code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
10✔
3221
  }
3222
  return code;
43,779✔
3223
}
3224

3225
static void destoryTablesReq(void* p) {
88,772✔
3226
  STablesReq* pRes = (STablesReq*)p;
88,772✔
3227
  taosArrayDestroy(pRes->pTables);
88,772✔
3228
}
88,772✔
3229

3230
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
44,386✔
3231
  if (NULL == pCatalogReq) {
44,386!
3232
    return;
×
3233
  }
3234

3235
  taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
44,386✔
3236
  pCatalogReq->pTableMeta = NULL;
44,386✔
3237
  taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
44,386✔
3238
  pCatalogReq->pTableHash = NULL;
44,386✔
3239
  taosArrayDestroy(pCatalogReq->pUser);
44,386✔
3240
  pCatalogReq->pUser = NULL;
44,386✔
3241
  taosArrayDestroy(pCatalogReq->pTableTag);
44,386✔
3242
  pCatalogReq->pTableTag = NULL;
44,386✔
3243
}
3244

3245
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
44,386✔
3246
                                   SVnodeModifyOpStmt* pStmt) {
3247
  clearCatalogReq(pCatalogReq);
44,386✔
3248
  int32_t code = checkAuthFromMetaData(pMetaData->pUser, &pStmt->pTagCond);
44,386✔
3249
  if (code == TSDB_CODE_SUCCESS) {
44,386✔
3250
    code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
44,380✔
3251
  }
3252
  if (code == TSDB_CODE_SUCCESS) {
44,385✔
3253
    if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
43,790✔
3254
      pStmt->stbSyntax = true;
12✔
3255
    }
3256
    if (!pStmt->stbSyntax) {
43,790✔
3257
      if (pStmt->usingTableProcessing) {
43,778✔
3258
        return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
40,932✔
3259
      }
3260
      return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
2,846✔
3261
    }
3262
  }
3263
  return code;
607✔
3264
}
3265

3266
static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) {
29✔
3267
  nodesDestroyNode(pQuery->pRoot);
29✔
3268

3269
  int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
29✔
3270
  if (TSDB_CODE_SUCCESS == code) {
29!
3271
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
29✔
3272

3273
    code = (*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj,
29✔
3274
                                                    &pStmt->pTableBlockHashObj);
3275
    if (TSDB_CODE_SUCCESS == code) {
29!
3276
      if (NULL == pStmt->pVgroupsHashObj) {
29!
3277
        pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
3278
      }
3279
      if (NULL == pStmt->pTableBlockHashObj) {
29!
3280
        pStmt->pTableBlockHashObj =
×
3281
            taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
3282
      }
3283
      if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) {
29!
3284
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3285
      }
3286
    }
3287
  }
3288

3289
  return code;
29✔
3290
}
3291

3292
static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
1,897,478✔
3293
                               SQuery** pQuery) {
3294
  if (NULL == *pQuery) {
1,897,478✔
3295
    return createInsertQuery(pCxt, pQuery);
1,853,062✔
3296
  }
3297

3298
  if (NULL != pCxt->pComCxt->pStmtCb) {
44,416✔
3299
    return resetVnodeModifOpStmt(pCxt, *pQuery);
29✔
3300
  }
3301

3302
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(*pQuery)->pRoot;
44,387✔
3303

3304
  if (!pStmt->fileProcessing) {
44,387✔
3305
    return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, pStmt);
44,386✔
3306
  }
3307

3308
  return TSDB_CODE_SUCCESS;
1✔
3309
}
3310

3311
static int32_t setRefreshMeta(SQuery* pQuery) {
1,827,400✔
3312
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
1,827,400✔
3313
  int32_t             code = 0;
1,827,400✔
3314

3315
  if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
1,827,400✔
3316
    taosArrayDestroy(pQuery->pTableList);
24,291✔
3317
    pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
24,284✔
3318
    if (!pQuery->pTableList) {
24,284!
3319
      code = terrno;
×
3320
    } else {
3321
      SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
24,284✔
3322
      while (NULL != pTable) {
57,807✔
3323
        if (NULL == taosArrayPush(pQuery->pTableList, pTable)) {
67,007!
3324
          code = terrno;
×
3325
          taosHashCancelIterate(pStmt->pTableNameHashObj, pTable);
×
3326
          break;
×
3327
        }
3328
        pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
33,501✔
3329
      }
3330
    }
3331
  }
3332

3333
  if (TSDB_CODE_SUCCESS == code && taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
1,827,545✔
3334
    taosArrayDestroy(pQuery->pDbList);
24,300✔
3335
    pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
24,301✔
3336
    if (!pQuery->pDbList) {
24,305!
3337
      code = terrno;
×
3338
    } else {
3339
      char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
24,305✔
3340
      while (NULL != pDb) {
48,441✔
3341
        if (NULL == taosArrayPush(pQuery->pDbList, pDb)) {
48,622!
3342
          code = terrno;
×
3343
          taosHashCancelIterate(pStmt->pDbFNameHashObj, pDb);
×
3344
          break;
×
3345
        }
3346
        pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
24,311✔
3347
      }
3348
    }
3349
  }
3350

3351
  return code;
1,827,510✔
3352
}
3353

3354
// INSERT INTO
3355
//   tb_name
3356
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]]
3357
//       [(field1_name, ...)]
3358
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3359
//   [...];
3360
static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,853,038✔
3361
  int32_t code = skipInsertInto(&pStmt->pSql, &pCxt->msg);
1,853,038✔
3362
  if (TSDB_CODE_SUCCESS == code) {
1,853,103!
3363
    code = parseInsertBody(pCxt, pStmt);
1,853,107✔
3364
  }
3365
  return code;
1,852,830✔
3366
}
3367

3368
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
20✔
3369
  int32_t          code = TSDB_CODE_SUCCESS;
20✔
3370
  SRowsDataContext rowsDataCxt;
3371

3372
  if (!pStmt->stbSyntax) {
20✔
3373
    STableDataCxt* pTableCxt = NULL;
18✔
3374
    code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
18✔
3375
    rowsDataCxt.pTableDataCxt = pTableCxt;
18✔
3376
  } else {
3377
    rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
2✔
3378
  }
3379
  if (TSDB_CODE_SUCCESS == code) {
20!
3380
    code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
20✔
3381
  }
3382

3383
  if (TSDB_CODE_SUCCESS == code) {
20!
3384
    if (pStmt->fileProcessing) {
20!
3385
      code = parseInsertBodyBottom(pCxt, pStmt);
×
3386
    } else {
3387
      code = parseInsertBody(pCxt, pStmt);
20✔
3388
    }
3389
  }
3390

3391
  return code;
20✔
3392
}
3393

3394
static int32_t parseInsertSqlFromTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
43,784✔
3395
  int32_t code = parseInsertTableClauseBottom(pCxt, pStmt);
43,784✔
3396
  if (TSDB_CODE_SUCCESS == code) {
43,785✔
3397
    code = parseInsertBody(pCxt, pStmt);
32,795✔
3398
  }
3399
  return code;
43,785✔
3400
}
3401

3402
static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
1,896,849✔
3403
  if (pStmt->pSql == pCxt->pComCxt->pSql || NULL != pCxt->pComCxt->pStmtCb) {
1,896,849!
3404
    return parseInsertSqlFromStart(pCxt, pStmt);
1,853,044✔
3405
  }
3406

3407
  if (pStmt->fileProcessing) {
43,805✔
3408
    return parseInsertSqlFromCsv(pCxt, pStmt);
20✔
3409
  }
3410

3411
  return parseInsertSqlFromTable(pCxt, pStmt);
43,785✔
3412
}
3413

3414
static int32_t buildUsingInsertTableReq(SName* pSName, SName* pCName, SArray** pTables) {
40,944✔
3415
  if (NULL == *pTables) {
40,944!
3416
    *pTables = taosArrayInit(2, sizeof(SName));
40,944✔
3417
    if (NULL == *pTables) {
40,944!
3418
      goto _err;
×
3419
    }
3420
  }
3421
  if (NULL == taosArrayPush(*pTables, pSName)) {
81,888!
3422
    goto _err;
×
3423
  }
3424
  if (NULL == taosArrayPush(*pTables, pCName)) {
81,888!
3425
    goto _err;
×
3426
  }
3427
  return TSDB_CODE_SUCCESS;
40,944✔
3428

3429
_err:
×
3430
  if (NULL != *pTables) {
×
3431
    taosArrayDestroy(*pTables);
×
3432
    *pTables = NULL;
×
3433
  }
3434
  return terrno;
×
3435
}
3436

3437
static int32_t buildInsertTableReq(SName* pName, SArray** pTables) {
48,210✔
3438
  *pTables = taosArrayInit(1, sizeof(SName));
48,210✔
3439
  if (NULL == *pTables) {
48,212!
3440
    return terrno;
×
3441
  }
3442

3443
  if (NULL == taosArrayPush(*pTables, pName)) {
96,423!
3444
    taosArrayDestroy(*pTables);
×
3445
    *pTables = NULL;
×
3446
    return terrno;
×
3447
  }
3448
  return TSDB_CODE_SUCCESS;
48,211✔
3449
}
3450

3451
static int32_t buildInsertUsingDbReq(SName* pSName, SName* pCName, SArray** pDbs) {
40,944✔
3452
  if (NULL == *pDbs) {
40,944!
3453
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
40,944✔
3454
    if (NULL == *pDbs) {
40,944!
3455
      return terrno;
×
3456
    }
3457
  }
3458

3459
  STablesReq req = {0};
40,944✔
3460
  req.autoCreate = 1;
40,944✔
3461
  (void)tNameGetFullDbName(pSName, req.dbFName);
40,944✔
3462
  (void)tNameGetFullDbName(pCName, req.dbFName);
40,944✔
3463

3464
  int32_t code = buildUsingInsertTableReq(pSName, pCName, &req.pTables);
40,944✔
3465
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
81,888!
3466
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3467
  }
3468
  return code;
40,944✔
3469
}
3470

3471
static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) {
48,205✔
3472
  if (NULL == *pDbs) {
48,205!
3473
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
48,205✔
3474
    if (NULL == *pDbs) {
48,205!
3475
      return terrno;
×
3476
    }
3477
  }
3478

3479
  STablesReq req = {0};
48,205✔
3480
  (void)tNameGetFullDbName(pName, req.dbFName);
48,205✔
3481
  int32_t code = buildInsertTableReq(pName, &req.pTables);
48,207✔
3482
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
96,416!
3483
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3484
  }
3485

3486
  return code;
48,208✔
3487
}
3488

3489
static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray** pUserAuth) {
44,573✔
3490
  *pUserAuth = taosArrayInit(1, sizeof(SUserAuthInfo));
44,573✔
3491
  if (NULL == *pUserAuth) {
44,574!
3492
    return terrno;
×
3493
  }
3494

3495
  SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE};
44,574✔
3496
  snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser);
44,574✔
3497
  memcpy(&userAuth.tbName, pName, sizeof(SName));
44,574✔
3498
  if (NULL == taosArrayPush(*pUserAuth, &userAuth)) {
89,148!
3499
    taosArrayDestroy(*pUserAuth);
×
3500
    *pUserAuth = NULL;
×
3501
    return terrno;
×
3502
  }
3503

3504
  return TSDB_CODE_SUCCESS;
44,574✔
3505
}
3506

3507
static int32_t buildInsertTableTagReq(SName* pName, SArray** pTables) { return buildInsertTableReq(pName, pTables); }
4✔
3508

3509
static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SCatalogReq* pCatalogReq) {
44,574✔
3510
  int32_t code = buildInsertUserAuthReq(
44,574✔
3511
      pCxt->pComCxt->pUser, (0 == pStmt->usingTableName.type ? &pStmt->targetTableName : &pStmt->usingTableName),
89,148✔
3512
      &pCatalogReq->pUser);
3513
  if (TSDB_CODE_SUCCESS == code && pCxt->needTableTagVal) {
44,575✔
3514
    code = buildInsertTableTagReq(&pStmt->targetTableName, &pCatalogReq->pTableTag);
4✔
3515
  }
3516
  if (TSDB_CODE_SUCCESS == code) {
44,575✔
3517
    if (0 == pStmt->usingTableName.type) {
44,574✔
3518
      code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
3,631✔
3519
    } else {
3520
      code = buildInsertUsingDbReq(&pStmt->usingTableName, &pStmt->targetTableName, &pCatalogReq->pTableMeta);
40,943✔
3521
    }
3522
  }
3523
  if (TSDB_CODE_SUCCESS == code) {
44,576✔
3524
    code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableHash);
44,575✔
3525
  }
3526
  return code;
44,576✔
3527
}
3528

3529
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
1,871,993✔
3530
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
1,871,993✔
3531
  if (pCxt->missCache) {
1,871,993✔
3532
    parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted before obtain the cache", pCxt->pComCxt->requestId,
44,576✔
3533
                pStmt->totalRowsNum, pStmt->totalTbNum);
3534

3535
    pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
44,576✔
3536
    return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
44,576✔
3537
  }
3538

3539
  parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
1,827,417✔
3540
              pStmt->totalTbNum);
3541

3542
  pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
1,827,458✔
3543
  return TSDB_CODE_SUCCESS;
1,827,458✔
3544
}
3545

3546
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
1,897,401✔
3547
  SInsertParseContext context = {.pComCxt = pCxt,
3,794,802✔
3548
                                 .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
1,897,401✔
3549
                                 .missCache = false,
3550
                                 .usingDuplicateTable = false,
3551
                                 .needRequest = true,
3552
                                 .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false),
1,897,401✔
3553
                                 .isStmtBind = pCxt->isStmtBind};
1,897,401✔
3554

3555
  int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
1,897,401✔
3556
  if (TSDB_CODE_SUCCESS == code) {
1,897,478✔
3557
    code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)((*pQuery)->pRoot));
1,896,881✔
3558
  }
3559
  if (TSDB_CODE_SUCCESS == code) {
1,897,186✔
3560
    code = setNextStageInfo(&context, *pQuery, pCatalogReq);
1,872,026✔
3561
  }
3562
  if ((TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) &&
1,897,202!
3563
      QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
1,872,474✔
3564
    code = setRefreshMeta(*pQuery);
1,827,409✔
3565
  }
3566

3567
  insDestroyBoundColInfo(&context.tags);
1,897,329✔
3568
  // if no data to insert, set emptyMode to avoid request server
3569
  if (!context.needRequest) {
1,897,283✔
3570
    (*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
10✔
3571
  }
3572
  return code;
1,897,283✔
3573
}
3574

3575
// CSV Parser Implementation
3576
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile) {
221✔
3577
  if (!parser || !pFile) {
221!
3578
    return TSDB_CODE_INVALID_PARA;
×
3579
  }
3580

3581
  memset(parser, 0, sizeof(SCsvParser));
221✔
3582

3583
  // Set default CSV format
3584
  parser->delimiter = CSV_DEFAULT_DELIMITER;
221✔
3585
  parser->quote = CSV_QUOTE_SINGLE;  // Default to single quote for TDengine compatibility
221✔
3586
  parser->escape = CSV_ESCAPE_CHAR;
221✔
3587
  parser->allowNewlineInField = true;
221✔
3588

3589
  // Initialize buffer
3590
  parser->bufferSize = 64 * 1024;  // 64KB buffer
221✔
3591
  parser->buffer = taosMemoryMalloc(parser->bufferSize);
221!
3592
  if (!parser->buffer) {
221!
3593
    return terrno;
×
3594
  }
3595

3596
  // Initialize line buffer for reuse
3597
  parser->lineBufferCapacity = 64 * 1024;  // Initial 64KB line buffer
221✔
3598
  parser->lineBuffer = taosMemoryMalloc(parser->lineBufferCapacity);
221!
3599
  if (!parser->lineBuffer) {
221!
3600
    return terrno;
×
3601
  }
3602

3603
  parser->bufferPos = 0;
221✔
3604
  parser->bufferLen = 0;
221✔
3605
  parser->eof = false;
221✔
3606
  parser->pFile = pFile;
221✔
3607

3608
  // Fill initial buffer to detect quote type
3609
  int32_t code = csvParserFillBuffer(parser);
221✔
3610
  if (code != TSDB_CODE_SUCCESS) {
221!
3611
    return code;
×
3612
  }
3613

3614
  // Auto-detect quote character by finding the first quote in the file
3615
  // Skip the header line and look for the first quote character in data
3616
  bool foundFirstQuote = false;
221✔
3617
  bool inFirstLine = true;
221✔
3618
  
3619
  for (size_t i = 0; i < parser->bufferLen && !foundFirstQuote; i++) {
1,208,157✔
3620
    char ch = parser->buffer[i];
1,207,936✔
3621
    
3622
    // Skip the first line (header)
3623
    if (inFirstLine) {
1,207,936✔
3624
      if (ch == '\n') {
25,378✔
3625
        inFirstLine = false;
221✔
3626
      }
3627
      continue;
25,378✔
3628
    }
3629
    
3630
    // Look for the first quote character in data lines
3631
    if (ch == CSV_QUOTE_SINGLE) {
1,182,558✔
3632
      parser->quote = CSV_QUOTE_SINGLE;
109✔
3633
      foundFirstQuote = true;
109✔
3634
    } else if (ch == CSV_QUOTE_DOUBLE) {
1,182,449✔
3635
      parser->quote = CSV_QUOTE_DOUBLE;
93✔
3636
      foundFirstQuote = true;
93✔
3637
    }
3638
  }
3639

3640
  // If no quotes found, keep default (single quote for TDengine compatibility)
3641
  
3642
  // Reset buffer position for actual parsing
3643
  parser->bufferPos = 0;
221✔
3644

3645
  return TSDB_CODE_SUCCESS;
221✔
3646
}
3647

3648
static void csvParserDestroy(SCsvParser* parser) {
221✔
3649
  if (parser) {
221!
3650
    taosMemoryFree(parser->buffer);
221!
3651
    taosMemoryFree(parser->lineBuffer);
221!
3652
    memset(parser, 0, sizeof(SCsvParser));
221✔
3653
  }
3654
}
221✔
3655

3656
static int32_t csvParserFillBuffer(SCsvParser* parser) {
13,191✔
3657
  if (!parser || parser->eof) {
13,191!
3658
    return TSDB_CODE_SUCCESS;
×
3659
  }
3660

3661
  // Move remaining data to beginning of buffer
3662
  // Since this function is only called when bufferPos >= bufferLen,
3663
  // we can simplify by always resetting the buffer
3664
  parser->bufferLen = 0;
13,191✔
3665
  parser->bufferPos = 0;
13,191✔
3666

3667
  // Read more data
3668
  size_t spaceLeft = parser->bufferSize - parser->bufferLen;
13,191✔
3669
  if (spaceLeft > 0) {
13,191!
3670
    int64_t bytesRead = taosReadFile(parser->pFile, parser->buffer + parser->bufferLen, spaceLeft);
13,191✔
3671
    if (bytesRead < 0) {
13,191!
3672
      return TAOS_SYSTEM_ERROR(errno);
×
3673
    }
3674
    if (bytesRead == 0) {
13,191✔
3675
      parser->eof = true;
179✔
3676
    } else {
3677
      parser->bufferLen += bytesRead;
13,012✔
3678
    }
3679
  }
3680

3681
  return TSDB_CODE_SUCCESS;
13,191✔
3682
}
3683

3684
// Destroy saved CSV parser in SVnodeModifyOpStmt
3685
static void destroySavedCsvParser(SVnodeModifyOpStmt* pStmt) {
221✔
3686
  if (pStmt && pStmt->pCsvParser) {
221!
3687
    csvParserDestroy(pStmt->pCsvParser);
221✔
3688
    taosMemoryFree(pStmt->pCsvParser);
221!
3689
    pStmt->pCsvParser = NULL;
221✔
3690
  }
3691
}
221✔
3692

3693
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen) {
799,482,888✔
3694
  if (!parser || requiredLen <= parser->lineBufferCapacity) {
799,482,888!
3695
    return TSDB_CODE_SUCCESS;
799,482,888✔
3696
  }
3697

3698
  size_t newCapacity = parser->lineBufferCapacity;
×
3699
  while (newCapacity < requiredLen) {
×
3700
    newCapacity *= 2;
×
3701
  }
3702

3703
  char* newLineBuffer = taosMemoryRealloc(parser->lineBuffer, newCapacity);
×
3704
  if (!newLineBuffer) {
×
3705
    return TSDB_CODE_OUT_OF_MEMORY;
×
3706
  }
3707

3708
  parser->lineBuffer = newLineBuffer;
×
3709
  parser->lineBufferCapacity = newCapacity;
×
3710
  return TSDB_CODE_SUCCESS;
×
3711
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc