• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.93
/source/libs/parser/src/parInsertSql.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "decimal.h"
17
#include "geosWrapper.h"
18
#include "parInsertUtil.h"
19
#include "parToken.h"
20
#include "query.h"
21
#include "scalar.h"
22
#include "tglobal.h"
23
#include "ttime.h"
24

25
// CSV delimiter and quote character definitions
26
#define CSV_DEFAULT_DELIMITER ','
27
#define CSV_QUOTE_SINGLE      '\''
28
#define CSV_QUOTE_DOUBLE      '"'
29
#define CSV_ESCAPE_CHAR       '\\'
30
#define CSV_QUOTE_NONE        '\0'
31

32
typedef struct SCsvParser {
33
  char      delimiter;            // Field delimiter (default: ',')
34
  char      quote;                // Quote character (default: '"')
35
  char      escape;               // Escape character (default: '\')
36
  bool      allowNewlineInField;  // Allow newlines in quoted fields
37
  char*     buffer;               // Read buffer
38
  size_t    bufferSize;           // Buffer size
39
  size_t    bufferPos;            // Current position in buffer
40
  size_t    bufferLen;            // Valid data length in buffer
41
  bool      eof;                  // End of file reached
42
  TdFilePtr pFile;                // File pointer
43
  // Line buffer for reuse to avoid frequent memory allocation
44
  char*  lineBuffer;          // Reusable line buffer
45
  size_t lineBufferCapacity;  // Line buffer capacity
46
} SCsvParser;
47

48
typedef struct SInsertParseContext {
49
  SParseContext* pComCxt;
50
  SMsgBuf        msg;
51
  char           tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
52
  SBoundColInfo  tags;  // for stmt
53
  bool           missCache;
54
  bool           usingDuplicateTable;
55
  bool           forceUpdate;
56
  bool           needTableTagVal;
57
  bool           needRequest;  // whether or not request server
58
  // bool           isStmtBind;   // whether is stmt bind
59
  uint8_t        stmtTbNameFlag;
60
  SArray*        pParsedValues;  // <SColVal> for stmt bind col
61
} SInsertParseContext;
62

63
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
64
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt);
65
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate);
66

67
// CSV parser function declarations
68
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile);
69
static void    csvParserDestroy(SCsvParser* parser);
70
static int32_t csvParserFillBuffer(SCsvParser* parser);
71
static int32_t csvParserReadLine(SCsvParser* parser);
72
static void    destroySavedCsvParser(SVnodeModifyOpStmt* pStmt);
73
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen);
74

75

76
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
77
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
78

79
/*
 * Tells whether pToken stands for NULL when written into a column of dataType.
 *
 * A TK_NULL token is always NULL. A string token is NULL only when the column
 * is not a string type and the text satisfies IS_NULL_STR.
 */
static FORCE_INLINE bool isNullValue(int8_t dataType, SToken* pToken) {
  if (TK_NULL == pToken->type) {
    return true;
  }
  if (TK_NK_STRING != pToken->type) {
    return false;
  }
  if (IS_STR_DATA_TYPE(dataType)) {
    return false;
  }
  return (IS_NULL_STR(pToken->z, pToken->n)) ? true : false;
}
83

84
/*
 * Converts the token text to a double with taosStr2Double.
 *
 * value   receives the converted number.
 * endPtr  receives the position where conversion stopped.
 *
 * Returns the token's own type when the conversion consumed exactly
 * pToken->n characters, otherwise TK_NK_ILLEGAL. The errno value is reset
 * before converting.
 */
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
  SET_ERRNO(0);
  *value = taosStr2Double(pToken->z, endPtr);

  const bool consumedWholeToken = ((*endPtr - pToken->z) == pToken->n);
  if (consumedWholeToken) {
    return pToken->type;
  }
  return TK_NK_ILLEGAL;
}
95

96
/*
 * Consumes the leading "INSERT INTO" (or "IMPORT INTO") of a statement.
 *
 * pSql is advanced past each token that is read. Returns TSDB_CODE_SUCCESS,
 * or the result of buildSyntaxErrMsg when an expected keyword is missing.
 */
static int32_t skipInsertInto(const char** pSql, SMsgBuf* pMsg) {
  SToken keyword;

  NEXT_TOKEN(*pSql, keyword);
  const bool isInsertOrImport = (TK_INSERT == keyword.type) || (TK_IMPORT == keyword.type);
  if (!isInsertOrImport) {
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", keyword.z);
  }

  NEXT_TOKEN(*pSql, keyword);
  if (TK_INTO == keyword.type) {
    return TSDB_CODE_SUCCESS;
  }
  return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", keyword.z);
}
108

109
/*
 * Skips tokens up to and including the ')' that closes an already-consumed
 * '('. Nested parentheses are tracked with a depth counter.
 *
 * Returns TSDB_CODE_SUCCESS once the matching ')' is consumed, or a syntax
 * error when an empty token is read first.
 */
static int32_t skipParentheses(SInsertParseContext* pCxt, const char** pSql) {
  int32_t openCount = 1;  // the caller has already consumed one '('
  SToken  cur;

  for (;;) {
    NEXT_TOKEN(*pSql, cur);

    if (TK_NK_LP == cur.type) {
      ++openCount;
    } else if (TK_NK_RP == cur.type) {
      --openCount;
      if (0 == openCount) {
        return TSDB_CODE_SUCCESS;
      }
    }

    if (0 == cur.n) {
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
    }
  }
}
125

126
/*
 * Skips any run of "TTL <value>" / "COMMENT <value>" table options.
 *
 * The next token is peeked without moving pSql. When it is TTL or COMMENT,
 * the keyword and the token following it are consumed, otherwise pSql is
 * left where it is. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) {
  for (;;) {
    SToken  option;
    int32_t consumed = 0;

    NEXT_TOKEN_KEEP_SQL(*pSql, option, consumed);
    if (TK_TTL != option.type && TK_COMMENT != option.type) {
      return TSDB_CODE_SUCCESS;
    }

    *pSql += consumed;                    // step over the option keyword
    NEXT_TOKEN_WITH_PREV(*pSql, option);  // step over the option value
  }
}
140

141
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
142
/*
 * Skips a complete USING clause without interpreting it:
 *   stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table options]
 *
 * pStmt->pSql is advanced as tokens are read. Returns TSDB_CODE_SUCCESS, or
 * the first error reported by a helper or by buildSyntaxErrMsg.
 */
static int32_t ignoreUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  const char** pSql = &pStmt->pSql;
  int32_t      code = TSDB_CODE_SUCCESS;
  SToken       token;

  NEXT_TOKEN(*pSql, token);  // super table name
  NEXT_TOKEN(*pSql, token);  // either '(' of the tag-name list or TAGS

  if (TK_NK_LP == token.type) {
    code = skipParentheses(pCxt, pSql);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    NEXT_TOKEN(*pSql, token);
  }

  // pSql -> TAGS (tag1_value, ...)
  if (TK_TAGS != token.type) {
    code = buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
  } else {
    NEXT_TOKEN(*pSql, token);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (TK_NK_LP != token.type) {
    code = buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
  } else {
    code = skipParentheses(pCxt, pSql);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return skipTableOptions(pCxt, pSql);
}
178

179
/*
 * Processes a USING clause by running the bound-tag and TAGS parsers
 * (parseBoundTagsClause, then parseTagsClause with autoCreate = true) and
 * finally skipping any trailing table options.
 *
 * Stops at, and returns, the first non-success code.
 */
static int32_t ignoreUsingClauseAndCheckTagValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  int32_t code = parseBoundTagsClause(pCxt, pStmt);

  // pSql -> TAGS (tag1_value, ...)
  if (TSDB_CODE_SUCCESS == code) {
    code = parseTagsClause(pCxt, pStmt, true);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = skipTableOptions(pCxt, &pStmt->pSql);
  }

  return code;
}
198

199
/*
 * Checks whether the target table of this statement already has an entry in
 * pStmt->pSubTableHashObj.
 *
 * On a hit, *pDuplicate is set to true, pCxt->missCache is cleared, the
 * stored meta is cloned into pStmt->pTableMeta and the USING clause is
 * skipped. On a miss, *pDuplicate stays false and nothing else is touched.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from a helper.
 */
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
  char fullName[TSDB_TABLE_FNAME_LEN];

  *pDuplicate = false;

  int32_t code = tNameExtractFullName(&pStmt->targetTableName, fullName);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta** ppCached = taosHashGet(pStmt->pSubTableHashObj, fullName, strlen(fullName));
  if (NULL == ppCached) {
    return TSDB_CODE_SUCCESS;
  }

  *pDuplicate = true;
  pCxt->missCache = false;

  code = cloneTableMeta(*ppCached, &pStmt->pTableMeta);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return ignoreUsingClause(pCxt, pStmt);
}
221

222
/* Selects which part of the table schema a bound-column list refers to. */
typedef enum {
  BOUND_TAGS,           /* tag columns only */
  BOUND_COLUMNS,        /* data columns only */
  BOUND_ALL_AND_TBNAME  /* whole schema plus the tbname pseudo column */
} EBoundColumnsType;
223

224
/*
 * Returns the index used for the "tbname" pseudo column: the number of tags
 * plus the number of columns of the table.
 */
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
  const int32_t tagCount = pTableMeta->tableInfo.numOfTags;
  const int32_t columnCount = pTableMeta->tableInfo.numOfColumns;
  return tagCount + columnCount;
}
227

228
// pStmt->pSql -> field1_name, ...)
229
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType,
1,580✔
230
                                 STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
231
  SSchema* pSchema = NULL;
1,580✔
232
  if (boundColsType == BOUND_TAGS) {
1,580✔
233
    pSchema = getTableTagSchema(pTableMeta);
61✔
234
  } else if (boundColsType == BOUND_COLUMNS) {
1,519✔
235
    pSchema = getTableColumnSchema(pTableMeta);
1,468✔
236
  } else {
237
    pSchema = pTableMeta->schema;
51✔
238
    if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
51!
239
      return TSDB_CODE_PAR_INTERNAL_ERROR;
×
240
    }
241
  }
242
  int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta);
1,580✔
243

244
  bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
1,580!
245
  if (NULL == pUseCols) {
1,580!
246
    return terrno;
×
247
  }
248

249
  pBoundInfo->numOfBound = 0;
1,580✔
250
  pBoundInfo->hasBoundCols = true;
1,580✔
251

252
  bool    hasPK = pTableMeta->tableInfo.numOfPKs;
1,580✔
253
  int16_t numOfBoundPKs = 0;
1,580✔
254
  int16_t lastColIdx = -1;  // last column found
1,580✔
255
  int32_t code = TSDB_CODE_SUCCESS;
1,580✔
256
  while (TSDB_CODE_SUCCESS == code) {
18,654!
257
    SToken token;
258
    NEXT_TOKEN(*pSql, token);
18,654✔
259

260
    if (TK_NK_RP == token.type) {
18,656✔
261
      break;
1,580✔
262
    }
263

264
    char tmpTokenBuf[TSDB_COL_NAME_LEN + 2] = {0};  // used for deleting Escape character backstick(`)
17,076✔
265
    strncpy(tmpTokenBuf, token.z, token.n);
17,076✔
266
    token.z = tmpTokenBuf;
17,076✔
267
    token.n = strdequote(token.z);
17,076✔
268

269
    if (boundColsType == BOUND_ALL_AND_TBNAME && token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
17,075!
270
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
50✔
271
      pUseCols[tbnameSchemaIndex] = true;
50✔
272
      ++pBoundInfo->numOfBound;
50✔
273
      continue;
50✔
274
    }
275
    int16_t t = lastColIdx + 1;
17,025✔
276
    int16_t end = (boundColsType == BOUND_ALL_AND_TBNAME) ? (pBoundInfo->numOfCols - 1) : pBoundInfo->numOfCols;
17,025✔
277
    int16_t index = insFindCol(&token, t, end, pSchema);
17,025✔
278
    if (index < 0 && t > 0) {
17,024!
279
      index = insFindCol(&token, 0, t, pSchema);
513✔
280
    }
281
    if (index < 0) {
17,025!
282
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
×
283
    } else if (pUseCols[index]) {
17,025!
284
      code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
×
285
    } else {
286
      lastColIdx = index;
17,025✔
287
      pUseCols[index] = true;
17,025✔
288
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
17,025✔
289
      ++pBoundInfo->numOfBound;
17,025✔
290
      if (hasPK && (pSchema[index].flags & COL_IS_KEY)) ++numOfBoundPKs;
17,025✔
291
    }
292
  }
293

294
  if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) {
1,580!
295
    if (!pUseCols[0]) {
1,519✔
296
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary timestamp column should not be null");
4✔
297
    }
298
    if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) {
1,519!
299
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary key column should not be none");
×
300
    }
301
  }
302
  if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
1,580✔
303
    code = buildInvalidOperationMsg(&pCxt->msg, "tbname column should not be null");
1✔
304
  }
305
  taosMemoryFree(pUseCols);
1,580!
306

307
  return code;
1,580✔
308
}
309

310
/*
 * Interprets one token either as a point in time or as a duration.
 *
 * end       not used by this function.
 * pToken    token to interpret.
 * timePrec  precision passed on to the time helpers.
 * ts        receives the timestamp when *isTs ends up true.
 * interval  receives the duration when *isTs ends up false.
 * pMsgBuf   receives syntax error text.
 * isTs      set to false for a TK_NK_VARIABLE token, true for everything else.
 * tz        time zone passed on to the time helpers.
 *
 * NOW, TODAY and integer tokens are handled directly. Any other token is
 * first given to taosParseTime; if that fails, string/hex/bin tokens get a
 * second chance as the text "now", "today" or a plain integer.
 *
 * Returns TSDB_CODE_SUCCESS, a syntax error from buildSyntaxErrMsg, or
 * TSDB_CODE_TSC_INVALID_OPERATION when a duration cannot be parsed.
 */
static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts,
                                        int64_t* interval, SMsgBuf* pMsgBuf, bool* isTs, timezone_t tz) {
  if (pToken->type == TK_NOW) {
    *isTs = true;
    *ts = taosGetTimestamp(timePrec);
    return TSDB_CODE_SUCCESS;
  }

  if (pToken->type == TK_TODAY) {
    *isTs = true;
    *ts = taosGetTimestampToday(timePrec, tz);
    return TSDB_CODE_SUCCESS;
  }

  if (pToken->type == TK_NK_INTEGER) {
    *isTs = true;
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, ts)) {
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (pToken->type == TK_NK_VARIABLE) {
    char unit = 0;
    *isTs = false;
    if (parseAbsoluteDuration(pToken->z, pToken->n, interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }
    return TSDB_CODE_SUCCESS;
  }

  // Everything else: try the RFC-3339/ISO-8601 timestamp string format first.
  *isTs = true;
  if (taosParseTime(pToken->z, ts, pToken->n, timePrec, tz) == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  const bool literalLike =
      (pToken->type == TK_NK_STRING) || (pToken->type == TK_NK_HEX) || (pToken->type == TK_NK_BIN);
  if ((pToken->n == 0) || !literalLike) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
  }

  if (IS_NOW_STR(pToken->z, pToken->n)) {
    *ts = taosGetTimestamp(timePrec);
    return TSDB_CODE_SUCCESS;
  }
  if (IS_TODAY_STR(pToken->z, pToken->n)) {
    *ts = taosGetTimestampToday(timePrec, tz);
    return TSDB_CODE_SUCCESS;
  }
  if (TSDB_CODE_SUCCESS == toIntegerPure(pToken->z, pToken->n, 10, ts)) {
    return TSDB_CODE_SUCCESS;
  }

  return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
}
352

353
static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf,
43,429,107✔
354
                     timezone_t tz) {
355
  int32_t     index = 0, i = 0;
43,429,107✔
356
  int64_t     interval = 0, tempInterval = 0;
43,429,107✔
357
  int64_t     ts = 0, tempTs = 0;
43,429,107✔
358
  bool        firstIsTS = false, secondIsTs = false;
43,429,107✔
359
  const char* pTokenEnd = *end;
43,429,107✔
360

361
  if (TSDB_CODE_SUCCESS !=
43,559,024!
362
      parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS, tz)) {
43,429,107✔
363
    return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
364
  }
365

366
  if (firstIsTS) {
43,595,120✔
367
    *time = ts;
43,564,032✔
368
  }
369

370
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
43,595,612!
371
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
43,615,596!
372
    if (pToken->z[k] == '(') {  // for insert NOW()/TODAY()
43,615,305✔
373
      if (pToken->z[k + 1] == ')') {
201!
374
        *end = pTokenEnd = &pToken->z[k + 2];
201✔
375
        ++k;
201✔
376
        continue;
201✔
377
      } else {
378
        char nc = pToken->z[k + 1];
×
379
        while (nc == ' ' || nc == '\t' || nc == '\n' || nc == '\r' || nc == '\f') {
×
380
          nc = pToken->z[(++k) + 1];
×
381
        }
382
        if (nc == ')') {
×
383
          *end = pTokenEnd = &pToken->z[k + 2];
×
384
          ++k;
×
385
          continue;
×
386
        }
387
      }
388
    }
389
    if (pToken->z[k] == ',') {
43,615,104✔
390
      *end = pTokenEnd;
43,345,156✔
391
      if (!firstIsTS) {
43,345,156!
392
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
393
      }
394
      *time = ts;
43,345,156✔
395
      return TSDB_CODE_SUCCESS;
43,345,156✔
396
    }
397
    break;
269,948✔
398
  }
399

400
  while (pTokenEnd[i] != '\0') {
250,255✔
401
    if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
202,120!
402
      i++;
291✔
403
      continue;
291✔
404
    } else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
201,829✔
405
      *end = pTokenEnd + i;
201,339✔
406
      if (!firstIsTS) {
201,339!
407
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
408
      }
409
      *time = ts;
201,339✔
410
      return TSDB_CODE_SUCCESS;
201,339✔
411
    } else {
412
      break;
413
    }
414
  }
415
  pTokenEnd = pTokenEnd + i;
48,625✔
416

417
  index = 0;
48,625✔
418
  SToken token = tStrGetToken(pTokenEnd, &index, false, NULL);
48,625✔
419

420
  if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) {
554!
421
    pTokenEnd += index;
485✔
422
    index = 0;
485✔
423
    SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL);
485✔
424
    pTokenEnd += index;
485✔
425
    char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
426
    if (TK_NK_STRING == valueToken.type) {
485!
427
      if (valueToken.n >= TSDB_MAX_BYTES_PER_ROW) {
×
428
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", valueToken.z);
473✔
429
      }
430
      int32_t len = trimString(valueToken.z, valueToken.n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
×
431
      valueToken.z = tmpTokenBuf;
×
432
      valueToken.n = len;
×
433
    }
434

435
    if (TSDB_CODE_SUCCESS !=
485!
436
        parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs, tz)) {
485✔
437
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
438
    }
439

440
    if (valueToken.n < 2) {
485!
441
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", token.z);
×
442
    }
443

444
    if (secondIsTs) {
485!
445
      // not support operator between tow timestamp, such as today() + now()
446
      if (firstIsTS) {
×
447
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
448
      }
449
      ts = tempTs;
×
450
    } else {
451
      // not support operator between tow interval, such as 2h + 3s
452
      if (!firstIsTS) {
485!
453
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
454
      }
455
      interval = tempInterval;
485✔
456
    }
457
    if (token.type == TK_NK_MINUS) {
485!
458
      // not support interval - ts,such as 2h - today()
459
      if (secondIsTs) {
×
460
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
461
      }
462
      *time = ts - interval;
×
463
    } else {
464
      *time = ts + interval;
485✔
465
    }
466

467
    for (int k = valueToken.n; valueToken.z[k] != '\0'; k++) {
485✔
468
      if (valueToken.z[k] == ' ' || valueToken.z[k] == '\t') continue;
473!
469
      if (valueToken.z[k] == '(' && valueToken.z[k + 1] == ')') {  // for insert NOW()/TODAY()
473!
470
        *end = pTokenEnd = &valueToken.z[k + 2];
×
471
        k++;
×
472
        continue;
×
473
      }
474
      if (valueToken.z[k] == ',' || valueToken.z[k] == ')') {
473!
475
        *end = pTokenEnd;
473✔
476
        return TSDB_CODE_SUCCESS;
473✔
477
      }
478
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
479
    }
480
  }
481

482
  *end = pTokenEnd;
81✔
483
  return TSDB_CODE_SUCCESS;
81✔
484
}
485

486
// need to call geosFreeBuffer(*output) later
487
/*
 * Converts a string token into geometry bytes via doGeomFromText.
 *
 * output / size receive the buffer produced by doGeomFromText and its
 * length. Per the note kept from the original source, the caller needs to
 * call geosFreeBuffer(*output) later.
 *
 * With USE_GEOS: returns TSDB_CODE_FAILED for a non-string token, otherwise
 * the result of initCtxGeomFromText / doGeomFromText.
 * Without USE_GEOS: returns TSDB_CODE_OPS_NOT_SUPPORT.
 */
static int parseGeometry(SToken* pToken, unsigned char** output, size_t* size) {
#ifdef USE_GEOS
  // TODO: support parsing WKB as well as WKT
  if (pToken->type != TK_NK_STRING) {
    return TSDB_CODE_FAILED;
  }

  int32_t code = initCtxGeomFromText();
  if (code == TSDB_CODE_SUCCESS) {
    code = doGeomFromText(pToken->z, output, size);
  }
  return code;
#else
  TAOS_RETURN(TSDB_CODE_OPS_NOT_SUPPORT);
#endif
}
509

510
/*
 * Converts a quoted string token into VARBINARY bytes.
 *
 * pToken  string token including its surrounding quotes.
 * pData   receives a newly allocated buffer holding the value.
 * nData   receives the number of bytes in *pData.
 * bytes   declared column width, including VARSTR_HEADER_SIZE.
 *
 * When the quoted text passes isHex it is decoded with taosHex2Ascii,
 * otherwise the quotes are stripped with trimString.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VARBINARY (not a string
 * token or malformed hex), TSDB_CODE_OUT_OF_MEMORY / terrno on allocation
 * failure, or TSDB_CODE_PAR_VALUE_TOO_LONG. When the value is too long, no
 * buffer is handed out: it is released here and, on the non-hex path, *pData
 * is reset to NULL.
 */
static int32_t parseVarbinary(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
  if (pToken->type != TK_NK_STRING) {
    return TSDB_CODE_PAR_INVALID_VARBINARY;
  }

  // z + 1 / n - 2 address the text between the quotes.
  if (isHex(pToken->z + 1, pToken->n - 2)) {
    if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
      return TSDB_CODE_PAR_INVALID_VARBINARY;
    }

    void*    data = NULL;
    uint32_t size = 0;
    if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (size + VARSTR_HEADER_SIZE > bytes) {
      taosMemoryFree(data);
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
    }
    *pData = data;
    *nData = size;
  } else {
    *pData = taosMemoryCalloc(1, pToken->n);
    // Check the allocation result itself; the out-parameter pData is never NULL.
    if (NULL == *pData) return terrno;
    int32_t len = trimString(pToken->z, pToken->n, (char*)*pData, pToken->n);
    *nData = len;

    if (*nData + VARSTR_HEADER_SIZE > bytes) {
      // Mirror the hex path: do not leave an allocated buffer behind on failure.
      taosMemoryFree(*pData);
      *pData = NULL;
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
    }
  }
  return TSDB_CODE_SUCCESS;
}
544
/*
 * Converts a quoted string token into BLOB bytes.
 *
 * pToken  string token including its surrounding quotes.
 * pData   receives a newly allocated buffer holding the value; set to NULL
 *         on every failure after the token-type check.
 * nData   receives the number of bytes in *pData on success.
 * bytes   not used by this function.
 *
 * When the quoted text passes isHex it is decoded with taosHex2Ascii,
 * otherwise the quotes are stripped with trimString.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VARBINARY (not a string
 * token or malformed hex), TSDB_CODE_OUT_OF_MEMORY / terrno on allocation
 * failure, or TSDB_CODE_BLOB_VALUE_TOO_LONG when the value plus
 * BLOBSTR_HEADER_SIZE exceeds TSDB_MAX_BLOB_LEN.
 */
static int32_t parseBlob(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
  if (pToken->type != TK_NK_STRING) {
    return TSDB_CODE_PAR_INVALID_VARBINARY;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  uint32_t size = 0;
  *pData = NULL;

  // z + 1 / n - 2 address the text between the quotes.
  if (isHex(pToken->z + 1, pToken->n - 2)) {
    if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
      return TSDB_CODE_PAR_INVALID_VARBINARY;
    }
    void* data = NULL;

    if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
      // Set a real error code so the check actually jumps to _error.
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _error);
    }
    *pData = data;
  } else {
    *pData = taosMemoryCalloc(1, pToken->n);
    // Check the allocation result itself; the out-parameter pData is never NULL.
    if (NULL == *pData) return terrno;
    size = trimString(pToken->z, pToken->n, (char*)*pData, pToken->n);
  }

  if (size + BLOBSTR_HEADER_SIZE > TSDB_MAX_BLOB_LEN) {
    TSDB_CHECK_CODE(code = TSDB_CODE_BLOB_VALUE_TOO_LONG, lino, _error);
  }

_error:
  if (code != 0) {
    // Release the value buffer (not the caller's pointer variable) and make
    // sure the caller is not left with a dangling pointer.
    taosMemoryFree(*pData);
    *pData = NULL;
    uError("parseBlob failed at lino %d code: %s", lino, tstrerror(code));
    return code;
  }
  *nData = size;
  return TSDB_CODE_SUCCESS;
}
584

585
static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val,
85,753✔
586
                             SMsgBuf* pMsgBuf, timezone_t tz, void* charsetCxt) {
587
  int64_t  iv;
588
  uint64_t uv;
589
  char*    endptr = NULL;
85,753✔
590
  int32_t  code = TSDB_CODE_SUCCESS;
85,753✔
591

592
#if 0
593
  if (isNullValue(pSchema->type, pToken)) {
594
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
595
      return buildSyntaxErrMsg(pMsgBuf, "Primary timestamp column can not be null", pToken->z);
596
    }
597

598
    return TSDB_CODE_SUCCESS;
599
  }
600
#endif
601

602
  //  strcpy(val->colName, pSchema->name);
603
  val->cid = pSchema->colId;
85,753✔
604
  val->type = pSchema->type;
85,753✔
605

606
  switch (pSchema->type) {
85,753!
607
    case TSDB_DATA_TYPE_BOOL: {
319✔
608
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
319!
609
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
274!
610
          *(int8_t*)(&val->i64) = TRUE_VALUE;
170✔
611
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
104!
612
          *(int8_t*)(&val->i64) = FALSE_VALUE;
104✔
613
        } else if (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv)) {
×
614
          *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
×
615
        } else {
616
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
×
617
        }
618
      } else if (pToken->type == TK_NK_INTEGER) {
45!
619
        *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
45✔
620
      } else if (pToken->type == TK_NK_FLOAT) {
×
621
        *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
×
622
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
×
623
                 (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv))) {
×
624
        *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
×
625
      } else {
626
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
×
627
      }
628
      break;
320✔
629
    }
630

631
    case TSDB_DATA_TYPE_TINYINT: {
5,928✔
632
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
5,928✔
633
      if (TSDB_CODE_SUCCESS != code) {
5,926!
634
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
×
635
      } else if (!IS_VALID_TINYINT(iv)) {
5,926!
636
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
×
637
      }
638

639
      *(int8_t*)(&val->i64) = iv;
5,931✔
640
      break;
5,931✔
641
    }
642

643
    case TSDB_DATA_TYPE_UTINYINT: {
1,479✔
644
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
1,479✔
645
      if (TSDB_CODE_SUCCESS != code) {
1,479!
646
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
×
647
      } else if (uv > UINT8_MAX) {
1,479!
648
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
×
649
      }
650
      *(uint8_t*)(&val->i64) = uv;
1,479✔
651
      break;
1,479✔
652
    }
653

654
    case TSDB_DATA_TYPE_SMALLINT: {
1,441✔
655
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
1,441✔
656
      if (TSDB_CODE_SUCCESS != code) {
1,441!
657
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
×
658
      } else if (!IS_VALID_SMALLINT(iv)) {
1,441!
659
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
1✔
660
      }
661
      *(int16_t*)(&val->i64) = iv;
1,440✔
662
      break;
1,440✔
663
    }
664

665
    case TSDB_DATA_TYPE_USMALLINT: {
1,270✔
666
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
1,270✔
667
      if (TSDB_CODE_SUCCESS != code) {
1,270!
668
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
×
669
      } else if (uv > UINT16_MAX) {
1,270!
670
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
×
671
      }
672
      *(uint16_t*)(&val->i64) = uv;
1,270✔
673
      break;
1,270✔
674
    }
675

676
    case TSDB_DATA_TYPE_INT: {
36,633✔
677
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
36,633✔
678
      if (TSDB_CODE_SUCCESS != code) {
36,646✔
679
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
1✔
680
      } else if (!IS_VALID_INT(iv)) {
36,645!
681
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
×
682
      }
683
      *(int32_t*)(&val->i64) = iv;
36,658✔
684
      break;
36,658✔
685
    }
686

687
    case TSDB_DATA_TYPE_UINT: {
1,270✔
688
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
1,270✔
689
      if (TSDB_CODE_SUCCESS != code) {
1,270!
690
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
×
691
      } else if (uv > UINT32_MAX) {
1,270!
692
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
×
693
      }
694
      *(uint32_t*)(&val->i64) = uv;
1,270✔
695
      break;
1,270✔
696
    }
697

698
    case TSDB_DATA_TYPE_BIGINT: {
1,270✔
699
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
1,270✔
700
      if (TSDB_CODE_SUCCESS != code) {
1,271✔
701
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
1✔
702
      }
703
      val->i64 = iv;
1,270✔
704
      break;
1,270✔
705
    }
706

707
    case TSDB_DATA_TYPE_UBIGINT: {
1,320✔
708
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
1,320✔
709
      if (TSDB_CODE_SUCCESS != code) {
1,320!
710
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
×
711
      }
712
      *(uint64_t*)(&val->i64) = uv;
1,320✔
713
      break;
1,320✔
714
    }
715

716
    case TSDB_DATA_TYPE_FLOAT: {
504✔
717
      double dv;
718
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
504✔
719
      if (TSDB_CODE_SUCCESS != code) {
502!
720
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
721
      }
722
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
502!
723
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
724
      }
725
      *(float*)(&val->i64) = dv;
505✔
726
      break;
505✔
727
    }
728

729
    case TSDB_DATA_TYPE_DOUBLE: {
483✔
730
      double dv;
731
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
483✔
732
      if (TSDB_CODE_SUCCESS != code) {
483!
733
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
734
      }
735
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && ERRNO == ERANGE) || isinf(dv) || isnan(dv)) {
483!
736
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
×
737
      }
738

739
      *(double*)(&val->i64) = dv;
483✔
740
      break;
483✔
741
    }
742

743
    case TSDB_DATA_TYPE_BINARY: {
30,427✔
744
      // Too long values will raise the invalid sql error message
745
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
30,427✔
746
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
12✔
747
      }
748
      val->pData = taosStrdup(pToken->z);
30,415!
749
      if (!val->pData) {
30,421!
750
        return terrno;
×
751
      }
752
      val->nData = pToken->n;
30,421✔
753
      break;
30,421✔
754
    }
755
    case TSDB_DATA_TYPE_VARBINARY: {
1,027✔
756
      code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
1,027✔
757
      if (code != TSDB_CODE_SUCCESS) {
1,027✔
758
        return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
4✔
759
      }
760
      break;
1,023✔
761
    }
762
    case TSDB_DATA_TYPE_GEOMETRY: {
16✔
763
      unsigned char* output = NULL;
16✔
764
      size_t         size = 0;
16✔
765

766
      code = parseGeometry(pToken, &output, &size);
16✔
767
      if (code != TSDB_CODE_SUCCESS) {
16!
768
        code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
×
769
      } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
16!
770
        // Too long values will raise the invalid sql error message
771
        code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
772
      } else {
773
        val->pData = taosMemoryMalloc(size);
16!
774
        if (NULL == val->pData) {
16!
775
          code = terrno;
×
776
        } else {
777
          memcpy(val->pData, output, size);
16✔
778
          val->nData = size;
16✔
779
        }
780
      }
781

782
      geosFreeBuffer(output);
16✔
783
      break;
16✔
784
    }
785

786
    case TSDB_DATA_TYPE_NCHAR: {
2,212✔
787
      int32_t output = 0;
2,212✔
788
      int64_t realLen = pToken->n << 2;
2,212✔
789
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
2,212!
790
      void* p = taosMemoryMalloc(realLen);
2,212!
791
      if (p == NULL) {
2,213!
792
        return terrno;
2✔
793
      }
794
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), realLen, &output, charsetCxt)) {
2,213✔
795
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
2!
796
          taosMemoryFree(p);
×
797
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
798
        }
799
        char buf[512] = {0};
2✔
800
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), ERRNO, EILSEQ);
2✔
801
        taosMemoryFree(p);
2!
802
        return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
2✔
803
      }
804
      val->pData = p;
2,210✔
805
      val->nData = output;
2,210✔
806
      break;
2,210✔
807
    }
808
    case TSDB_DATA_TYPE_TIMESTAMP: {
192✔
809
      if (parseTime(end, pToken, timePrec, &iv, pMsgBuf, tz) != TSDB_CODE_SUCCESS) {
192!
810
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
×
811
      }
812

813
      val->i64 = iv;
193✔
814
      break;
193✔
815
    }
816
    case TSDB_DATA_TYPE_MEDIUMBLOB:
×
817
    case TSDB_DATA_TYPE_BLOB: {
818
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_BLOB_NOT_SUPPORT_TAG, pSchema->name);
×
819
      break;
×
820
    }
821
  }
822

823
  return code;
85,771✔
824
}
825

826
// input pStmt->pSql:  [(tag1_name, ...)] TAGS (tag1_value, ...) ...
827
// output pStmt->pSql: TAGS (tag1_value, ...) ...
828
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  // Set up pCxt->tags for the number of tags declared by the table meta.
  int32_t code = insInitBoundColsInfo(getNumOfTags(pStmt->pTableMeta), &pCxt->tags);
  if (TSDB_CODE_SUCCESS == code) {
    // Peek at the next token; pStmt->pSql is only advanced when an explicit
    // "(tag_name, ...)" list follows.
    SToken  token;
    int32_t index = 0;
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
    if (TK_NK_LP == token.type) {
      pStmt->pSql += index;
      code = parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
    }
  }
  return code;
}
844

845
// Parse one tag value token and record it.
//   - A non-NULL value has its tag name appended to pTagName (when pTagName is given).
//   - A JSON tag is turned directly into *pTag (either an empty tag for NULL or the
//     parsed JSON document); nothing else is appended to pTagVals by this function.
//   - Any other non-NULL value is converted by parseTagToken() and appended to pTagVals.
// Returns TSDB_CODE_SUCCESS (0) or an error code.
int32_t parseTagValue(SMsgBuf* pMsgBuf, const char** pSql, uint8_t precision, SSchema* pTagSchema, SToken* pToken,
                      SArray* pTagName, SArray* pTagVals, STag** pTag, timezone_t tz, void* charsetCxt) {
  bool isNull = isNullValue(pTagSchema->type, pToken);
  if (!isNull && pTagName) {
    if (NULL == taosArrayPush(pTagName, pTagSchema->name)) {
      return terrno;
    }
  }

  if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
    if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
    }

    if (isNull) {
      return tTagNew(pTagVals, 1, true, pTag);
    } else {
      return parseJsontoTagData(pToken->z, pTagVals, pTag, pMsgBuf, charsetCxt);
    }
  }

  if (isNull) return 0;

  STagVal val = {0};
  int32_t code = parseTagToken(pSql, pToken, pTagSchema, precision, &val, pMsgBuf, tz, charsetCxt);
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == taosArrayPush(pTagVals, &val)) {
      code = terrno;
      // The value never made it into pTagVals, so nobody else will release the
      // buffer that parseTagToken() allocated for a variable-length tag.
      if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
        taosMemoryFreeClear(val.pData);
      }
    }
  }

  return code;
}
878

879
// Replace pStmt->pCreateTblReq with a freshly allocated request built from the
// target/using table names, the given tag and tag names, and the default TTL.
static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* pTagName) {
  // Drop any request left over from an earlier call.
  if (NULL != pStmt->pCreateTblReq) {
    tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
    taosMemoryFreeClear(pStmt->pCreateTblReq);
  }

  pStmt->pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pStmt->pCreateTblReq) {
    return terrno;
  }

  STableMeta* pMeta = pStmt->pTableMeta;
  return insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pMeta->suid,
                             pStmt->usingTableName.tname, pTagName, pMeta->tableInfo.numOfTags,
                             TSDB_DEFAULT_TABLE_TTL);
}
892

893
// Validate that pToken can be used as an inserted value and, for quoted strings,
// strip the quotation marks.
//   pToken      - token to check; on success z/n may be redirected to tmpTokenBuf
//   tmpTokenBuf - scratch buffer receiving the trimmed string
//   pMsgBuf     - receives the error message on failure
//   type        - data type of the destination column/tag
// VARBINARY and blob-typed values are left untouched (quotes are not stripped here).
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
  if (pToken->type == TK_NK_QUESTION) {
    return buildInvalidOperationMsg(pMsgBuf, "insert into super table syntax is not supported for stmt");
  }
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
       pToken->type != TK_NK_VARIABLE) ||
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
  }

  // Remove quotation marks. Blob types are excluded by this condition, so no
  // separate blob branch is needed inside it.
  if (TK_NK_STRING == pToken->type && type != TSDB_DATA_TYPE_VARBINARY && !IS_STR_DATA_BLOB(type)) {
    if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
      return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
    }

    int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
    pToken->z = tmpTokenBuf;
    pToken->n = len;
  }

  return TSDB_CODE_SUCCESS;
}
928

929
typedef struct SRewriteTagCondCxt {
930
  SArray* pTagVals;
931
  SArray* pTagName;
932
  int32_t code;
933
} SRewriteTagCondCxt;
934

935
// Replace the column node at *pNode with a new value node holding pVal.
// The value node inherits the column's result type; the column node is destroyed.
// Returns TSDB_CODE_FAILED for tag types that have no case below.
static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
  SValueNode* pLiteral = NULL;
  int32_t     code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pLiteral);
  if (NULL == pLiteral) {
    return code;
  }

  // Copy the result type before the column node goes away, then swap the nodes.
  pLiteral->node.resType = ((SColumnNode*)*pNode)->node.resType;
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pLiteral;

  switch (pVal->type) {
    case TSDB_DATA_TYPE_BOOL:
      pLiteral->datum.b = *(int8_t*)(&pVal->i64);
      *(bool*)&pLiteral->typeData = pLiteral->datum.b;
      break;

    // signed integers
    case TSDB_DATA_TYPE_TINYINT:
      pLiteral->datum.i = *(int8_t*)(&pVal->i64);
      *(int8_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      pLiteral->datum.i = *(int16_t*)(&pVal->i64);
      *(int16_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_INT:
      pLiteral->datum.i = *(int32_t*)(&pVal->i64);
      *(int32_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      // both are stored as a plain 64-bit integer
      pLiteral->datum.i = pVal->i64;
      pLiteral->typeData = pLiteral->datum.i;
      break;

    // floating point
    case TSDB_DATA_TYPE_FLOAT:
      pLiteral->datum.d = *(float*)(&pVal->i64);
      *(float*)&pLiteral->typeData = pLiteral->datum.d;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      pLiteral->datum.d = *(double*)(&pVal->i64);
      *(double*)&pLiteral->typeData = pLiteral->datum.d;
      break;

    // variable-length data: copied into a new buffer with a length header
    case TSDB_DATA_TYPE_VARCHAR:
    case TSDB_DATA_TYPE_VARBINARY:
    case TSDB_DATA_TYPE_NCHAR:
      pLiteral->datum.p = taosMemoryCalloc(1, pVal->nData + VARSTR_HEADER_SIZE);
      if (NULL == pLiteral->datum.p) {
        return terrno;
      }
      varDataSetLen(pLiteral->datum.p, pVal->nData);
      memcpy(varDataVal(pLiteral->datum.p), pVal->pData, pVal->nData);
      break;

    // unsigned integers
    case TSDB_DATA_TYPE_UTINYINT:
      pLiteral->datum.i = *(uint8_t*)(&pVal->i64);
      *(uint8_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      pLiteral->datum.i = *(uint16_t*)(&pVal->i64);
      *(uint16_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_UINT:
      pLiteral->datum.i = *(uint32_t*)(&pVal->i64);
      *(uint32_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      pLiteral->datum.i = *(uint64_t*)(&pVal->i64);
      *(uint64_t*)&pLiteral->typeData = pLiteral->datum.i;
      break;

    // not convertible here
    case TSDB_DATA_TYPE_JSON:
    case TSDB_DATA_TYPE_DECIMAL:
    case TSDB_DATA_TYPE_BLOB:
    case TSDB_DATA_TYPE_MEDIUMBLOB:
    default:
      return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}
1014

1015
// Find the tag whose name equals the column node's colName and substitute its
// value for the column. A column that matches no name in pTagName yields
// TSDB_CODE_PAR_PERMISSION_DENIED.
static int32_t rewriteTagCondColumn(SArray* pTagVals, SArray* pTagName, SNode** pNode) {
  const char* wanted = ((SColumnNode*)*pNode)->colName;
  int32_t     numNames = taosArrayGetSize(pTagName);

  int32_t pos = 0;
  while (pos < numNames) {
    char* candidate = taosArrayGet(pTagName, pos);
    if (0 == strcmp(candidate, wanted)) {
      // pTagVals is indexed with the same position as pTagName
      return rewriteTagCondColumnImpl(taosArrayGet(pTagVals, pos), pNode);
    }
    ++pos;
  }

  return TSDB_CODE_PAR_PERMISSION_DENIED;
}
1026

1027
// Expression-rewrite callback: column nodes are replaced by tag values, every
// other node kind is left for the walker to descend into. The outcome of the
// column rewrite is stored in the SRewriteTagCondCxt passed as pContext.
static EDealRes rewriteTagCond(SNode** pNode, void* pContext) {
  if (QUERY_NODE_COLUMN != nodeType(*pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SRewriteTagCondCxt* pRewrite = pContext;
  pRewrite->code = rewriteTagCondColumn(pRewrite->pTagVals, pRewrite->pTagName, pNode);
  if (TSDB_CODE_SUCCESS != pRewrite->code) {
    return DEAL_RES_ERROR;
  }
  return DEAL_RES_IGNORE_CHILD;
}
1035

1036
// Run rewriteTagCond over the expression rooted at pCond and return the code
// recorded by the last column rewrite (TSDB_CODE_SUCCESS when none failed).
// NOTE(review): pCond is passed by value, so if the root node itself were
// replaced by the rewrite the caller's pointer would not be updated - confirm
// callers never pass a bare column node as the root.
static int32_t setTagVal(SArray* pTagVals, SArray* pTagName, SNode* pCond) {
  SRewriteTagCondCxt rewriteCxt = {0};
  rewriteCxt.pTagVals = pTagVals;
  rewriteCxt.pTagName = pTagName;
  rewriteCxt.code = TSDB_CODE_SUCCESS;

  nodesRewriteExpr(&pCond, rewriteTagCond, &rewriteCxt);
  return rewriteCxt.code;
}
1041

1042
// The evaluated condition grants access only if it is a value node whose
// boolean datum is true; anything else is reported as permission denied.
static int32_t checkTagCondResult(SNode* pResult) {
  if (QUERY_NODE_VALUE != nodeType(pResult)) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }
  if (!((SValueNode*)pResult)->datum.b) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }
  return TSDB_CODE_SUCCESS;
}
1046

1047
// Evaluate the tag condition *pCond against the given tag names/values:
//   1. substitute every tag column in the condition with its value,
//   2. fold the expression to a constant,
//   3. require the constant to be a true value node.
// *pCond is always destroyed before returning.
// Returns TSDB_CODE_SUCCESS when the condition holds, otherwise an error code
// (TSDB_CODE_PAR_PERMISSION_DENIED when it evaluates to false or references a
// tag that is not in pTagName).
static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode** pCond) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (QUERY_NODE_COLUMN == nodeType(*pCond)) {
    // The root itself is a tag column. setTagVal() receives the root by value, so
    // a replacement of the root would be lost and *pCond would point at the
    // destroyed column node. Rewrite it in place through pCond instead.
    code = rewriteTagCondColumn(pTagVals, pTagName, pCond);
  } else {
    code = setTagVal(pTagVals, pTagName, *pCond);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = scalarCalculateConstants(*pCond, pCond);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = checkTagCondResult(*pCond);
  }
  NODES_DESTORY_NODE(*pCond);
  return code;
}
1058

1059
// pSql -> tag1_value, ...)
1060
// Parse the tag values of a TAGS clause (pStmt->pSql points just past the '(').
//   - Each bound tag is read in turn; '?' placeholders are skipped (and rejected
//     when stmtBindVersion is 0).
//   - Parsed values/names are collected in pTagVals/pTagName.
//   - If pStmt->pTagCond is set, the collected tags are checked against it.
//   - With stmtBindVersion == 2 the indexes of the tags that were parsed are
//     recorded; when only part of the tags were given literally, the collected
//     arrays are handed over to pCxt->tags.parseredTags instead of being freed.
//   - Otherwise a STag is built (non-JSON case) and, when autoCreate is false,
//     a create-table request is built from it.
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta);
  SArray*  pTagVals = NULL;
  SArray*  pTagName = NULL;
  uint8_t  precision = pStmt->pTableMeta->tableInfo.precision;
  SToken   token;
  bool     isJson = false;
  STag*    pTag = NULL;
  uint8_t* pTagsIndex = NULL;
  int32_t  numOfTags = 0;

  if (pCxt->pComCxt->stmtBindVersion == 2) {  // only support stmt2
    pTagsIndex = taosMemoryCalloc(pCxt->tags.numOfBound, sizeof(uint8_t));
    // A NULL result is only an error when a non-empty buffer was requested;
    // the loop below writes into it once per parsed tag.
    if (NULL == pTagsIndex && pCxt->tags.numOfBound > 0) {
      code = terrno;
      goto _exit;
    }
  }

  if (!(pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal))) ||
      !(pTagName = taosArrayInit(pCxt->tags.numOfBound, TSDB_COL_NAME_LEN))) {
    code = terrno;
    goto _exit;
  }

  for (int i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->tags.numOfBound; ++i) {
    NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);

    if (token.type == TK_NK_QUESTION) {
      if (pCxt->pComCxt->stmtBindVersion == 0) {
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
        break;
      }

      // placeholder: the value is supplied later, nothing to record now
      continue;
    }

    SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
    isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
    code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
    if (TSDB_CODE_SUCCESS == code && TK_NK_VARIABLE == token.type) {
      code = buildSyntaxErrMsg(&pCxt->msg, "not expected tags values ", token.z);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = parseTagValue(&pCxt->msg, &pStmt->pSql, precision, pTagSchema, &token, pTagName, pTagVals, &pTag,
                           pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
    }
    if (pCxt->pComCxt->stmtBindVersion == 2) {
      pTagsIndex[numOfTags++] = pCxt->tags.pColIndex[i];
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pTagCond) {
    code = checkSubtablePrivilege(pTagVals, pTagName, &pStmt->pTagCond);
  }

  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 2) {
    if (numOfTags > 0) {
      if (pTagVals->size == pCxt->tags.numOfBound) {
        // every bound tag was given as a literal
        pCxt->stmtTbNameFlag |= IS_FIXED_TAG;
      } else {
        // only some tags were literals: keep what was parsed for later use
        pCxt->stmtTbNameFlag &= ~IS_FIXED_TAG;
        pCxt->tags.parseredTags = taosMemoryMalloc(sizeof(STagsInfo));
        if (pCxt->tags.parseredTags == NULL) {
          code = terrno;
          goto _exit;
        }
        pCxt->tags.parseredTags->numOfTags = numOfTags;
        pCxt->tags.parseredTags->pTagIndex = pTagsIndex;
        pCxt->tags.parseredTags->pTagVals = pTagVals;
        pCxt->tags.parseredTags->STagNames = pTagName;
      }
    } else {
      // no literal tag at all: nothing to build here
      goto _exit;
    }
  }

  if (TSDB_CODE_SUCCESS == code && !isJson) {
    code = tTagNew(pTagVals, 1, false, &pTag);
  }

  if (TSDB_CODE_SUCCESS == code && !autoCreate) {
    code = buildCreateTbReq(pStmt, pTag, pTagName);
    // cleared so the tTagFree() below does not release it
    pTag = NULL;
  }

_exit:
  // The collected arrays are released here unless they were handed over to
  // pCxt->tags.parseredTags above.
  if (pCxt->tags.parseredTags == NULL) {
    for (int32_t i = 0; i < taosArrayGetSize(pTagVals); ++i) {
      STagVal* p = (STagVal*)TARRAY_GET_ELEM(pTagVals, i);
      if (IS_VAR_DATA_TYPE(p->type)) {
        taosMemoryFreeClear(p->pData);
      }
    }
    taosArrayDestroy(pTagVals);
    taosArrayDestroy(pTagName);
    if (pCxt->pComCxt->stmtBindVersion == 2) {
      taosMemoryFreeClear(pTagsIndex);
    }
  }

  tTagFree(pTag);
  return code;
}
1161

1162
// input pStmt->pSql:  TAGS (tag1_value, ...) [table_options] ...
1163
// output pStmt->pSql: [table_options] ...
1164
// Consume "TAGS ( value, ... )": checks the TAGS keyword and opening parenthesis,
// delegates the values to parseTagsClauseImpl(), then requires the closing ')'.
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
  SToken token;

  NEXT_TOKEN(pStmt->pSql, token);
  if (TK_TAGS != token.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
  }

  NEXT_TOKEN(pStmt->pSql, token);
  if (TK_NK_LP != token.type) {
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
  }

  int32_t code = parseTagsClauseImpl(pCxt, pStmt, autoCreate);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  NEXT_VALID_TOKEN(pStmt->pSql, token);
  if (TK_NK_RP == token.type) {
    return code;
  }
  if (TK_NK_COMMA == token.type) {
    // more values follow than there are bound tags
    return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
  }
  return buildSyntaxErrMsg(&pCxt->msg, ") is expected", token.z);
}
1187

1188
// Turn pStmt->pTableMeta into a child-table meta (suid takes the old uid, uid
// takes pStmt->totalTbNum) and store a clone of it in pStmt->pSubTableHashObj
// under the target table's full name.
static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  STableMeta* pMeta = pStmt->pTableMeta;
  pMeta->suid = pMeta->uid;
  pMeta->uid = pStmt->totalTbNum;
  pMeta->tableType = TSDB_CHILD_TABLE;

  STableMeta* pCopy = NULL;
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pCopy)) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  char    fullName[TSDB_TABLE_FNAME_LEN];
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, fullName);
  if (TSDB_CODE_SUCCESS == code) {
    // the hash stores the pointer value, not the meta itself
    code = taosHashPut(pStmt->pSubTableHashObj, fullName, strlen(fullName), &pCopy, POINTER_BYTES);
  }
  if (TSDB_CODE_SUCCESS != code) {
    // the clone was not stored, release it
    taosMemoryFree(pCopy);
  }
  return code;
}
1210

1211
// Parse the optional table options that may follow the TAGS clause, in any
// order and any number of times: "TTL <integer>" and "COMMENT '<string>'".
// The values are stored in pStmt->pCreateTblReq; pStmt->pSql is left at the
// first token that is not a table option.
// NOTE(review): pStmt->pCreateTblReq is dereferenced without a NULL check -
// confirm it is always built before table options can be encountered.
static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  do {
    int32_t index = 0;
    SToken  token;
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
    if (TK_TTL == token.type) {
      pStmt->pSql += index;
      NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);
      if (TK_NK_INTEGER != token.type) {
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
      }
      pStmt->pCreateTblReq->ttl = taosStr2Int32(token.z, NULL, 10);
      if (pStmt->pCreateTblReq->ttl < 0) {
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
      }
    } else if (TK_COMMENT == token.type) {
      pStmt->pSql += index;
      NEXT_TOKEN(pStmt->pSql, token);
      if (TK_NK_STRING != token.type) {
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option comment", token.z);
      }
      if (token.n >= TSDB_TB_COMMENT_LEN) {
        return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z);
      }
      int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN);
      char*   pComment = taosStrndup(pCxt->tmpTokenBuf, len);
      if (NULL == pComment) {
        return terrno;
      }
      // COMMENT may be given more than once; release the earlier copy so the
      // last one wins without leaking.
      taosMemoryFreeClear(pStmt->pCreateTblReq->comment);
      pStmt->pCreateTblReq->comment = pComment;
      pStmt->pCreateTblReq->commentLen = len;
    } else {
      break;
    }
  } while (1);
  return TSDB_CODE_SUCCESS;
}
1247

1248
// input pStmt->pSql:
1249
//   1. [(tag1_name, ...)] ...
1250
//   2. VALUES ... | FILE ...
1251
// output pStmt->pSql:
1252
//   1. [(field1_name, ...)]
1253
//   2. VALUES ... | FILE ...
1254
static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  // Nothing to parse unless a USING clause is being processed for a table
  // that has not been seen as a duplicate.
  if (pCxt->usingDuplicateTable || !pStmt->usingTableProcessing) {
    return TSDB_CODE_SUCCESS;
  }

  // [(tag names)] -> TAGS (values) -> [table options]; stop at the first failure.
  int32_t code = parseBoundTagsClause(pCxt, pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = parseTagsClause(pCxt, pStmt, false);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return parseTableOptions(pCxt, pStmt);
}
1269

1270
// Fill pInfo with a write-permission request for table pTbName on behalf of
// the user named in pCxt->pUser.
static void setUserAuthInfo(SParseContext* pCxt, SName* pTbName, SUserAuthInfo* pInfo) {
  pInfo->type = AUTH_TYPE_WRITE;
  memcpy(&pInfo->tbName, pTbName, sizeof(SName));
  // bounded copy: the user name is truncated to the size of pInfo->user
  snprintf(pInfo->user, sizeof(pInfo->user), "%s", pCxt->pUser);
}
73,725✔
1275

1276
// Check write permission on pTbName.
//   async  : consult the catalog cache only; a cache miss sets *pMissCache.
//   sync   : ask the catalog through a request connection.
// On success with permission granted, *pTagCond receives the condition attached
// to the basic authorisation result, if there is one.
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
  SUserAuthInfo authInfo = {0};
  SUserAuthRes  authRes = {0};
  bool          exists = true;
  int32_t       code = TSDB_CODE_SUCCESS;

  setUserAuthInfo(pCxt, pTbName, &authInfo);

  if (!pCxt->async) {
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
                             .requestId = pCxt->requestId,
                             .requestObjRefId = pCxt->requestRid,
                             .mgmtEps = pCxt->mgmtEpSet};
    code = catalogChkAuth(pCxt->pCatalog, &conn, &authInfo, &authRes);
  } else {
    code = catalogChkAuthFromCache(pCxt->pCatalog, &authInfo, &authRes, &exists);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (!exists) {
    *pMissCache = true;
    return code;
  }
  if (!authRes.pass[AUTH_RES_BASIC]) {
    return TSDB_CODE_PAR_PERMISSION_DENIED;
  }
  if (NULL != authRes.pCond[AUTH_RES_BASIC]) {
    *pTagCond = authRes.pCond[AUTH_RES_BASIC];
  }
  return code;
}
1302

1303
// Fetch the meta of pTbName into *pTableMeta.
//   async : cached lookups only; a NULL result sets *pMissCache.
//   sync  : lookups through a request connection.
// bUsingTable selects the super-table variants of the catalog calls and
// additionally requires the result to be a super table.
// Virtual tables are rejected with TSDB_CODE_VTABLE_NOT_SUPPORT_STMT.
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, bool* pMissCache,
                            bool bUsingTable) {
  SParseContext* pParse = pCxt->pComCxt;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pParse->async) {
    code = bUsingTable ? catalogGetCachedSTableMeta(pParse->pCatalog, pTbName, pTableMeta)
                       : catalogGetCachedTableMeta(pParse->pCatalog, pTbName, pTableMeta);
  } else {
    SRequestConnInfo conn = {.pTrans = pParse->pTransporter,
                             .requestId = pParse->requestId,
                             .requestObjRefId = pParse->requestRid,
                             .mgmtEps = pParse->mgmtEpSet};
    code = bUsingTable ? catalogGetSTableMeta(pParse->pCatalog, &conn, pTbName, pTableMeta)
                       : catalogGetTableMeta(pParse->pCatalog, &conn, pTbName, pTableMeta);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  STableMeta* pMeta = *pTableMeta;
  if (NULL == pMeta) {
    *pMissCache = true;
    return code;
  }
  if (bUsingTable && TSDB_SUPER_TABLE != pMeta->tableType) {
    return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
  }
  if ((pMeta->virtualStb) || TSDB_VIRTUAL_CHILD_TABLE == pMeta->tableType ||
      TSDB_VIRTUAL_NORMAL_TABLE == pMeta->tableType) {
    return TSDB_CODE_VTABLE_NOT_SUPPORT_STMT;
  }
  return code;
}
1336

1337
// Resolve the vgroup of pStmt->targetTableName and store it in
// pStmt->pVgroupsHashObj keyed by vgId. When isStb is set, the vgId is also
// written into pStmt->pTableMeta. *pMissCache reports whether the (async,
// cache-only) lookup found nothing.
static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
  SVgroupInfo vg;
  bool        exists = true;
  int32_t     code = TSDB_CODE_SUCCESS;

  if (!pCxt->async) {
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
                             .requestId = pCxt->requestId,
                             .requestObjRefId = pCxt->requestRid,
                             .mgmtEps = pCxt->mgmtEpSet};
    code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
  } else {
    code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (exists) {
    if (isStb) {
      pStmt->pTableMeta->vgId = vg.vgId;
    }
    code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
  }
  *pMissCache = !exists;
  return code;
}
1361

1362
// Load the meta (into pStmt->pTableMeta) and the vgroup (into
// pStmt->pVgroupsHashObj) of the target table.
//   async : one cached lookup returns both; a NULL meta sets *pMissCache.
//   sync  : the meta is fetched first, then the vgroup.
// When the target turns out to be a super table, pStmt->stbSyntax is set and no
// vgroup is stored.
static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) {
  SParseContext* pComCxt = pCxt->pComCxt;
  int32_t        code = TSDB_CODE_SUCCESS;
  if (pComCxt->async) {
    {
      SVgroupInfo vg;
      code = catalogGetCachedTableVgMeta(pComCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
      if (TSDB_CODE_SUCCESS == code) {
        if (NULL != pStmt->pTableMeta) {
          if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
            pStmt->stbSyntax = true;
          } else {
            code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
          }
        }
        *pMissCache = (NULL == pStmt->pTableMeta);
      }
    }
  } else {
    bool bUsingTable = false;
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pStmt->pTableMeta, pMissCache, bUsingTable);
    // Read the flag through the same pointer getTableMeta() wrote to, so the
    // meta is only dereferenced when this lookup actually produced one.
    if (TSDB_CODE_SUCCESS == code && !(*pMissCache)) {
      if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
        pStmt->stbSyntax = true;
      }
      if (!pStmt->stbSyntax) {
        code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, pMissCache);
      }
    }
  }
  return code;
}
1394

1395
// Record pName in pTable, keyed by the table's full name.
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
  char    key[TSDB_TABLE_FNAME_LEN];
  int32_t code = tNameExtractFullName(pName, key);
  if (TSDB_CODE_SUCCESS == code) {
    code = taosHashPut(pTable, key, strlen(key), pName, sizeof(SName));
  }
  return code;
}
1403

1404
// Record the full database name of pName in pDbs. The name is both the key
// (its string length) and the value (the whole fixed-size buffer).
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
  char dbFullName[TSDB_DB_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(pName, dbFullName);

  size_t keyLen = strlen(dbFullName);
  return taosHashPut(pDbs, dbFullName, keyLen, dbFullName, sizeof(dbFullName));
}
1409

1410
// Resolve everything needed for the target table when no USING clause is given:
// authorisation, table meta and vgroup. Any cache miss is reported through
// pCxt->missCache. In sync mode the database and table are also added to the
// statement's used-name collections.
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  if (pCxt->forceUpdate) {
    // skip all lookups and behave as if nothing was cached
    pCxt->missCache = true;
    return TSDB_CODE_SUCCESS;
  }

  SNode*  pAuthCond = NULL;
  int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pAuthCond);
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
    code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
  }

  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
    if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
      // super table target: keep a copy of the condition on the statement
      pStmt->pTagCond = NULL;
      code = nodesCloneNode(pAuthCond, &pStmt->pTagCond);
    } else {
      // a tag condition on a non-super table is flagged as a cache miss and
      // as needing the table's tag values
      bool hasCond = (NULL != pAuthCond);
      pCxt->needTableTagVal = hasCond;
      pCxt->missCache = hasCond;
    }
  }
  nodesDestroyNode(pAuthCond);

  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
    code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
    if (TSDB_CODE_SUCCESS == code) {
      code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
    }
  }
  return code;
}
1440

1441
// Build pStmt->usingTableName from the token that follows USING, resolving it
// against the account id and current database of the parse context.
static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
  SParseContext* pParse = pCxt->pComCxt;
  return insCreateSName(&pStmt->usingTableName, pTbName, pParse->acctId, pParse->db, &pCxt->msg);
}
1444

1445
// Resolve the schema for "INSERT INTO ctb USING stb ...".
//   - The super table meta is taken from pStmt->pSuperTableHashObj or fetched
//     and stored there.
//   - Outside of stmt binding (stmtBindVersion == 0) the child table is looked
//     up too; if it exists it must belong to the same super table, and
//     *ctbCacheHit is set.
//   - pStmt->pTableMeta becomes a clone of the child meta (cache hit) or of the
//     super table meta.
//   - The vgroup of the target table is resolved, and in sync mode the used
//     database/table are recorded.
// Cache misses are reported through pCxt->missCache.
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* ctbCacheHit) {
  int32_t     code = TSDB_CODE_SUCCESS;
  STableMeta* pStableMeta = NULL;
  STableMeta* pCtableMeta = NULL;
  if (pCxt->forceUpdate) {
    pCxt->missCache = true;
    return TSDB_CODE_SUCCESS;
  }
  char stableFName[TSDB_TABLE_FNAME_LEN];
  code = tNameExtractFullName(&pStmt->usingTableName, stableFName);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  char ctableFName[TSDB_TABLE_FNAME_LEN];
  code = tNameExtractFullName(&pStmt->targetTableName, ctableFName);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // the target table and the USING table must not be the same table
  if (strcmp(stableFName, ctableFName) == 0) {
    return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
  }
  if (!pCxt->missCache) {
    STableMeta** ppStableMeta = taosHashGet(pStmt->pSuperTableHashObj, stableFName, strlen(stableFName));
    if (NULL != ppStableMeta) {
      pStableMeta = *ppStableMeta;
    }
    if (NULL == pStableMeta) {
      bool bUsingTable = true;
      code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable);
      if (TSDB_CODE_SUCCESS == code) {
        code = taosHashPut(pStmt->pSuperTableHashObj, stableFName, strlen(stableFName), &pStableMeta, POINTER_BYTES);
      }
      if (TSDB_CODE_SUCCESS != code) {
        // Either the lookup failed or the meta could not be stored in the hash;
        // in both cases nothing else references it, so release it here.
        taosMemoryFreeClear(pStableMeta);
      }
    }
  }
  if (pCxt->pComCxt->stmtBindVersion > 0) {
    goto _no_ctb_cache;
  }

  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
    bool bUsingTable = false;
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pCtableMeta, &pCxt->missCache, bUsingTable);
  }

  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
    code = (pStableMeta->suid == pCtableMeta->suid) ? TSDB_CODE_SUCCESS : TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
    *ctbCacheHit = true;
  }
_no_ctb_cache:
  if (TSDB_CODE_SUCCESS == code) {
    if (*ctbCacheHit) {
      code = cloneTableMeta(pCtableMeta, &pStmt->pTableMeta);
    } else {
      code = cloneTableMeta(pStableMeta, &pStmt->pTableMeta);
    }
  }
  taosMemoryFree(pCtableMeta);
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
    code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
  }
  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
    code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
    if (TSDB_CODE_SUCCESS == code) {
      code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj);
    }
  }
  return code;
}
1521

1522
// Consume the super table name after USING and resolve its schema.
// If the child table was found (cache hit without a miss), the rest of the
// USING clause is only skipped/checked; otherwise, unless a cache miss was
// reported, a child-table meta is prepared and stored.
static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  SToken token;
  NEXT_TOKEN(pStmt->pSql, token);

  bool    ctbCacheHit = false;
  int32_t code = preParseUsingTableName(pCxt, pStmt, &token);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = getUsingTableSchema(pCxt, pStmt, &ctbCacheHit);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (ctbCacheHit && !pCxt->missCache) {
    pStmt->usingTableProcessing = false;
    return ignoreUsingClauseAndCheckTagValues(pCxt, pStmt);
  }

  if (!pCxt->missCache) {
    code = storeChildTableMeta(pCxt, pStmt);
  }
  return code;
}
1539

1540
// input pStmt->pSql:
1541
//   1(care). [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ...
1542
//   2. VALUES ... | FILE ...
1543
// output pStmt->pSql:
1544
//   1. [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options] ...
1545
//   2. VALUES ... | FILE ...
1546
static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
84,142✔
1547
  SToken  token;
1548
  int32_t index = 0;
84,142✔
1549
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
84,142✔
1550
  if (TK_USING != token.type) {
84,180✔
1551
    return getTargetTableSchema(pCxt, pStmt);
73,768✔
1552
  }
1553
  pStmt->usingTableProcessing = true;
10,412✔
1554
  pCxt->stmtTbNameFlag |= USING_CLAUSE;
10,412✔
1555
  // pStmt->pSql -> stb_name [(tag1_name, ...)
1556
  pStmt->pSql += index;
10,412✔
1557
  int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable);
10,412✔
1558
  if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) {
10,398!
1559
    return parseUsingTableNameImpl(pCxt, pStmt);
10,407✔
1560
  }
1561
  return code;
×
1562
}
1563

1564
// Builds pStmt->targetTableName from the table-name token (using the account id and current db
// of the parse context) and refuses targets whose database is a system database.
// Returns the insCreateSName() error, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, or TSDB_CODE_SUCCESS.
static int32_t preParseTargetTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
  int32_t rc = insCreateSName(&pStmt->targetTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  if (IS_SYS_DBNAME(pStmt->targetTableName.dbname)) {
    return TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED;
  }
  return TSDB_CODE_SUCCESS;
}
1574

1575
// input pStmt->pSql:
1576
//   1(care). [(field1_name, ...)] ...
1577
//   2. [ USING ... ] ...
1578
//   3. VALUES ... | FILE ...
1579
// output pStmt->pSql:
1580
//   1. [ USING ... ] ...
1581
//   2. VALUES ... | FILE ...
1582
static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
84,170✔
1583
  SToken  token;
1584
  int32_t index = 0;
84,170✔
1585
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
84,170✔
1586
  if (TK_NK_LP != token.type) {
84,174✔
1587
    return TSDB_CODE_SUCCESS;
82,702✔
1588
  }
1589

1590
  // pStmt->pSql -> field1_name, ...)
1591
  pStmt->pSql += index;
1,472✔
1592
  pStmt->pBoundCols = pStmt->pSql;
1,472✔
1593
  return skipParentheses(pCxt, &pStmt->pSql);
1,472✔
1594
}
1595

1596
// Looks up (or creates, via insGetTableDataCxt) the data context for the table being inserted into.
//   - async parse: the context is keyed by the table uid;
//   - otherwise:   the context is keyed by the full name of pStmt->targetTableName.
// In the non-async path, when the statement flags contain USING_CLAUSE only (under the
// NO_DATA_USING_CLAUSE mask), the target name is overwritten with the USING table name and both the
// name and the table meta are marked as a super table. When a USING clause is being processed the
// meta uid is reset to 0 before the lookup.
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt** pTableCxt) {
  if (pCxt->pComCxt->async) {
    return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
                              pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
  }

  const bool usingClauseOnly = ((pCxt->stmtTbNameFlag & NO_DATA_USING_CLAUSE) == USING_CLAUSE);
  if (usingClauseOnly) {
    pStmt->pTableMeta->tableType = TSDB_SUPER_TABLE;
    pStmt->targetTableName.type = TSDB_SUPER_TABLE;
    tstrncpy(pStmt->targetTableName.dbname, pStmt->usingTableName.dbname, sizeof(pStmt->targetTableName.dbname));
    tstrncpy(pStmt->targetTableName.tname, pStmt->usingTableName.tname, sizeof(pStmt->targetTableName.tname));
  }

  char    fullName[TSDB_TABLE_FNAME_LEN];
  int32_t rc = tNameExtractFullName(&pStmt->targetTableName, fullName);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  if (pStmt->usingTableProcessing) {
    pStmt->pTableMeta->uid = 0;
  }

  return insGetTableDataCxt(pStmt->pTableBlockHashObj, fullName, strlen(fullName), pStmt->pTableMeta,
                            &pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false);
}
1622

1623
// Parses the bound column list of the target table into pTableCxt->boundColsInfo.
// The list can come from two places:
//   - directly at the cursor (a '(' is the next token), or
//   - from pStmt->pBoundCols, saved earlier by preParseBoundColumnsClause.
// Having both is reported as a syntax error. With neither, any previously bound columns held by
// the table context are reset.
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
  SToken  peeked;
  int32_t peekLen = 0;
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, peeked, peekLen);

  const bool listSavedEarlier = (NULL != pStmt->pBoundCols);

  if (TK_NK_LP != peeked.type) {
    if (listSavedEarlier) {
      return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
    }
    if (pTableCxt->boundColsInfo.hasBoundCols) {
      insResetBoundColsInfo(&pTableCxt->boundColsInfo);
    }
    return TSDB_CODE_SUCCESS;
  }

  // step over '(': pStmt->pSql -> field1_name, ...)
  pStmt->pSql += peekLen;
  if (listSavedEarlier) {
    // a second column list is not allowed
    return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", peeked.z);
  }
  return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
}
1644

1645
// For submit data in column format (SUBMIT_REQ_COLUMN_DATA_FORMAT set in pData->flags), appends one
// initialised SColData to pData->aCol for every bound column. Does nothing for other formats.
// Returns TSDB_CODE_SUCCESS, or terrno when reserving an array slot fails.
int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
  if (0 == (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < pTableCxt->boundColsInfo.numOfBound) {
    SSchema*  pColSchema = pTableCxt->pMeta->schema + pTableCxt->boundColsInfo.pColIndex[idx];
    SColData* pColData = taosArrayReserve(pTableCxt->pData->aCol, 1);
    if (NULL == pColData) {
      return terrno;
    }
    tColDataInit(pColData, pColSchema->colId, pColSchema->type, pColSchema->flags);
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
1661

1662
// Replaces the table context's bound column info with pBoundColsInfo (the previous info is
// destroyed first; the new one is copied by value, so its pointers are shared with the caller's
// copy), then appends one initialised SColData to pData->aCol per bound column.
// Returns TSDB_CODE_SUCCESS, or terrno when reserving an array slot fails.
int32_t initTableColSubmitDataWithBoundInfo(STableDataCxt* pTableCxt, SBoundColInfo pBoundColsInfo) {
  insDestroyBoundColInfo(&(pTableCxt->boundColsInfo));
  pTableCxt->boundColsInfo = pBoundColsInfo;

  int32_t idx = 0;
  while (idx < pBoundColsInfo.numOfBound) {
    SSchema*  pColSchema = pTableCxt->pMeta->schema + pTableCxt->boundColsInfo.pColIndex[idx];
    SColData* pColData = taosArrayReserve(pTableCxt->pData->aCol, 1);
    if (NULL == pColData) {
      return terrno;
    }
    tColDataInit(pColData, pColSchema->colId, pColSchema->type, pColSchema->flags);
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
1676

1677
// input pStmt->pSql:
1678
//   1. [(tag1_name, ...)] ...
1679
//   2. VALUES ... | FILE ...
1680
// output pStmt->pSql: VALUES ... | FILE ...
1681
static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
84,009✔
1682
                                       STableDataCxt** pTableCxt) {
1683
  int32_t code = parseUsingClauseBottom(pCxt, pStmt);
84,009✔
1684
  if (TSDB_CODE_SUCCESS == code) {
84,006!
1685
    code = getTableDataCxt(pCxt, pStmt, pTableCxt);
84,025✔
1686
  }
1687
  if (TSDB_CODE_SUCCESS == code) {
84,013!
1688
    code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt);
84,019✔
1689
  }
1690
  if (TSDB_CODE_SUCCESS == code) {
84,019!
1691
    code = initTableColSubmitData(*pTableCxt);
84,030✔
1692
  }
1693
  return code;
84,019✔
1694
}
1695

1696
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
1697
// output pStmt->pSql:
1698
//   1. [(tag1_name, ...)] ...
1699
//   2. VALUES ... | FILE ...
1700
static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
84,120✔
1701
  int32_t code = preParseTargetTableName(pCxt, pStmt, pTbName);
84,120✔
1702
  if (TSDB_CODE_SUCCESS == code) {
84,164!
1703
    // option: [(field1_name, ...)]
1704
    code = preParseBoundColumnsClause(pCxt, pStmt);
84,165✔
1705
  }
1706
  if (TSDB_CODE_SUCCESS == code) {
84,176✔
1707
    // option: [USING stb_name]
1708
    code = parseUsingTableName(pCxt, pStmt);
84,146✔
1709
  }
1710
  return code;
84,144✔
1711
}
1712

1713
static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
423,775,948✔
1714
                                   const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1715
  switch (pSchema->type) {
423,775,948!
1716
    case TSDB_DATA_TYPE_BOOL: {
12,734,352✔
1717
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
12,734,352!
1718
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
10,067,831!
1719
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, TRUE_VALUE);
5,581,858✔
1720
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
4,485,973!
1721
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, FALSE_VALUE);
4,563,136✔
1722
        } else if (TSDB_CODE_SUCCESS ==
×
1723
                   toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
×
1724
          int8_t v = (*(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE);
×
1725
          valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
×
1726
        } else {
1727
          return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
×
1728
        }
1729
      } else if (pToken->type == TK_NK_INTEGER) {
2,666,521!
1730
        int8_t v = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
2,668,024✔
1731
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
2,684,592✔
1732
      } else if (pToken->type == TK_NK_FLOAT) {
×
1733
        int8_t v = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
×
1734
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
×
1735
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
×
1736
                 (TSDB_CODE_SUCCESS ==
1737
                  toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value)))) {
×
1738
        int8_t v = *(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE;
×
1739
        valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
×
1740
      } else {
1741
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
×
1742
      }
1743
      break;
12,829,586✔
1744
    }
1745
    case TSDB_DATA_TYPE_TINYINT: {
12,777,842✔
1746
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,777,842✔
1747
      if (TSDB_CODE_SUCCESS != code) {
12,807,180!
1748
        return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
×
1749
      } else if (!IS_VALID_TINYINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
12,807,180!
1750
        return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z);
×
1751
      }
1752
      break;
12,809,423✔
1753
    }
1754
    case TSDB_DATA_TYPE_UTINYINT: {
12,760,126✔
1755
      int32_t code =
1756
          toUIntegerEx(pToken->z, pToken->n, pToken->type, (uint64_t*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,760,126✔
1757
      if (TSDB_CODE_SUCCESS != code) {
12,758,476!
1758
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
×
1759
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT8_MAX) {
12,759,053!
1760
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
×
1761
      }
1762
      break;
12,759,053✔
1763
    }
1764
    case TSDB_DATA_TYPE_SMALLINT: {
12,767,854✔
1765
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,767,854✔
1766
      if (TSDB_CODE_SUCCESS != code) {
12,796,952!
1767
        return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
×
1768
      } else if (!IS_VALID_SMALLINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
12,796,952!
1769
        return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
1,395✔
1770
      }
1771
      break;
12,795,557✔
1772
    }
1773
    case TSDB_DATA_TYPE_USMALLINT: {
12,761,876✔
1774
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,761,876✔
1775
      if (TSDB_CODE_SUCCESS != code) {
12,765,241!
1776
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
×
1777
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT16_MAX) {
12,768,201!
1778
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
×
1779
      }
1780
      break;
12,768,201✔
1781
    }
1782
    case TSDB_DATA_TYPE_INT: {
28,408,860✔
1783
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
28,408,860✔
1784
      if (TSDB_CODE_SUCCESS != code) {
30,776,868!
1785
        return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
×
1786
      } else if (!IS_VALID_INT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
30,776,868!
1787
        return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
×
1788
      }
1789
      break;
30,815,624✔
1790
    }
1791
    case TSDB_DATA_TYPE_UINT: {
12,773,692✔
1792
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,773,692✔
1793
      if (TSDB_CODE_SUCCESS != code) {
12,781,359✔
1794
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
2,225✔
1795
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT32_MAX) {
12,779,134!
1796
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
×
1797
      }
1798
      break;
12,779,134✔
1799
    }
1800
    case TSDB_DATA_TYPE_BIGINT: {
198,893,558✔
1801
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
198,893,558✔
1802
      if (TSDB_CODE_SUCCESS != code) {
214,197,249!
1803
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
×
1804
      }
1805
      break;
214,236,183✔
1806
    }
1807
    case TSDB_DATA_TYPE_UBIGINT: {
12,780,041✔
1808
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
12,780,041✔
1809
      if (TSDB_CODE_SUCCESS != code) {
12,781,743✔
1810
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
903✔
1811
      }
1812
      break;
12,780,840✔
1813
    }
1814
    case TSDB_DATA_TYPE_FLOAT: {
21,861,812✔
1815
      double  dv;
1816
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
21,861,812✔
1817
      if (TSDB_CODE_SUCCESS != code) {
22,414,034!
1818
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1819
      }
1820
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
22,414,034!
1821
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1822
      }
1823
      float f = dv;
22,445,243✔
1824
      valueSetDatum(&pVal->value, TSDB_DATA_TYPE_FLOAT, &f, sizeof(f));
22,445,243✔
1825
      break;
22,386,854✔
1826
    }
1827
    case TSDB_DATA_TYPE_DOUBLE: {
18,812,908✔
1828
      double  dv;
1829
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
18,812,908✔
1830
      if (TSDB_CODE_SUCCESS != code) {
18,976,190!
1831
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1832
      }
1833
      if (isinf(dv) || isnan(dv)) {
18,987,534!
1834
        return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
457✔
1835
      }
1836
      VALUE_SET_TRIVIAL_DATUM(&pVal->value, (*(int64_t*)&dv));
18,987,077✔
1837
      break;
18,987,077✔
1838
    }
1839
    case TSDB_DATA_TYPE_BINARY: {
18,708,435✔
1840
      // Too long values will raise the invalid sql error message
1841
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
18,708,435!
1842
        return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1843
      }
1844
      pVal->value.pData = taosMemoryMalloc(pToken->n);
18,708,435!
1845
      if (NULL == pVal->value.pData) {
18,873,932!
1846
        return terrno;
×
1847
      }
1848
      memcpy(pVal->value.pData, pToken->z, pToken->n);
18,873,932✔
1849
      pVal->value.nData = pToken->n;
18,873,932✔
1850
      break;
18,873,932✔
1851
    }
1852
    case TSDB_DATA_TYPE_VARBINARY: {
1,228✔
1853
      int32_t code = parseVarbinary(pToken, &pVal->value.pData, &pVal->value.nData, pSchema->bytes);
1,228✔
1854
      if (code != TSDB_CODE_SUCCESS) {
1,228!
1855
        return generateSyntaxErrMsg(&pCxt->msg, code, pSchema->name);
×
1856
      }
1857
      break;
1,228✔
1858
    }
1859
    case TSDB_DATA_TYPE_NCHAR: {
17,678,928✔
1860
      // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
1861
      int32_t len = 0;
17,678,928✔
1862
      int64_t realLen = pToken->n << 2;
17,678,928✔
1863
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
17,678,928!
1864
      char* pUcs4 = taosMemoryMalloc(realLen);
17,678,928!
1865
      if (NULL == pUcs4) {
17,625,657!
1866
        return terrno;
2✔
1867
      }
1868
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, realLen, &len, pCxt->pComCxt->charsetCxt)) {
17,625,657✔
1869
        taosMemoryFree(pUcs4);
9,867!
1870
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
2!
1871
          return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1872
        }
1873
        char buf[512] = {0};
2✔
1874
        snprintf(buf, tListLen(buf), "%s", strerror(terrno));
2✔
1875
        return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
2✔
1876
      }
1877
      pVal->value.pData = pUcs4;
17,757,527✔
1878
      pVal->value.nData = len;
17,757,527✔
1879
      break;
17,757,527✔
1880
    }
1881
    case TSDB_DATA_TYPE_JSON: {
×
1882
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
×
1883
        return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z);
×
1884
      }
1885
      pVal->value.pData = taosMemoryMalloc(pToken->n);
×
1886
      if (NULL == pVal->value.pData) {
×
1887
        return terrno;
×
1888
      }
1889
      memcpy(pVal->value.pData, pToken->z, pToken->n);
×
1890
      pVal->value.nData = pToken->n;
×
1891
      break;
×
1892
    }
1893
    case TSDB_DATA_TYPE_GEOMETRY: {
926,207✔
1894
      int32_t        code = TSDB_CODE_FAILED;
926,207✔
1895
      unsigned char* output = NULL;
926,207✔
1896
      size_t         size = 0;
926,207✔
1897

1898
      code = parseGeometry(pToken, &output, &size);
926,207✔
1899
      if (code != TSDB_CODE_SUCCESS) {
928,804!
1900
        code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
×
1901
      }
1902
      // Too long values will raise the invalid sql error message
1903
      else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
929,553!
1904
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1905
      } else {
1906
        pVal->value.pData = taosMemoryMalloc(size);
929,553!
1907
        if (NULL == pVal->value.pData) {
928,834!
1908
          code = terrno;
×
1909
        } else {
1910
          memcpy(pVal->value.pData, output, size);
928,834✔
1911
          pVal->value.nData = size;
928,834✔
1912
        }
1913
      }
1914

1915
      geosFreeBuffer(output);
928,834✔
1916
      if (code != TSDB_CODE_SUCCESS) {
929,093!
1917
        return code;
×
1918
      }
1919

1920
      break;
929,093✔
1921
    }
1922
    case TSDB_DATA_TYPE_TIMESTAMP: {
42,760,497✔
1923
      if (parseTime(pSql, pToken, timePrec, &VALUE_GET_TRIVIAL_DATUM(&pVal->value), &pCxt->msg,
43,515,800!
1924
                    pCxt->pComCxt->timezone) != TSDB_CODE_SUCCESS) {
42,760,497✔
1925
        return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
×
1926
      }
1927
      break;
43,524,022✔
1928
    }
1929
    case TSDB_DATA_TYPE_BLOB:
×
1930
    case TSDB_DATA_TYPE_MEDIUMBLOB: {
1931
      int32_t code = parseBlob(pToken, &pVal->value.pData, &pVal->value.nData, pSchema->bytes);
×
1932
      if (code != TSDB_CODE_SUCCESS) {
×
1933
        return generateSyntaxErrMsg(&pCxt->msg, code, pSchema->name);
×
1934
      }
1935
    } break;
×
1936
    case TSDB_DATA_TYPE_DECIMAL: {
919,546✔
1937
      if (!pExtSchema) {
919,546!
1938
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1939
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1940
      }
1941
      uint8_t precision = 0, scale = 0;
919,546✔
1942
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
919,546✔
1943
      Decimal128 dec = {0};
919,395✔
1944
      int32_t    code = decimal128FromStr(pToken->z, pToken->n, precision, scale, &dec);
919,395✔
1945
      if (TSDB_CODE_SUCCESS != code) {
918,488!
1946
        return code;
×
1947
      }
1948

1949
      // precision check
1950
      // scale auto fit
1951

1952
      code = decimal128ToDataVal(&dec, &pVal->value);
918,488✔
1953
      if (TSDB_CODE_SUCCESS != code) {
927,523!
1954
        return code;
×
1955
      }
1956
      break;
927,523✔
1957
    }
1958
    case TSDB_DATA_TYPE_DECIMAL64: {
920,530✔
1959
      if (!pExtSchema) {
920,530!
1960
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1961
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1962
      }
1963
      uint8_t precision = 0, scale = 0;
920,530✔
1964
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
920,530✔
1965
      Decimal64 dec = {0};
920,698✔
1966
      int32_t   code = decimal64FromStr(pToken->z, pToken->n, precision, scale, &dec);
920,698✔
1967
      if (TSDB_CODE_SUCCESS != code) {
919,528!
1968
        return code;
×
1969
      }
1970
      code = decimal64ToDataVal(&dec, &pVal->value);
919,528✔
1971
      if (TSDB_CODE_SUCCESS != code) {
918,983!
1972
        return code;
×
1973
      }
1974
      break;
918,983✔
1975
    }
1976
    default:
×
1977

1978
      return TSDB_CODE_FAILED;
×
1979
  }
1980

1981
  pVal->flag = CV_FLAG_VALUE;
458,879,840✔
1982
  return TSDB_CODE_SUCCESS;
458,879,840✔
1983
}
1984

1985
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
431,118,002✔
1986
                               const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1987
  int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
431,118,002✔
1988
  if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
851,059,678✔
1989
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
1,990,830!
1990
      return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
×
1991
    }
1992

1993
    pVal->flag = CV_FLAG_NULL;
1,990,830✔
1994
    return TSDB_CODE_SUCCESS;
1,990,830✔
1995
  }
1996

1997
  if (TSDB_CODE_SUCCESS == code) {
423,986,383✔
1998
    if (pToken->n == 0 && IS_NUMERIC_TYPE(pSchema->type)) {
423,428,242!
1999
      return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
×
2000
    }
2001
    code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, pExtSchema, timePrec, pVal);
423,428,310✔
2002
  }
2003

2004
  return code;
443,873,511✔
2005
}
2006

2007
// Releases the heap buffers owned by the column values in pCols: for every element whose value
// type is a variable-length type or TSDB_DATA_TYPE_DECIMAL, value.pData is freed and cleared.
// The array itself and its elements are left in place.
static void clearColValArray(SArray* pCols) {
  const int32_t total = taosArrayGetSize(pCols);
  int32_t       idx = 0;
  while (idx < total) {
    SColVal* pColVal = taosArrayGet(pCols, idx);
    ++idx;
    if (!IS_VAR_DATA_TYPE(pColVal->value.type) && pColVal->value.type != TSDB_DATA_TYPE_DECIMAL) {
      continue;  // value is stored inline, nothing to free
    }
    taosMemoryFreeClear(pColVal->value.pData);
  }
}
42,604,121✔
2016

2017
typedef struct SStbRowsDataContext {
2018
  SName stbName;
2019

2020
  STableMeta*   pStbMeta;
2021
  SNode*        pTagCond;
2022
  SBoundColInfo boundColsInfo;
2023

2024
  // the following fields are for each stb row
2025
  SArray*        aTagVals;
2026
  SArray*        aColVals;
2027
  SArray*        aTagNames;
2028
  SName          ctbName;
2029
  STag*          pTag;
2030
  STableMeta*    pCtbMeta;
2031
  SVCreateTbReq* pCreateCtbReq;
2032
  bool           hasTimestampTag;
2033
  bool           isJsonTag;
2034
} SStbRowsDataContext;
2035

2036
typedef union SRowsDataContext {
2037
  STableDataCxt*       pTableDataCxt;
2038
  SStbRowsDataContext* pStbRowsCxt;
2039
} SRowsDataContext;
2040

2041
// Copies a table name from pToken into tname (NUL-terminated) and reports success through
// *pFoundCtbName.
//   pMsgBuf       - receives the error message on failure
//   tname         - output buffer; must hold at least TSDB_TABLE_NAME_LEN bytes
//   pToken        - token carrying the table name
//   pFoundCtbName - set to true only when a name was copied; false on every error path
// Fails with an invalid-operation error when the token is a NULL value, is empty, is longer than
// TSDB_TABLE_NAME_LEN - 1, or contains '.'. Characters preceding a '.' have already been written
// to tname when that error is returned.
int32_t parseTbnameToken(SMsgBuf* pMsgBuf, char* tname, SToken* pToken, bool* pFoundCtbName) {
  *pFoundCtbName = false;

  if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be null value");
  }

  if (!(pToken->n > 0)) {
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be empty");
  }

  if (pToken->n > TSDB_TABLE_NAME_LEN - 1) {
    return buildInvalidOperationMsg(pMsgBuf, "tbname is too long");
  }

  for (int pos = 0; pos < pToken->n; ++pos) {
    const char ch = pToken->z[pos];
    if ('.' == ch) {
      return buildInvalidOperationMsg(pMsgBuf, "tbname can not contain '.'");
    }
    tname[pos] = ch;
  }
  tname[pToken->n] = '\0';
  *pFoundCtbName = true;

  return TSDB_CODE_SUCCESS;
}
2067

2068
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
35✔
2069
                                          SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, const SToken* tagTokens,
2070
                                          SSchema* const* tagSchemas, int numOfTagTokens) {
2071
  int32_t code = TSDB_CODE_SUCCESS;
35✔
2072
  uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
35✔
2073

2074
  if (code == TSDB_CODE_SUCCESS && ctbFirst) {
35!
2075
    for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
39!
2076
      SToken*  pTagToken = (SToken*)(tagTokens + i);
4✔
2077
      SSchema* pTagSchema = tagSchemas[i];
4✔
2078
      code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
4✔
2079
      if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pTagToken->type) {
4!
2080
        code = buildInvalidOperationMsg(&pCxt->msg, "not expected tag");
×
2081
      }
2082

2083
      if (code == TSDB_CODE_SUCCESS) {
4!
2084
        code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames,
4✔
2085
                             pStbRowsCxt->aTagVals, &pStbRowsCxt->pTag, pCxt->pComCxt->timezone,
4✔
2086
                             pCxt->pComCxt->charsetCxt);
4✔
2087
      }
2088
    }
2089
    if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
35!
2090
      code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
35✔
2091
    }
2092
  }
2093

2094
  if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
35!
2095
    code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
×
2096
  }
2097
  return code;
35✔
2098
}
2099

2100
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
49✔
2101
                                 SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
2102
                                 const SSchema* pSchemas, const SSchemaExt* pExtSchemas, SToken* tagTokens,
2103
                                 SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName, bool* setCtbName,
2104
                                 SBoundColInfo* ctbCols) {
2105
  int32_t code = TSDB_CODE_SUCCESS;
49✔
2106
  SArray* pTagNames = pStbRowsCxt->aTagNames;
49✔
2107
  SArray* pTagVals = pStbRowsCxt->aTagVals;
49✔
2108
  bool    canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
49!
2109
  int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
49✔
2110
  int32_t numOfTags = getNumOfTags(pStbRowsCxt->pStbMeta);
49✔
2111
  int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
49✔
2112
  uint8_t precision = getTableInfo(pStbRowsCxt->pStbMeta).precision;
49✔
2113
  int     tag_index = 0;
49✔
2114
  int     col_index = 0;
49✔
2115
  for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
297✔
2116
    const char* pTmpSql = *ppSql;
249✔
2117
    bool        ignoreComma = false;
249✔
2118
    NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
249✔
2119

2120
    if (ignoreComma) {
249!
2121
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
×
2122
      break;
×
2123
    }
2124

2125
    if (TK_NK_RP == pToken->type) {
249!
2126
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
×
2127
      break;
×
2128
    }
2129

2130
    // cols tags tbname
2131
    if (TK_NK_QUESTION == pToken->type) {
249✔
2132
      if (pCxt->pComCxt->stmtBindVersion != 2) {
226✔
2133
        return buildInvalidOperationMsg(&pCxt->msg,
1✔
2134
                                        "insert into stb(...tbname...)values(...,?,...) only support in stmt2");
2135
      }
2136
      if (pCols->pColIndex[i] == tbnameIdx) {
225✔
2137
        *bFoundTbName = true;
43✔
2138
        char* tbName = NULL;
43✔
2139
        if ((*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName) == TSDB_CODE_SUCCESS) {
43✔
2140
          tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
31✔
2141
          tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
31✔
2142
          tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
31✔
2143
          tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
31✔
2144
          pStmt->usingTableName.type = 1;
31✔
2145
          pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;  // set the table type to child table for parse cache
31✔
2146
          *setCtbName = true;
31✔
2147
        }
2148
      } else if (pCols->pColIndex[i] < numOfCols) {
182✔
2149
        // bind column
2150
        if (ctbCols->pColIndex == NULL) {
103✔
2151
          ctbCols->pColIndex = taosMemoryCalloc(numOfCols, sizeof(int16_t));
43!
2152
          if (NULL == ctbCols->pColIndex) {
43!
2153
            return terrno;
×
2154
          }
2155
        }
2156
        ctbCols->pColIndex[col_index++] = pCols->pColIndex[i];
103✔
2157
        ctbCols->numOfBound++;
103✔
2158
        ctbCols->numOfCols++;
103✔
2159

2160
      } else if (pCols->pColIndex[i] < tbnameIdx) {
79!
2161
        if (pCxt->tags.pColIndex == NULL) {
79✔
2162
          pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
29!
2163
          if (NULL == pCxt->tags.pColIndex) {
29!
2164
            return terrno;
×
2165
          }
2166
        }
2167
        if (!(tag_index < numOfTags)) {
79!
2168
          return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
×
2169
        }
2170
        pStmt->usingTableProcessing = true;
79✔
2171
        pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
79✔
2172
        pCxt->tags.mixTagsCols = true;
79✔
2173
        pCxt->tags.numOfBound++;
79✔
2174
        pCxt->tags.numOfCols++;
79✔
2175
      } else {
2176
        return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound");
×
2177
      }
2178
    } else {
2179
      if (pCxt->pComCxt->stmtBindVersion == 2 && pCxt->pComCxt->pStmtCb != NULL) {
23!
2180
        if (pCols->pColIndex[i] < numOfCols) {
11✔
2181
          const SSchema*    pSchema = &pSchemas[pCols->pColIndex[i]];
5✔
2182
          const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
5✔
2183
          SColVal*          pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
5✔
2184
          code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, pExtSchema, precision, pVal);
5✔
2185
          if (TK_NK_VARIABLE == pToken->type) {
5!
2186
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2187
          }
2188

2189
          if (ctbCols->pColIndex == NULL) {
5!
2190
            ctbCols->pColIndex = taosMemoryCalloc(numOfCols, sizeof(int16_t));
×
2191
            if (NULL == ctbCols->pColIndex) {
×
2192
              return terrno;
×
2193
            }
2194
          }
2195
          ctbCols->pColIndex[col_index++] = pCols->pColIndex[i];
5✔
2196
          ctbCols->numOfBound++;
5✔
2197
          ctbCols->numOfCols++;
5✔
2198

2199
          if (code == TSDB_CODE_SUCCESS && pCxt->pParsedValues == NULL) {
5!
2200
            pCxt->pParsedValues = taosArrayInit(16, sizeof(SColVal));
3✔
2201
            if (pCxt->pParsedValues == NULL) {
3!
2202
              return terrno;
×
2203
            }
2204
          }
2205

2206
          if (code == TSDB_CODE_SUCCESS) {
5!
2207
            SColVal clonedVal = *pVal;
5✔
2208
            if (COL_VAL_IS_VALUE(&clonedVal) && IS_VAR_DATA_TYPE(clonedVal.value.type)) {
5!
2209
              clonedVal.value.pData = taosMemoryMalloc(clonedVal.value.nData);
1!
2210
              if (NULL == clonedVal.value.pData) {
1!
2211
                code = terrno;
×
2212
                break;
×
2213
              }
2214
              memcpy(clonedVal.value.pData, pVal->value.pData, clonedVal.value.nData);
1✔
2215
            }
2216

2217
            clonedVal.cid = pSchema->colId;
5✔
2218
            if (taosArrayPush(pCxt->pParsedValues, &clonedVal) == NULL) {
10!
2219
              code = terrno;
×
2220
            }
2221
          }
2222
        } else if (pCols->pColIndex[i] < tbnameIdx) {
6!
2223
          if (pCxt->tags.parseredTags == NULL) {
6✔
2224
            pCxt->tags.parseredTags = taosMemoryCalloc(1, sizeof(STagsInfo));
4!
2225
            if (pCxt->tags.parseredTags == NULL) {
4!
2226
              code = terrno;
×
2227
            } else {
2228
              pCxt->tags.parseredTags->STagNames = taosArrayInit(numOfTags, sizeof(STagVal));
4✔
2229
              pCxt->tags.parseredTags->pTagVals = taosArrayInit(numOfTags, sizeof(STagVal));
4✔
2230
              pCxt->tags.parseredTags->pTagIndex = taosMemoryCalloc(numOfTags, sizeof(uint8_t));
4!
2231
              pCxt->tags.parseredTags->numOfTags = 0;
4✔
2232

2233
              if (pCxt->tags.parseredTags->STagNames == NULL || pCxt->tags.parseredTags->pTagVals == NULL ||
4!
2234
                  pCxt->tags.parseredTags->pTagIndex == NULL) {
4!
2235
                return terrno;
×
2236
              }
2237
            }
2238
          }
2239

2240
          const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
6✔
2241
          // if (canParseTagsAfter) {
2242
          //   tagTokens[(*pNumOfTagTokens)] = *pToken;
2243
          //   tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
2244
          //   ++(*pNumOfTagTokens);
2245
          // } else {
2246
          code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
6✔
2247
          if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pToken->type) {
6!
2248
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2249
          }
2250
            if (code == TSDB_CODE_SUCCESS) {
6✔
2251
              code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken,
5✔
2252
                                   pCxt->tags.parseredTags->STagNames, pCxt->tags.parseredTags->pTagVals,
5✔
2253
                                   &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
5✔
2254
            }
2255

2256
            pCxt->tags.parseredTags->pTagIndex[pCxt->tags.parseredTags->numOfTags++] = pCols->pColIndex[i] - numOfCols;
6✔
2257

2258
            if (pCxt->tags.pColIndex == NULL) {
6✔
2259
              pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
4!
2260
              if (NULL == pCxt->tags.pColIndex) {
4!
2261
                return terrno;
×
2262
              }
2263
            }
2264
            if (!(tag_index < numOfTags)) {
6!
2265
              return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
×
2266
            }
2267
            pStmt->usingTableProcessing = true;
6✔
2268
            pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
6✔
2269
            pCxt->tags.mixTagsCols = true;
6✔
2270
            pCxt->tags.numOfBound++;
6✔
2271
            pCxt->tags.numOfCols++;
6✔
2272
            // }
2273
        } else if (pCols->pColIndex[i] == tbnameIdx) {
×
2274
          return buildInvalidOperationMsg(&pCxt->msg, "STMT tbname in bound cols should not be a fixed value");
×
2275
        }
2276
      } else {
2277
        if (pCols->pColIndex[i] < numOfCols) {
12✔
2278
          const SSchema*    pSchema = &pSchemas[pCols->pColIndex[i]];
4✔
2279
          const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
4✔
2280
          SColVal*          pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
4✔
2281
          code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, pExtSchema, precision, pVal);
4✔
2282
          if (TK_NK_VARIABLE == pToken->type) {
4!
2283
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2284
          }
2285
        } else if (pCols->pColIndex[i] < tbnameIdx) {
8✔
2286
          const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
4✔
2287
          if (canParseTagsAfter) {
4!
2288
            tagTokens[(*pNumOfTagTokens)] = *pToken;
4✔
2289
            tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
4✔
2290
            ++(*pNumOfTagTokens);
4✔
2291
          } else {
2292
            code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
×
2293
            if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pToken->type) {
×
2294
              code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2295
            }
2296
            if (code == TSDB_CODE_SUCCESS) {
×
2297
              code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals,
×
2298
                                   &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
×
2299
            }
2300
          }
2301
        } else if (pCols->pColIndex[i] == tbnameIdx) {
4!
2302
          code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
4✔
2303
          if (TK_NK_VARIABLE == pToken->type) {
4!
2304
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
×
2305
          }
2306

2307
          if (code == TSDB_CODE_SUCCESS) {
4!
2308
            code = parseTbnameToken(&pCxt->msg, pStbRowsCxt->ctbName.tname, pToken, bFoundTbName);
4✔
2309
          }
2310
        }
2311
      }
2312
    }
2313

2314
    if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
248✔
2315
      NEXT_VALID_TOKEN(*ppSql, *pToken);
200!
2316
      if (TK_NK_COMMA != pToken->type) {
200!
2317
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
×
2318
      }
2319
    }
2320
  }
2321

2322
  return code;
48✔
2323
}
2324

2325
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
49✔
2326
                               SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken, bool* pCtbFirst,
2327
                               bool* setCtbName, SBoundColInfo* ctbCols) {
2328
  SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
49✔
2329
  SSchema*       pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
49✔
2330
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pStbRowsCxt->pStbMeta);
49✔
2331

2332
  bool        bFoundTbName = false;
49✔
2333
  const char* pOrigSql = *ppSql;
49✔
2334

2335
  int32_t  code = TSDB_CODE_SUCCESS;
49✔
2336
  SToken   tagTokens[TSDB_MAX_TAGS] = {0};
49✔
2337
  SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
49✔
2338
  int      numOfTagTokens = 0;
49✔
2339

2340
  code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, pExtSchemas, tagTokens, tagSchemas,
49✔
2341
                           &numOfTagTokens, &bFoundTbName, setCtbName, ctbCols);
2342

2343
  if (code != TSDB_CODE_SUCCESS) {
49✔
2344
    return code;
2✔
2345
  }
2346

2347
  if (!bFoundTbName) {
47!
2348
    code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
×
2349
  }
2350

2351
  bool ctbFirst = true;
47✔
2352
  char ctbFName[TSDB_TABLE_FNAME_LEN];
2353
  if (code == TSDB_CODE_SUCCESS) {
47!
2354
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
47✔
2355
  }
2356
  if (TSDB_CODE_SUCCESS == code) {
47✔
2357
    STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
35✔
2358
    ctbFirst = (pCtbMeta == NULL);
35✔
2359
    if (!ctbFirst) {
35!
2360
      pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
×
2361
      pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
×
2362
    }
2363
    *pCtbFirst = ctbFirst;
35✔
2364
  }
2365

2366
  if (code == TSDB_CODE_SUCCESS) {
47✔
2367
    code = processCtbTagsAfterCtbName(pCxt, pStmt, pStbRowsCxt, ctbFirst, tagTokens, tagSchemas, numOfTagTokens);
35✔
2368
  }
2369

2370
  if (code == TSDB_CODE_SUCCESS) {
47✔
2371
    *pGotRow = true;
35✔
2372
  }
2373
  return code;
47✔
2374
}
2375

2376
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
35✔
2377
                                                SStbRowsDataContext* pStbRowsCxt) {
2378
  int32_t code = TSDB_CODE_SUCCESS;
35✔
2379

2380
  pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
35!
2381
  if (pStbRowsCxt->pCreateCtbReq == NULL) {
35!
2382
    code = terrno;
×
2383
  }
2384
  if (code == TSDB_CODE_SUCCESS) {
35!
2385
    code = insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
35✔
2386
                               pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
35✔
2387
                               getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
35✔
2388
    pStbRowsCxt->pTag = NULL;
35✔
2389
  }
2390

2391
  if (code == TSDB_CODE_SUCCESS) {
35!
2392
    char ctbFName[TSDB_TABLE_FNAME_LEN];
2393
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
35✔
2394
    SVgroupInfo      vg;
2395
    SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
35✔
2396
                             .requestId = pCxt->pComCxt->requestId,
35✔
2397
                             .requestObjRefId = pCxt->pComCxt->requestRid,
35✔
2398
                             .mgmtEps = pCxt->pComCxt->mgmtEpSet};
35✔
2399
    if (TSDB_CODE_SUCCESS == code) {
35!
2400
      code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
35✔
2401
    }
2402
    if (code == TSDB_CODE_SUCCESS) {
35!
2403
      code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
35✔
2404
    }
2405
    STableMeta* pBackup = NULL;
35✔
2406
    if (TSDB_CODE_SUCCESS == code) {
35!
2407
      pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
35✔
2408
      pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
35✔
2409

2410
      code = cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup);
35✔
2411
    }
2412
    if (TSDB_CODE_SUCCESS == code) {
35!
2413
      code = taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
35✔
2414
    }
2415
    if (TSDB_CODE_SUCCESS == code) {
35!
2416
      code = collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
35✔
2417
    }
2418
  }
2419
  return code;
35✔
2420
}
2421

2422
// Resets the per-row state of a super-table insert context so it can be reused for the
// next row. A NULL context is ignored.
//
// Clears the tag name/value arrays (freeing var-length tag payloads first), clears the
// column values, frees the packed tag and the pending create-table request, and releases
// boundColsInfo.parseredTags together with everything it points to.
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
  if (pStbRowsCxt == NULL) return;

  taosArrayClear(pStbRowsCxt->aTagNames);
  for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i);
    if (IS_VAR_DATA_TYPE(p->type)) {
      taosMemoryFreeClear(p->pData);
    }
  }
  taosArrayClear(pStbRowsCxt->aTagVals);

  clearColValArray(pStbRowsCxt->aColVals);

  tTagFree(pStbRowsCxt->pTag);
  pStbRowsCxt->pTag = NULL;
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);

  if (pStbRowsCxt->boundColsInfo.parseredTags != NULL) {
    if (pStbRowsCxt->boundColsInfo.parseredTags->STagNames != NULL) {
      taosArrayDestroy(pStbRowsCxt->boundColsInfo.parseredTags->STagNames);
    }
    if (pStbRowsCxt->boundColsInfo.parseredTags->pTagVals != NULL) {
      taosArrayDestroy(pStbRowsCxt->boundColsInfo.parseredTags->pTagVals);
    }
    // pTagIndex is a separate heap block; it becomes unreachable once the struct is freed.
    taosMemoryFreeClear(pStbRowsCxt->boundColsInfo.parseredTags->pTagIndex);
    taosMemoryFreeClear(pStbRowsCxt->boundColsInfo.parseredTags);
  }
}
2451

2452
// Releases the array of values captured while parsing literal columns of a stmt insert.
// Safe to call with a NULL context or when nothing was captured.
//
// NOTE(review): only the array itself is destroyed here; the var-length payloads cloned
// into its elements are presumably owned/freed elsewhere - confirm.
static void clearInsertParseContext(SInsertParseContext* pCxt) {
  if (NULL == pCxt || NULL == pCxt->pParsedValues) {
    return;
  }

  taosArrayDestroy(pCxt->pParsedValues);
  pCxt->pParsedValues = NULL;
}
2460

2461
static int32_t parseStbBoundInfo(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt,
13✔
2462
                                 STableDataCxt** ppTableDataCxt) {
2463
  char    tbFName[TSDB_TABLE_FNAME_LEN];
2464
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
13✔
2465
  if (TSDB_CODE_SUCCESS != code) {
13!
2466
    return code;
×
2467
  }
2468
  if (pStmt->usingTableProcessing) {
13✔
2469
    pStmt->pTableMeta->uid = 0;
10✔
2470
  }
2471

2472
  code = insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
13✔
2473
                            &pStmt->pCreateTblReq, ppTableDataCxt, false, true);
2474
  if (code != TSDB_CODE_SUCCESS) {
13!
2475
    return code;
×
2476
  }
2477

2478
  insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo));
13✔
2479
  (*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo;
13✔
2480

2481
  (*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t));
13!
2482
  if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) {
13!
2483
    return terrno;
×
2484
  }
2485
  (void)memcpy((*ppTableDataCxt)->boundColsInfo.pColIndex, pStbRowsCxt->boundColsInfo.pColIndex,
13✔
2486
               sizeof(int16_t) * pStmt->pStbRowsCxt->boundColsInfo.numOfBound);
13✔
2487
  return TSDB_CODE_SUCCESS;
13✔
2488
}
2489

2490
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
49✔
2491
                              SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken,
2492
                              STableDataCxt** ppTableDataCxt) {
2493
  bool          bFirstTable = false;
49✔
2494
  bool          setCtbName = false;
49✔
2495
  SBoundColInfo ctbCols = {0};
49✔
2496
  int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable, &setCtbName, &ctbCols);
49✔
2497

2498
  if (!setCtbName && pCxt->pComCxt->stmtBindVersion == 2) {
49✔
2499
    taosMemoryFreeClear(ctbCols.pColIndex);
13!
2500
    return parseStbBoundInfo(pStmt, pStbRowsCxt, ppTableDataCxt);
13✔
2501
  }
2502

2503
  if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
36!
2504
    return code;
1✔
2505
  }
2506

2507
  if (code == TSDB_CODE_SUCCESS && bFirstTable) {
35!
2508
    code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
35✔
2509
  }
2510
  if (code == TSDB_CODE_SUCCESS) {
35!
2511
    if (pCxt->pComCxt->stmtBindVersion == 2) {
35✔
2512
      char ctbFName[TSDB_TABLE_FNAME_LEN];
2513
      code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
31✔
2514
      if (code != TSDB_CODE_SUCCESS) {
31!
2515
        return code;
×
2516
      }
2517
      code = insGetTableDataCxt(pStmt->pTableBlockHashObj, ctbFName, strlen(ctbFName), pStbRowsCxt->pCtbMeta,
31✔
2518
                                &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, true, true);
2519
    } else {
2520
      code =
2521
          insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
4✔
2522
                             pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
2523
    }
2524
  }
2525
  if (code == TSDB_CODE_SUCCESS) {
35!
2526
    if (pCxt->pComCxt->stmtBindVersion == 2) {
35✔
2527
      int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
31✔
2528
      code = initTableColSubmitDataWithBoundInfo(*ppTableDataCxt, ctbCols);
31✔
2529
    } else {
2530
      code = initTableColSubmitData(*ppTableDataCxt);
4✔
2531
    }
2532
  }
2533
  if (code == TSDB_CODE_SUCCESS && pCxt->pComCxt->stmtBindVersion == 0) {
35!
2534
    SRow**            pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
4✔
2535
    SRowBuildScanInfo sinfo = {0};
4✔
2536
    code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow, &sinfo);
4✔
2537
    if (TSDB_CODE_SUCCESS == code) {
4!
2538
      SRowKey key;
2539
      tRowGetKey(*pRow, &key);
8!
2540
      insCheckTableDataOrder(*ppTableDataCxt, &key);
4✔
2541
    }
2542
  }
2543

2544
  if (code == TSDB_CODE_SUCCESS) {
35!
2545
    *pGotRow = true;
35✔
2546
  }
2547

2548
  clearStbRowsDataContext(pStbRowsCxt);
35✔
2549

2550
  return code;
35✔
2551
}
2552

2553
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
42,537,939✔
2554
                       SToken* pToken) {
2555
  SBoundColInfo*    pCols = &pTableCxt->boundColsInfo;
42,537,939✔
2556
  SSchema*          pSchemas = getTableColumnSchema(pTableCxt->pMeta);
42,537,939✔
2557
  const SSchemaExt* pExtSchemas = getTableColumnExtSchema(pTableCxt->pMeta);
42,366,217✔
2558

2559
  int32_t code = TSDB_CODE_SUCCESS;
42,253,990✔
2560
  // 1. set the parsed value from sql string
2561
  for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
469,235,266!
2562
    const char* pOrigSql = *pSql;
429,035,105✔
2563
    bool        ignoreComma = false;
429,035,105✔
2564
    NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma);
429,035,105✔
2565
    if (ignoreComma) {
459,598,036!
2566
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
×
2567
      break;
×
2568
    }
2569

2570
    SSchema*          pSchema = &pSchemas[pCols->pColIndex[i]];
459,598,036✔
2571
    const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
459,598,036✔
2572
    SColVal*          pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
459,598,036✔
2573

2574
    if (pToken->type == TK_NK_QUESTION) {
450,502,541✔
2575
      if (pCxt->pComCxt->stmtBindVersion == 0) {
5,315,163!
2576
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
×
2577
        break;
×
2578
      }
2579
    } else {
2580
      if (TK_NK_RP == pToken->type) {
445,187,378!
2581
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
×
2582
        break;
×
2583
      }
2584

2585
      if (TSDB_CODE_SUCCESS == code) {
445,187,378!
2586
        code = parseValueToken(pCxt, pSql, pToken, pSchema, pExtSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
445,490,557✔
2587

2588
        if (TSDB_CODE_SUCCESS == code && NULL != pCxt->pComCxt->pStmtCb) {
437,741,471!
2589
          if (NULL == pCxt->pParsedValues) {
8✔
2590
            pCxt->pParsedValues = taosArrayInit(16, sizeof(SColVal));
5✔
2591
            if (NULL == pCxt->pParsedValues) {
5!
2592
              code = terrno;
×
2593
              break;
×
2594
            }
2595
          }
2596

2597
          SColVal clonedVal = *pVal;
8✔
2598
          if (COL_VAL_IS_VALUE(&clonedVal) && IS_VAR_DATA_TYPE(clonedVal.value.type)) {
8!
2599
            clonedVal.value.pData = taosMemoryMalloc(clonedVal.value.nData);
2!
2600
            if (NULL == clonedVal.value.pData) {
2!
2601
              code = terrno;
×
2602
              break;
×
2603
            }
2604
            memcpy(clonedVal.value.pData, pVal->value.pData, clonedVal.value.nData);
2✔
2605
          }
2606

2607
          if (taosArrayPush(pCxt->pParsedValues, &clonedVal) == NULL) {
16!
2608
            code = terrno;
×
2609
            break;
×
2610
          }
2611
        }
2612
      }
2613
    }
2614

2615
    if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
442,753,455✔
2616
      NEXT_VALID_TOKEN(*pSql, *pToken);
399,466,835!
2617
      if (TK_NK_COMMA != pToken->type) {
391,208,020!
2618
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
×
2619
      }
2620
    }
2621
  }
2622

2623
  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 0) {
40,200,161!
2624
    SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
42,337,589✔
2625
    if (pTableCxt->hasBlob) {
42,229,979!
2626
      SRowBuildScanInfo sinfo = {.hasBlob = 1, .scanType = ROW_BUILD_UPDATE};
×
2627
      if (pTableCxt->pData->pBlobSet == NULL) {
×
2628
        code = tBlobSetCreate(1024, 0, &pTableCxt->pData->pBlobSet);
×
2629
        TAOS_CHECK_RETURN(code);
×
2630
      }
2631
      code = tRowBuildWithBlob(pTableCxt->pValues, pTableCxt->pSchema, pRow, pTableCxt->pData->pBlobSet, &sinfo);
×
2632
    } else {
2633
      SRowBuildScanInfo sinfo = {0};
42,229,979✔
2634
      code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow, &sinfo);
42,229,979✔
2635
    }
2636
    if (TSDB_CODE_SUCCESS == code) {
42,716,371✔
2637
      SRowKey key;
2638
      tRowGetKey(*pRow, &key);
85,412,900✔
2639
      insCheckTableDataOrder(pTableCxt, &key);
42,706,450✔
2640
    }
2641
  }
2642

2643
  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 0) {
40,531,812!
2644
    *pGotRow = true;
42,670,828✔
2645
  }
2646

2647
  clearColValArray(pTableCxt->pValues);
40,531,812✔
2648

2649
  return code;
42,010,874✔
2650
}
2651

2652
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
2653
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
84,037✔
2654
                           int32_t* pNumOfRows, SToken* pToken) {
2655
  int32_t code = TSDB_CODE_SUCCESS;
84,037✔
2656

2657
  (*pNumOfRows) = 0;
84,037✔
2658
  while (TSDB_CODE_SUCCESS == code) {
42,004,770✔
2659
    int32_t index = 0;
41,960,866✔
2660
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, *pToken, index);
41,960,866✔
2661
    if (TK_NK_LP != pToken->type) {
42,745,948✔
2662
      break;
84,023✔
2663
    }
2664
    pStmt->pSql += index;
42,661,925✔
2665

2666
    bool gotRow = false;
42,661,925✔
2667
    if (TSDB_CODE_SUCCESS == code) {
42,661,925!
2668
      if (!pStmt->stbSyntax) {
42,675,136!
2669
        code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
42,680,883✔
2670
      } else {
2671
        // foreach subtable
2672
        STableDataCxt* pTableDataCxt = NULL;
×
2673
        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken, &pTableDataCxt);
×
2674
      }
2675
    }
2676

2677
    if (TSDB_CODE_SUCCESS == code) {
41,963,818!
2678
      NEXT_VALID_TOKEN(pStmt->pSql, *pToken);
41,989,682!
2679
      if (TK_NK_COMMA == pToken->type) {
41,905,660✔
2680
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
3✔
2681
      } else if (TK_NK_RP != pToken->type) {
41,905,657!
2682
        code = buildSyntaxErrMsg(&pCxt->msg, ") expected", pToken->z);
×
2683
      }
2684
    }
2685

2686
    if (TSDB_CODE_SUCCESS == code && gotRow) {
41,920,733✔
2687
      (*pNumOfRows)++;
41,907,726✔
2688
    }
2689
  }
2690

2691
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
127,927✔
2692
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
11,087!
2693
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
×
2694
  }
2695
  return code;
84,076✔
2696
}
2697

2698
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
2699
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext,
84,043✔
2700
                                 SToken* pToken) {
2701
  int32_t numOfRows = 0;
84,043✔
2702
  int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken);
84,043✔
2703
  if (TSDB_CODE_SUCCESS == code) {
84,075✔
2704
    pStmt->totalRowsNum += numOfRows;
84,018✔
2705
    pStmt->totalTbNum += 1;
84,018✔
2706
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
84,018✔
2707
  }
2708
  return code;
84,075✔
2709
}
2710

2711
// Simplified CSV parser - only handles newlines within quotes
2712
static int32_t csvParserReadLine(SCsvParser* parser) {
517✔
2713
  if (!parser) {
517!
2714
    return TSDB_CODE_INVALID_PARA;
×
2715
  }
2716

2717
  size_t  lineLen = 0;
517✔
2718
  bool    inQuotes = false;
517✔
2719
  char    currentQuote = '\0';  // Track which quote character we're inside
517✔
2720
  int32_t code = TSDB_CODE_SUCCESS;
517✔
2721

2722
  while (true) {
989,040✔
2723
    // Fill buffer if needed
2724
    if (parser->bufferPos >= parser->bufferLen) {
989,557✔
2725
      code = csvParserFillBuffer(parser);
21✔
2726
      if (code != TSDB_CODE_SUCCESS) {
21!
2727
        break;
×
2728
      }
2729
      if (parser->bufferPos >= parser->bufferLen && parser->eof) {
21!
2730
        // End of file
2731
        if (lineLen == 0) {
6!
2732
          code = TSDB_CODE_TSC_QUERY_CANCELLED;  // Use this to indicate EOF
6✔
2733
        }
2734
        break;
6✔
2735
      }
2736
    }
2737

2738
    char ch = parser->buffer[parser->bufferPos++];
989,551✔
2739

2740
    // Handle quotes - support both single and double quotes
2741
    if (!inQuotes && (ch == CSV_QUOTE_SINGLE || ch == CSV_QUOTE_DOUBLE)) {
989,551✔
2742
      // Starting a quoted section
2743
      inQuotes = true;
2,530✔
2744
      currentQuote = ch;
2,530✔
2745
    } else if (inQuotes && ch == currentQuote) {
987,021✔
2746
      // Check for escaped quote (double quote)
2747
      if (parser->bufferPos < parser->bufferLen && parser->buffer[parser->bufferPos] == currentQuote) {
2,530!
2748
        // Escaped quote - keep both quotes in line for subsequent processing
2749
        // Ensure enough space for both quote characters
2750
        code = csvParserExpandLineBuffer(parser, lineLen + 2);
×
2751
        if (code != TSDB_CODE_SUCCESS) {
×
2752
          break;
×
2753
        }
2754

2755
        // Add the first quote character to the line
2756
        parser->lineBuffer[lineLen++] = ch;
×
2757

2758
        // Consume and add the second quote character
2759
        parser->bufferPos++;
×
2760
        ch = parser->buffer[parser->bufferPos - 1];  // The second quote
×
2761
        parser->lineBuffer[lineLen++] = ch;
×
2762
        continue;
×
2763
      } else {
2764
        // End of quoted section
2765
        inQuotes = false;
2,530✔
2766
        currentQuote = '\0';
2,530✔
2767
      }
2768
    }
2769

2770
    // Handle newlines
2771
    if (ch == '\n' && !inQuotes) {
989,551!
2772
      // End of line (not inside quotes)
2773
      break;
511✔
2774
    }
2775

2776
    // Skip \r characters only when outside quotes
2777
    if (ch == '\r' && !inQuotes) {
989,040!
2778
      continue;
511✔
2779
    }
2780

2781
    // Expand buffer if needed
2782
    code = csvParserExpandLineBuffer(parser, lineLen + 1);
988,529✔
2783
    if (code != TSDB_CODE_SUCCESS) {
988,529!
2784
      break;
×
2785
    }
2786

2787
    // Add character to line
2788
    parser->lineBuffer[lineLen++] = ch;
988,529✔
2789
  }
2790

2791
  if (code == TSDB_CODE_SUCCESS) {
517✔
2792
    parser->lineBuffer[lineLen] = '\0';
511✔
2793
  }
2794

2795
  return code;
517✔
2796
}
2797

2798
// Parses rows from the CSV file attached to pStmt, at most one batch per call.
//
// The CSV parser is created on first use and stored in pStmt->pCsvParser; when a batch limit is
// hit, pStmt->fileProcessing is set so that a later call resumes with the same parser.
//
// pCxt        - insert parse context (error message buffer, request id)
// pStmt       - statement being built; owns the file handle and the saved CSV parser
// rowsDataCxt - destination context (table context, or super-table rows context when stbSyntax)
// pNumOfRows  - out: number of rows parsed by this call
//
// Returns TSDB_CODE_SUCCESS or the first error met while reading or parsing.
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
                            int32_t* pNumOfRows) {
  int32_t code = TSDB_CODE_SUCCESS;
  *pNumOfRows = 0;

  // Create the parser lazily; an existing one means we continue from where we left off.
  if (NULL == pStmt->pCsvParser) {
    pStmt->pCsvParser = taosMemoryMalloc(sizeof(SCsvParser));
    if (NULL == pStmt->pCsvParser) {
      return terrno;
    }
    code = csvParserInit(pStmt->pCsvParser, pStmt->fp);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(pStmt->pCsvParser);
      pStmt->pCsvParser = NULL;
      return code;
    }
  }

  // A parse failure is tolerated only on the first non-resumed line (see the skip below).
  bool firstLine = !pStmt->fileProcessing;
  pStmt->fileProcessing = false;

  while (TSDB_CODE_SUCCESS == code) {
    code = csvParserReadLine(pStmt->pCsvParser);
    if (TSDB_CODE_TSC_QUERY_CANCELLED == code) {
      // The line reader signals end of file with this code.
      code = TSDB_CODE_SUCCESS;
      break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    // Skip empty lines
    char* pLine = pStmt->pCsvParser->lineBuffer;
    if (NULL == pLine || '\0' == pLine[0]) {
      firstLine = false;
      continue;
    }

    bool   gotRow = false;
    SToken token;
    // The line is lower-cased in place before it is handed to the row parser.
    (void)strtolower(pLine, pLine);
    const char* pRow = pLine;

    if (pStmt->stbSyntax) {
      STableDataCxt* pTableDataCxt = NULL;
      code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
      if (TSDB_CODE_SUCCESS == code) {
        // Record the child table context under the child table uid.
        SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
        void*                pData = pTableDataCxt;
        code = taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
                           &pData, POINTER_BYTES);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
      }
    } else {
      code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
    }

    // A failing first line is skipped instead of aborting the import.
    if (0 != code && firstLine) {
      firstLine = false;
      code = 0;
      continue;
    }

    if (TSDB_CODE_SUCCESS == code) {
      if (gotRow) {
        (*pNumOfRows)++;
      }
      if ((*pNumOfRows) >= tsMaxInsertBatchRows) {
        // Reached batch limit - keep the parser in pStmt for next batch
        pStmt->fileProcessing = true;
        break;
      }
    }
    firstLine = false;
  }

  // Don't destroy the parser here - it will be cleaned up when file processing is complete

  parserDebug("QID:0x%" PRIx64 ", %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);

  // Nothing parsed now or earlier, not a stmt insert, and no further batch pending: report it.
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && 0 == pStmt->totalRowsNum &&
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
  }
  return code;
}
2889

2890
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
6✔
2891
                                     SRowsDataContext rowsDataCxt) {
2892
  // init only for file
2893
  if (NULL == pStmt->pTableCxtHashObj) {
6!
2894
    pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
6✔
2895
    if (!pStmt->pTableCxtHashObj) {
6!
2896
      return terrno;
×
2897
    }
2898
  }
2899
  int32_t numOfRows = 0;
6✔
2900
  int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
6✔
2901
  if (TSDB_CODE_SUCCESS == code) {
6!
2902
    pStmt->totalRowsNum += numOfRows;
6✔
2903
    pStmt->totalTbNum += 1;
6✔
2904
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
6✔
2905
    if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
6!
2906
      rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
6✔
2907
    }
2908
    if (!pStmt->fileProcessing) {
6!
2909
      // File processing is complete, clean up saved CSV parser
2910
      destroySavedCsvParser(pStmt);
6✔
2911
      code = taosCloseFile(&pStmt->fp);
6✔
2912
      if (TSDB_CODE_SUCCESS != code) {
6!
2913
        parserWarn("QID:0x%" PRIx64 ", failed to close file.", pCxt->pComCxt->requestId);
×
2914
      }
2915
    } else {
2916
      parserDebug("QID:0x%" PRIx64 ", insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
×
2917
    }
2918
    if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) {
6!
2919
      destroySavedCsvParser(pStmt);
×
2920
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL);
×
2921
    }
2922
  } else {
2923
    // On error, also clean up saved CSV parser
2924
    destroySavedCsvParser(pStmt);
×
2925
    return buildInvalidOperationMsg(&pCxt->msg, tstrerror(code));
×
2926
  }
2927

2928
  // just record pTableCxt whose data come from file
2929
  if (!pStmt->stbSyntax && numOfRows > 0) {
6!
2930
    void* pData = rowsDataCxt.pTableDataCxt;
6✔
2931
    code = taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
6✔
2932
                       POINTER_BYTES);
2933
  }
2934

2935
  return code;
6✔
2936
}
2937

2938
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
6✔
2939
                                 SRowsDataContext rowsDataCxt) {
2940
  char filePathStr[PATH_MAX + 16] = {0};
6✔
2941
  if (TK_NK_STRING == pFilePath->type) {
6✔
2942
    (void)trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
5✔
2943
    if (strlen(filePathStr) >= PATH_MAX) {
5!
2944
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2945
    }
2946
  } else {
2947
    if (pFilePath->n >= PATH_MAX) {
1!
2948
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2949
    }
2950
    strncpy(filePathStr, pFilePath->z, pFilePath->n);
1✔
2951
  }
2952
  pStmt->fp = taosOpenFile(filePathStr, TD_FILE_READ);
6✔
2953
  if (NULL == pStmt->fp) {
6!
2954
    return terrno;
×
2955
  }
2956

2957
  return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
6✔
2958
}
2959

2960
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
6✔
2961
                               SToken* pToken) {
2962
  if (tsUseAdapter) {
6!
2963
    return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
×
2964
  }
2965

2966
  NEXT_TOKEN(pStmt->pSql, *pToken);
6✔
2967
  if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
6!
2968
    return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
×
2969
  }
2970
  return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt);
6✔
2971
}
2972

2973
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
2974
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
84,028✔
2975
  SToken token;
2976
  NEXT_TOKEN(pStmt->pSql, token);
84,028✔
2977
  switch (token.type) {
84,091!
2978
    case TK_VALUES:
84,086✔
2979
      if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
84,086!
2980
        return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", token.z);
×
2981
      }
2982
      return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
84,086✔
2983
    case TK_FILE:
5✔
2984
      return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
5✔
2985
    default:
×
2986
      break;
×
2987
  }
2988
  return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
×
2989
}
2990

2991
// Releases everything owned by a super-table rows context. The context struct itself is NOT
// freed here. A NULL argument is a no-op.
static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
  if (NULL == pStbRowsCxt) {
    return;
  }

  clearStbRowsDataContext(pStbRowsCxt);

  // value / name arrays
  taosArrayDestroy(pStbRowsCxt->aColVals);
  pStbRowsCxt->aColVals = NULL;
  taosArrayDestroy(pStbRowsCxt->aTagVals);
  pStbRowsCxt->aTagVals = NULL;
  taosArrayDestroy(pStbRowsCxt->aTagNames);
  pStbRowsCxt->aTagNames = NULL;

  // bound column info and tag
  insDestroyBoundColInfo(&pStbRowsCxt->boundColsInfo);
  tTagFree(pStbRowsCxt->pTag);
  pStbRowsCxt->pTag = NULL;

  // child table meta and pending create-table request
  taosMemoryFreeClear(pStbRowsCxt->pCtbMeta);
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
}
3007

3008
// Allocates and initializes the rows context used when inserting through a super table.
//
// pStmt        - statement providing the target table name, table meta and tag condition;
//                pTagCond and pStbMeta are stored as pointers, not copied
// ppStbRowsCxt - out: the new context, written only on success
//
// Returns TSDB_CODE_SUCCESS, or the first error encountered. On failure the partially built
// context is fully released, so nothing is handed to or owed by the caller.
static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext** ppStbRowsCxt) {
  SStbRowsDataContext* pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
  if (!pStbRowsCxt) {
    return terrno;
  }

  tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName);
  int32_t code = collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
  if (TSDB_CODE_SUCCESS == code) {
    code = collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
  }
  if (TSDB_CODE_SUCCESS == code) {
    // The child table name shares account and database with the super table.
    pStbRowsCxt->ctbName.type = TSDB_TABLE_NAME_T;
    pStbRowsCxt->ctbName.acctId = pStbRowsCxt->stbName.acctId;
    memcpy(pStbRowsCxt->ctbName.dbname, pStbRowsCxt->stbName.dbname, sizeof(pStbRowsCxt->stbName.dbname));

    pStbRowsCxt->pTagCond = pStmt->pTagCond;
    pStbRowsCxt->pStbMeta = pStmt->pTableMeta;

    code = cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta);
  }
  if (TSDB_CODE_SUCCESS == code) {
    // The cloned meta describes a child table of the super table.
    pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
    pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;

    pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
    if (!pStbRowsCxt->aTagNames) {
      code = terrno;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
    if (!pStbRowsCxt->aTagVals) {
      code = terrno;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    // col values and bound cols info of STableDataContext is not used
    pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
    if (!pStbRowsCxt->aColVals) {
      code = terrno;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
  }
  if (TSDB_CODE_SUCCESS == code) {
    STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
    code = insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *ppStbRowsCxt = pStbRowsCxt;
  } else {
    // The caller never sees this pointer on failure, so release the members (the destroy
    // routine clears the context first) and then the struct itself.
    destroyStbRowsDataContext(pStbRowsCxt);
    taosMemoryFree(pStbRowsCxt);
  }
  return code;
}
3062

3063
// Handles the remainder of an insert that targets a super table directly: requires a bound
// column list, builds the super-table rows context, resolves the bound columns (including
// tbname), records whether a bound tag is a timestamp or JSON, then parses the data clause.
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  if (NULL == pStmt->pBoundCols) {
    return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion",
                             pStmt->pSql);
  }

  SStbRowsDataContext* pStbRowsCxt = NULL;
  int32_t              code = constructStbRowsDataContext(pStmt, &pStbRowsCxt);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
                           &pStbRowsCxt->boundColsInfo);

  pStbRowsCxt->hasTimestampTag = false;
  for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
    int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
    // Only entries past the regular columns, other than the tbname slot, are inspected.
    if (schemaIndex == getTbnameSchemaIndex(pStmt->pTableMeta) || schemaIndex < getNumOfColumns(pStmt->pTableMeta)) {
      continue;
    }
    if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
      pStbRowsCxt->hasTimestampTag = true;
    }
    if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_JSON) {
      pStbRowsCxt->isJsonTag = true;
    }
  }

  // The statement holds the context from here on, also when parseBoundColumns failed.
  pStmt->pStbRowsCxt = pStbRowsCxt;

  if (TSDB_CODE_SUCCESS == code) {
    SRowsDataContext rowsDataCxt;
    rowsDataCxt.pStbRowsCxt = pStbRowsCxt;
    code = parseDataClause(pCxt, pStmt, rowsDataCxt);
  }

  return code;
}
3099

3100
// input pStmt->pSql:
3101
//   1. [(tag1_name, ...)] ...
3102
//   2. VALUES ... | FILE ...
3103
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
84,037✔
3104
  if (!pStmt->stbSyntax) {
84,037✔
3105
    STableDataCxt*   pTableCxt = NULL;
84,012✔
3106
    int32_t          code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
84,012✔
3107
    SRowsDataContext rowsDataCxt;
3108
    rowsDataCxt.pTableDataCxt = pTableCxt;
84,009✔
3109
    if (TSDB_CODE_SUCCESS == code) {
84,009!
3110
      code = parseDataClause(pCxt, pStmt, rowsDataCxt);
84,013✔
3111
    }
3112
    return code;
84,007✔
3113
  } else {
3114
    int32_t code = parseInsertStbClauseBottom(pCxt, pStmt);
25✔
3115
    return code;
51✔
3116
  }
3117
}
3118

3119
// Resets the per-table state of the parse context and the statement before the next table
// clause is parsed: releases what the previous table clause left behind and clears the flags.
//
// Every pointer released here is also set to NULL so that a later reset, or the statement and
// context teardown, cannot free the same object twice when the next table clause fails before
// reassigning it.
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  insDestroyBoundColInfo(&pCxt->tags);
  taosArrayDestroy(pCxt->pParsedValues);
  pCxt->pParsedValues = NULL;

  taosMemoryFreeClear(pStmt->pTableMeta);
  nodesDestroyNode(pStmt->pTagCond);
  pStmt->pTagCond = NULL;
  taosArrayDestroy(pStmt->pTableTag);
  pStmt->pTableTag = NULL;
  tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
  taosMemoryFreeClear(pStmt->pCreateTblReq);

  pCxt->missCache = false;
  pCxt->usingDuplicateTable = false;
  pStmt->pBoundCols = NULL;
  pStmt->usingTableProcessing = false;
  pStmt->fileProcessing = false;
  pStmt->usingTableName.type = 0;

  // The super-table rows context is rebuilt for every table clause.
  destroyStbRowsDataContext(pStmt->pStbRowsCxt);
  taosMemoryFreeClear(pStmt->pStbRowsCxt);
  pStmt->stbSyntax = false;
}
84,167✔
3138

3139
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
3140
static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
84,171✔
3141
  resetEnvPreTable(pCxt, pStmt);
84,171✔
3142
  int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName);
84,166✔
3143
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
84,149!
3144
    code = parseInsertTableClauseBottom(pCxt, pStmt);
83,692✔
3145
  }
3146

3147
  return code;
84,141✔
3148
}
3149

3150
// Asks the stmt callback for the bound table name and, when one is available, substitutes it
// into pTbName. TSDB_CODE_TSC_STMT_TBNAME_ERROR is not treated as a failure: it only clears
// HAS_BIND_VALUE. Any other error is returned.
static int32_t bindTbNameFromStmtCb(SInsertParseContext* pCxt, SToken* pTbName) {
  char*   tbName = NULL;
  int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
  if (TSDB_CODE_SUCCESS == code) {
    pCxt->stmtTbNameFlag |= HAS_BIND_VALUE;
    pTbName->z = tbName;
    pTbName->n = strlen(tbName);
  } else if (TSDB_CODE_TSC_STMT_TBNAME_ERROR == code) {
    pCxt->stmtTbNameFlag &= ~HAS_BIND_VALUE;
    code = TSDB_CODE_SUCCESS;
  }
  return code;
}

// Validates the first token of a table clause and decides whether there is a table to parse.
//
// pTbName  - in/out: the token; for `?` and `db.?` it is replaced by the bound table name
// pHasData - out: set to false when the SQL text is exhausted and to true for a literal table
//            name; left untouched on the `?` / `db.?` paths
//
// Returns TSDB_CODE_SUCCESS, a syntax / invalid-operation error, or the error reported by the
// stmt callback. Callback errors are propagated on both the `?` and the `db.?` paths.
static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName,
                                          bool* pHasData) {
  // no data in the sql string anymore.
  if (0 == pTbName->n) {
    if (0 != pTbName->type && '\0' != pStmt->pSql[0]) {
      return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pTbName->z);
    }

    if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
      return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
    }

    *pHasData = false;
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && pStmt->totalTbNum > 0) {
    return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
  }

  // Bare `?`: the table name comes from the bind parameters.
  if (TK_NK_QUESTION == pTbName->type) {
    pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
    if (pCxt->pComCxt->stmtBindVersion == 0) {
      return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
    }
    return bindTbNameFromStmtCb(pCxt, pTbName);
  }

  // `?` was handled above, so only identifiers and strings remain acceptable.
  if (TK_NK_ID != pTbName->type && TK_NK_STRING != pTbName->type) {
    return buildSyntaxErrMsg(&pCxt->msg, "table_name is expected", pTbName->z);
  }

  // db.? situation,ensure that the only thing following the '.' mark is '?'
  char* tbNameAfterDbName = strnchr(pTbName->z, '.', pTbName->n, true);
  if (tbNameAfterDbName != NULL) {
    if (*(tbNameAfterDbName + 1) == '?') {
      pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
      if (NULL == pCxt->pComCxt->pStmtCb) {
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
      }
      // Propagate callback failures instead of reporting success, as the bare `?` path does.
      return bindTbNameFromStmtCb(pCxt, pTbName);
    }

    pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
    parserWarn("QID:0x%" PRIx64 ", table name is specified in sql, ignore the table name in bind param",
               pCxt->pComCxt->requestId);
    *pHasData = true;
    return TSDB_CODE_SUCCESS;
  }

  if (TK_NK_ID == pTbName->type) {
    pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
  }

  *pHasData = true;
  return TSDB_CODE_SUCCESS;
}
3229

3230
// Hands the parse results of a stmt insert over to the stmt callback (setInfoFn).
//
// A heap copy of pCxt->tags is passed to the callback; afterwards pCxt->tags is zeroed and the
// statement's vgroup / table-block hash pointers are set to NULL, whatever the callback returned.
static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  SBoundColInfo* pTagsCopy = taosMemoryMalloc(sizeof(pCxt->tags));
  if (NULL == pTagsCopy) {
    return terrno;
  }
  memcpy(pTagsCopy, &pCxt->tags, sizeof(pCxt->tags));

  SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
  int32_t        code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, pTagsCopy, pCxt->pParsedValues,
                                       &pStmt->targetTableName, pStmt->usingTableProcessing, pStmt->pVgroupsHashObj,
                                       pStmt->pTableBlockHashObj, pStmt->usingTableName.tname, pCxt->stmtTbNameFlag);

  // Drop the local references now that they have been passed on.
  memset(&pCxt->tags, 0, sizeof(pCxt->tags));
  pStmt->pVgroupsHashObj = NULL;
  pStmt->pTableBlockHashObj = NULL;
  return code;
}
3247

3248
// Finishes an insert body once all table clauses are parsed.
//
// Stmt inserts only hand their info to the stmt callback. Otherwise the table data contexts
// are merged per vgroup and the vgroup data blocks are built. A pure FILE insert with nothing
// recorded skips both steps and clears pCxt->needRequest.
static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
    return setStmtInfo(pCxt, pStmt);
  }

  // release old array alloced by merge
  pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
  pStmt->pVgDataBlocks = NULL;

  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
  // none data, skip merge & buildvgdata
  if (fileOnly && 0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {
    pCxt->needRequest = false;
    return TSDB_CODE_SUCCESS;
  }

  // merge according to vgId
  int32_t code = insMergeTableDataCxt(fileOnly ? pStmt->pTableCxtHashObj : pStmt->pTableBlockHashObj,
                                      &pStmt->pVgDataBlocks, pStmt->fileProcessing);
  // clear tmp hashobj only
  taosHashClear(pStmt->pTableCxtHashObj);

  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
}
3278

3279
// tb_name
3280
//     [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
3281
//     [(field1_name, ...)]
3282
//     VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3283
// [...];
3284
static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
81,457✔
3285
  SToken  token;
3286
  int32_t code = TSDB_CODE_SUCCESS;
81,457✔
3287
  bool    hasData = true;
81,457✔
3288
  // for each table
3289
  while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) {
246,497✔
3290
    // pStmt->pSql -> tb_name ...
3291
    NEXT_TOKEN(pStmt->pSql, token);
165,093✔
3292
    code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData);
165,110✔
3293
    if (TSDB_CODE_SUCCESS == code && hasData) {
165,085!
3294
      code = parseInsertTableClause(pCxt, pStmt, &token);
84,182✔
3295
    }
3296

3297
    if( TSDB_CODE_SUCCESS == code && pStmt->pTableMeta &&
165,036✔
3298
        (((pStmt->pTableMeta->virtualStb == 1) && (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE)) ||
164,701!
3299
        (pStmt->pTableMeta->tableType == TSDB_VIRTUAL_NORMAL_TABLE ||
164,701!
3300
          pStmt->pTableMeta->tableType == TSDB_VIRTUAL_CHILD_TABLE))) {
164,705!
3301
      code = buildInvalidOperationMsg(&pCxt->msg, "Virtual table can not be written");
×
3302
    }
3303
  }
3304

3305
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
81,404✔
3306
    code = parseInsertBodyBottom(pCxt, pStmt);
80,952✔
3307
  }
3308
  return code;
81,411✔
3309
}
3310

3311
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
20,608!
3312

3313
// Creates a vnode-modify statement node and the hash tables it needs.
//
// pCxt    - parse context; when a stmt callback is present the node is flagged as STMT insert
// reentry - when true the vgroup and table-block hashes are NOT created here
// pOutput - out: the new node, written only on success
//
// Returns TSDB_CODE_SUCCESS, the nodesMakeNode error, or TSDB_CODE_OUT_OF_MEMORY when any of
// the required hash tables could not be created (the node is destroyed in that case).
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
  SVnodeModifyOpStmt* pStmt = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&pStmt);
  if (NULL == pStmt) {
    return code;
  }

  if (pCxt->pComCxt->pStmtCb) {
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
  }
  pStmt->pSql = pCxt->pComCxt->pSql;

  pStmt->freeHashFunc = insDestroyTableDataCxtHashMap;
  pStmt->freeArrayFunc = insDestroyVgroupDataCxtList;
  pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
  pStmt->pCsvParser = NULL;

  if (!reentry) {
    pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    // The table-block hash uses a different key hash function for stmt inserts.
    if (pCxt->pComCxt->pStmtCb) {
      pStmt->pTableBlockHashObj =
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    } else {
      pStmt->pTableBlockHashObj =
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    }
  }
  pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
  pStmt->pSuperTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
  pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
  pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);

  // Every hash created above must exist, pSuperTableHashObj included: it is configured right
  // below and handed out as part of the statement.
  if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) ||
      NULL == pStmt->pSubTableHashObj || NULL == pStmt->pSuperTableHashObj || NULL == pStmt->pTableNameHashObj ||
      NULL == pStmt->pDbFNameHashObj) {
    nodesDestroyNode((SNode*)pStmt);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosHashSetFreeFp(pStmt->pSubTableHashObj, destroySubTableHashElem);
  taosHashSetFreeFp(pStmt->pSuperTableHashObj, destroySubTableHashElem);

  *pOutput = (SNode*)pStmt;
  return TSDB_CODE_SUCCESS;
}
3356

3357
// Creates the SQuery wrapper for an insert (scheduled execution, no result set, submit message)
// together with its vnode-modify root statement. *pOutput is written only on success; on
// failure the query node is destroyed.
static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
  SQuery* pQuery = NULL;
  int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (NULL == pQuery) {
    return code;
  }

  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;

  code = createVnodeModifOpStmt(pCxt, false, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pQuery);
    return code;
  }

  *pOutput = pQuery;
  return code;
}
3376

3377
// Evaluates the single user-auth result in pUsers.
//
// Clones the basic auth condition into *pTagCond (the clone result is stored back into the
// entry's code) and returns TSDB_CODE_SUCCESS when basic access is granted,
// TSDB_CODE_PAR_PERMISSION_DENIED when it is not, TSDB_CODE_FAILED when pUsers does not hold
// exactly one entry, or the entry's error code.
static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) {
  if (1 != taosArrayGetSize(pUsers)) {
    return TSDB_CODE_FAILED;
  }

  SMetaRes* pRes = taosArrayGet(pUsers, 0);
  if (TSDB_CODE_SUCCESS != pRes->code) {
    return pRes->code;
  }

  SUserAuthRes* pAuth = pRes->pRes;
  pRes->code = nodesCloneNode(pAuth->pCond[AUTH_RES_BASIC], pTagCond);
  if (TSDB_CODE_SUCCESS != pRes->code) {
    return pRes->code;
  }

  return pAuth->pass[AUTH_RES_BASIC] ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
}
3392

3393
// Replaces *pMeta with a duplicate of the table meta held in the first entry of pTables.
//
// pTables must contain one or two entries (TSDB_CODE_FAILED otherwise). The previous *pMeta is
// freed before the entry is inspected. Returns the entry's code, or TSDB_CODE_OUT_OF_MEMORY
// when the duplicate could not be created.
static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) {
  if (1 != taosArrayGetSize(pTables) && 2 != taosArrayGetSize(pTables)) {
    return TSDB_CODE_FAILED;
  }

  taosMemoryFreeClear(*pMeta);

  SMetaRes* pRes = taosArrayGet(pTables, 0);
  if (TSDB_CODE_SUCCESS != pRes->code) {
    return pRes->code;
  }

  *pMeta = tableMetaDup((const STableMeta*)pRes->pRes);
  if (NULL == *pMeta) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return pRes->code;
}
3408

3409
// Stores the vgroup info of the single entry in pTables in pStmt->pVgroupsHashObj, keyed by
// vgId. When isStb is set, the vgId is also written into pStmt->pTableMeta. Returns
// TSDB_CODE_FAILED unless pTables holds exactly one entry, the entry's error code, or the
// result of the hash insertion.
static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
  if (1 != taosArrayGetSize(pTables)) {
    return TSDB_CODE_FAILED;
  }

  SMetaRes* pRes = taosArrayGet(pTables, 0);
  if (TSDB_CODE_SUCCESS != pRes->code) {
    return pRes->code;
  }

  SVgroupInfo* pVgInfo = pRes->pRes;
  if (isStb) {
    pStmt->pTableMeta->vgId = pVgInfo->vgId;
  }

  return taosHashPut(pStmt->pVgroupsHashObj, (const char*)&pVgInfo->vgId, sizeof(pVgInfo->vgId), (char*)pVgInfo,
                     sizeof(SVgroupInfo));
}
3426

3427
// Builds an array holding the names of all tags of pMeta. On failure the array is destroyed,
// *pTagName is left NULL and terrno is returned.
static int32_t buildTagNameFromMeta(STableMeta* pMeta, SArray** pTagName) {
  *pTagName = taosArrayInit(pMeta->tableInfo.numOfTags, TSDB_COL_NAME_LEN);
  if (NULL == *pTagName) {
    return terrno;
  }

  SSchema* pTagSchema = getTableTagSchema(pMeta);
  for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
    if (NULL != taosArrayPush(*pTagName, pTagSchema[i].name)) {
      continue;
    }
    // Capture the error before cleanup has a chance to change terrno.
    int32_t code = terrno;
    taosArrayDestroy(*pTagName);
    *pTagName = NULL;
    return code;
  }
  return 0;
}
3444

3445
// Checks the sub-table privilege for the single table-tag result in pTables against
// pStmt->pTagCond, using the tag names taken from pStmt->pTableMeta. Returns TSDB_CODE_FAILED
// unless pTables holds exactly one entry, the entry's error code, or the result of the check.
static int32_t checkSubtablePrivilegeForTable(const SArray* pTables, SVnodeModifyOpStmt* pStmt) {
  if (1 != taosArrayGetSize(pTables)) {
    return TSDB_CODE_FAILED;
  }

  SMetaRes* pRes = taosArrayGet(pTables, 0);
  if (TSDB_CODE_SUCCESS != pRes->code) {
    return pRes->code;
  }

  SArray* pTagNames = NULL;
  int32_t code = buildTagNameFromMeta(pStmt->pTableMeta, &pTagNames);
  if (TSDB_CODE_SUCCESS == code) {
    code = checkSubtablePrivilege((SArray*)pRes->pRes, pTagNames, &pStmt->pTagCond);
  }

  // The name array is only needed for the check itself.
  taosArrayDestroy(pTagNames);
  return code;
}
3463

3464
static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
378✔
3465
                                              SVnodeModifyOpStmt* pStmt, bool isStb) {
3466
  int32_t code = TSDB_CODE_SUCCESS;
378✔
3467
  if (!isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
378!
3468
    code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
×
3469
  }
3470

3471
  if (TSDB_CODE_SUCCESS == code && isStb) {
378!
3472
    code = storeChildTableMeta(pCxt, pStmt);
137✔
3473
  }
3474
  if (TSDB_CODE_SUCCESS == code) {
378!
3475
    code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
378✔
3476
  }
3477
  if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
378!
3478
    code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
×
3479
  }
3480
  return code;
378✔
3481
}
3482

3483
static void destoryTablesReq(void* p) {
772✔
3484
  STablesReq* pRes = (STablesReq*)p;
772✔
3485
  taosArrayDestroy(pRes->pTables);
772✔
3486
}
772✔
3487

3488
// Releases the request arrays of a catalog request and resets them to NULL. The request struct
// itself is kept. A NULL argument is a no-op.
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
  if (NULL == pCatalogReq) {
    return;
  }

  // These two arrays hold STablesReq entries, which own a nested table list.
  taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
  pCatalogReq->pTableMeta = NULL;
  taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
  pCatalogReq->pTableHash = NULL;

  taosArrayDestroy(pCatalogReq->pUser);
  pCatalogReq->pUser = NULL;
  taosArrayDestroy(pCatalogReq->pTableTag);
  pCatalogReq->pTableTag = NULL;
}
3502

3503
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
386✔
3504
                                   SVnodeModifyOpStmt* pStmt) {
3505
  clearCatalogReq(pCatalogReq);
386✔
3506
  int32_t code = checkAuthFromMetaData(pMetaData->pUser, &pStmt->pTagCond);
386✔
3507
  if (code == TSDB_CODE_SUCCESS) {
386!
3508
    code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
386✔
3509
  }
3510
  if (code == TSDB_CODE_SUCCESS) {
386✔
3511
    if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
379✔
3512
      pStmt->stbSyntax = true;
1✔
3513
    }
3514
    if (!pStmt->stbSyntax) {
379✔
3515
      if (pStmt->usingTableProcessing) {
378✔
3516
        return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
137✔
3517
      }
3518
      return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
241✔
3519
    }
3520
  }
3521
  return code;
8✔
3522
}
3523

3524
// Rebuilds the root statement of pQuery for a prepared-statement (stmt) re-entry.
// The old root node is destroyed, a fresh one is created in re-entry mode, and the
// vgroup/table-block hash objects are taken from the stmt callback's
// getExecInfoFn; any of the two it leaves NULL is created here.
// Returns TSDB_CODE_OUT_OF_MEMORY if a hash object is still missing afterwards,
// otherwise the first failing callee's code or TSDB_CODE_SUCCESS.
static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) {
  nodesDestroyNode(pQuery->pRoot);
  // Drop the stale pointer immediately: if the rebuild below fails without
  // writing its output, pQuery must not keep referring to the destroyed node
  // (whoever destroys pQuery later would otherwise free it a second time).
  pQuery->pRoot = NULL;

  int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;

  code = (*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj,
                                                  &pStmt->pTableBlockHashObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL == pStmt->pVgroupsHashObj) {
    pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  }
  if (NULL == pStmt->pTableBlockHashObj) {
    pStmt->pTableBlockHashObj =
        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  }
  if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  return code;
}
3549

3550
static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
81,467✔
3551
                               SQuery** pQuery) {
3552
  if (NULL == *pQuery) {
81,467✔
3553
    return createInsertQuery(pCxt, pQuery);
81,082✔
3554
  }
3555

3556
  if (NULL != pCxt->pComCxt->pStmtCb) {
385✔
3557
    return resetVnodeModifOpStmt(pCxt, *pQuery);
14✔
3558
  }
3559

3560
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(*pQuery)->pRoot;
371✔
3561

3562
  if (!pStmt->fileProcessing) {
371!
3563
    return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, pStmt);
386✔
3564
  }
3565

3566
  return TSDB_CODE_SUCCESS;
×
3567
}
3568

3569
// Rebuilds pQuery->pTableList and pQuery->pDbList from the names collected in
// the statement's pTableNameHashObj / pDbFNameHashObj. A list is only replaced
// when its hash is non-empty. On allocation or push failure the current terrno
// is returned and any hash iteration in progress is cancelled.
static int32_t setRefreshMeta(SQuery* pQuery) {
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
  int32_t             code = 0;

  if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
    taosArrayDestroy(pQuery->pTableList);
    pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
    if (NULL == pQuery->pTableList) {
      code = terrno;
    } else {
      for (SName* pName = taosHashIterate(pStmt->pTableNameHashObj, NULL); NULL != pName;
           pName = taosHashIterate(pStmt->pTableNameHashObj, pName)) {
        if (NULL == taosArrayPush(pQuery->pTableList, pName)) {
          code = terrno;
          taosHashCancelIterate(pStmt->pTableNameHashObj, pName);
          break;
        }
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
    taosArrayDestroy(pQuery->pDbList);
    // Each element is a fixed-size db full-name buffer.
    pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
    if (NULL == pQuery->pDbList) {
      code = terrno;
    } else {
      for (char* pDbName = taosHashIterate(pStmt->pDbFNameHashObj, NULL); NULL != pDbName;
           pDbName = taosHashIterate(pStmt->pDbFNameHashObj, pDbName)) {
        if (NULL == taosArrayPush(pQuery->pDbList, pDbName)) {
          code = terrno;
          taosHashCancelIterate(pStmt->pDbFNameHashObj, pDbName);
          break;
        }
      }
    }
  }

  return code;
}
3611

3612
// INSERT INTO
3613
//   tb_name
3614
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]]
3615
//       [(field1_name, ...)]
3616
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3617
//   [...];
3618
// First-pass entry: consume the leading "INSERT INTO" and, if that succeeds,
// parse the rest of the statement body.
static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  int32_t code = skipInsertInto(&pStmt->pSql, &pCxt->msg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return parseInsertBody(pCxt, pStmt);
}
3625

3626
// Re-entry while a CSV file is being processed: continue reading rows from the
// file into the proper row context, then either finish the current clause
// (file still being processed) or continue with the remaining statement body.
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  SRowsDataContext rowsDataCxt;
  int32_t          code = TSDB_CODE_SUCCESS;

  if (pStmt->stbSyntax) {
    rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
  } else {
    STableDataCxt* pTableCxt = NULL;
    code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
    rowsDataCxt.pTableDataCxt = pTableCxt;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // fileProcessing is re-read here on purpose: it is tested after the file read above.
  return pStmt->fileProcessing ? parseInsertBodyBottom(pCxt, pStmt) : parseInsertBody(pCxt, pStmt);
}
3651

3652
// Re-entry after table metadata was fetched: finish the pending table clause,
// then continue with the remaining statement body.
static int32_t parseInsertSqlFromTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  int32_t code = parseInsertTableClauseBottom(pCxt, pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return parseInsertBody(pCxt, pStmt);
}
3659

3660
// Chooses where parsing resumes:
//   - at the very start when the cursor still equals the original SQL text or
//     when running under a stmt callback,
//   - inside CSV file processing when pStmt->fileProcessing is set,
//   - otherwise at the bottom half of the pending table clause.
static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
  const bool fromStart = (pStmt->pSql == pCxt->pComCxt->pSql) || (NULL != pCxt->pComCxt->pStmtCb);
  if (fromStart) {
    return parseInsertSqlFromStart(pCxt, pStmt);
  }

  return pStmt->fileProcessing ? parseInsertSqlFromCsv(pCxt, pStmt) : parseInsertSqlFromTable(pCxt, pStmt);
}
3671

3672
// Appends the super-table name followed by the child-table name to *pTables,
// creating the array first when *pTables is NULL.
// On any failure the array (if present) is destroyed, *pTables is reset to
// NULL, and terrno is returned.
static int32_t buildUsingInsertTableReq(SName* pSName, SName* pCName, SArray** pTables) {
  if (NULL == *pTables) {
    *pTables = taosArrayInit(2, sizeof(SName));
    if (NULL == *pTables) {
      return terrno;
    }
  }

  // Order matters: super table first, child table second.
  if (NULL != taosArrayPush(*pTables, pSName) && NULL != taosArrayPush(*pTables, pCName)) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(*pTables);
  *pTables = NULL;
  return terrno;
}
3694

3695
// Creates a one-element SName array holding *pName and stores it in *pTables.
// On failure *pTables is NULL and terrno is returned.
static int32_t buildInsertTableReq(SName* pName, SArray** pTables) {
  SArray* pList = taosArrayInit(1, sizeof(SName));
  *pTables = pList;
  if (NULL == pList) {
    return terrno;
  }

  if (NULL != taosArrayPush(pList, pName)) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pList);
  *pTables = NULL;
  return terrno;
}
3708

3709
// Appends one auto-create STablesReq (super table + child table) to *pDbs,
// creating *pDbs first when it is NULL.
//   pSName - super table named in the USING clause
//   pCName - child table being inserted into
// Returns terrno if *pDbs cannot be created, the code from
// buildUsingInsertTableReq if the table list cannot be built, or
// TSDB_CODE_OUT_OF_MEMORY if the request cannot be appended.
static int32_t buildInsertUsingDbReq(SName* pSName, SName* pCName, SArray** pDbs) {
  if (NULL == *pDbs) {
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
    if (NULL == *pDbs) {
      return terrno;
    }
  }

  STablesReq req = {0};
  req.autoCreate = 1;
  // NOTE(review): both calls write into req.dbFName, so the value left in place
  // is the one derived from pCName; confirm whether the first call is still needed.
  (void)tNameGetFullDbName(pSName, req.dbFName);
  (void)tNameGetFullDbName(pCName, req.dbFName);

  int32_t code = buildUsingInsertTableReq(pSName, pCName, &req.pTables);
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    // The request never made it into *pDbs, so nothing else will release its
    // table list; free it here to avoid leaking it.
    taosArrayDestroy(req.pTables);
    req.pTables = NULL;
  }
  return code;
}
3728

3729
// Appends one STablesReq containing the single table *pName to *pDbs,
// creating *pDbs first when it is NULL.
// Returns terrno if an array cannot be created or filled, or
// TSDB_CODE_OUT_OF_MEMORY if the request cannot be appended to *pDbs.
static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) {
  if (NULL == *pDbs) {
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
    if (NULL == *pDbs) {
      return terrno;
    }
  }

  STablesReq req = {0};
  (void)tNameGetFullDbName(pName, req.dbFName);

  int32_t code = buildInsertTableReq(pName, &req.pTables);
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    // The request never made it into *pDbs, so nothing else will release its
    // table list; free it here to avoid leaking it.
    taosArrayDestroy(req.pTables);
    req.pTables = NULL;
  }

  return code;
}
3746

3747
// Creates a one-element SUserAuthInfo array in *pUserAuth asking for write
// permission (AUTH_TYPE_WRITE) for user `pUser` on table `pName`.
// On failure *pUserAuth is NULL and terrno is returned.
static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray** pUserAuth) {
  SArray* pAuthList = taosArrayInit(1, sizeof(SUserAuthInfo));
  *pUserAuth = pAuthList;
  if (NULL == pAuthList) {
    return terrno;
  }

  SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE};
  // snprintf truncates to the size of the destination field and always terminates it.
  snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser);
  memcpy(&userAuth.tbName, pName, sizeof(SName));

  if (NULL != taosArrayPush(pAuthList, &userAuth)) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pAuthList);
  *pUserAuth = NULL;
  return terrno;
}
3764

3765
// A table-tag request has the same shape as a table request: a one-element
// SName array. This simply forwards to buildInsertTableReq.
static int32_t buildInsertTableTagReq(SName* pName, SArray** pTables) {
  int32_t code = buildInsertTableReq(pName, pTables);
  return code;
}
×
3766

3767
// Fills pCatalogReq with everything that must be fetched after a cache miss:
//   pUser      - write-auth check on the USING table if one is set, else the target table
//   pTableTag  - target table tags, only when pCxt->needTableTagVal is set
//   pTableMeta - target table meta, or (super, child) pair in auto-create mode
//   pTableHash - vgroup lookup for the target table
// Stops at, and returns, the first failure.
static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SCatalogReq* pCatalogReq) {
  // usingTableName.type == 0 means no USING clause is recorded on the statement.
  const bool hasUsingTable = (0 != pStmt->usingTableName.type);
  SName*     pAuthName = hasUsingTable ? &pStmt->usingTableName : &pStmt->targetTableName;

  int32_t code = buildInsertUserAuthReq(pCxt->pComCxt->pUser, pAuthName, &pCatalogReq->pUser);
  if (TSDB_CODE_SUCCESS == code && pCxt->needTableTagVal) {
    code = buildInsertTableTagReq(&pStmt->targetTableName, &pCatalogReq->pTableTag);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (hasUsingTable) {
    code = buildInsertUsingDbReq(&pStmt->usingTableName, &pStmt->targetTableName, &pCatalogReq->pTableMeta);
  } else {
    code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableHash);
}
3786

3787
// Decides what happens after this parsing pass.
//   Cache miss : stay in the PARSE stage and build the catalog request that
//                fetches the missing metadata.
//   Otherwise  : move on to the SCHEDULE stage.
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;

  if (!pCxt->missCache) {
    parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted", pCxt->pComCxt->requestId,
                pStmt->totalRowsNum, pStmt->totalTbNum);

    pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
    return TSDB_CODE_SUCCESS;
  }

  parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted before obtain the cache",
              pCxt->pComCxt->requestId, pStmt->totalRowsNum, pStmt->totalTbNum);

  pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
  return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
}
3803

3804
// Public entry point for parsing an INSERT statement.
//   pCxt        - common parse context (SQL text, message buffer, optional stmt callback)
//   pQuery      - in/out: NULL on the first pass (a query is created), non-NULL on re-entry
//   pCatalogReq - receives the metadata request when a cache miss occurs; may be NULL
//                 as far as the forceUpdate lookup below is concerned
//   pMetaData   - metadata fetched for a previous request, consumed on re-entry
// Returns TSDB_CODE_SUCCESS or the first error produced by a parsing step.
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
  SInsertParseContext context = {.pComCxt = pCxt,
                                 .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
                                 .missCache = false,
                                 .usingDuplicateTable = false,
                                 .needRequest = true,
                                 .forceUpdate = (NULL == pCatalogReq ? false : pCatalogReq->forceUpdate)};

  int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)((*pQuery)->pRoot));
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setNextStageInfo(&context, *pQuery, pCatalogReq);
  }

  // The refresh lists are also built for errors the client is expected to handle.
  const bool buildRefreshLists = (TSDB_CODE_SUCCESS == code) || NEED_CLIENT_HANDLE_ERROR(code);
  if (buildRefreshLists && QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
    code = setRefreshMeta(*pQuery);
  }

  insDestroyBoundColInfo(&context.tags);
  clearInsertParseContext(&context);

  // Nothing to insert: mark the query as an empty result so no server request is made.
  if (!context.needRequest) {
    (*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
  }
  return code;
}
3833

3834
// CSV Parser Implementation
3835
// Initializes `parser` for reading CSV data from `pFile`.
//
// The parser is zeroed, given the default format (',' delimiter, single-quote
// quoting, '\\' escape, newlines allowed inside quoted fields), a 64KB read
// buffer and a 64KB reusable line buffer. The first chunk of the file is then
// read so the quote character can be auto-detected: the first line is skipped
// as the header, and the first ' or " found after it becomes the quote
// character. If neither is found in that first chunk, single quote is kept.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL argument, terrno on allocation
// failure, or the error from the initial buffer fill. On any failure after the
// NULL check both buffers are released and the parser is left zeroed, so the
// caller does not have to call csvParserDestroy (though doing so is harmless,
// assuming taosMemoryFree accepts NULL as csvParserDestroy already relies on).
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile) {
  if (!parser || !pFile) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    inHeaderLine = true;

  // Zeroing also sets bufferPos = 0, bufferLen = 0 and eof = false.
  memset(parser, 0, sizeof(SCsvParser));

  // Default CSV format; single quote is the default for TDengine compatibility.
  parser->delimiter = CSV_DEFAULT_DELIMITER;
  parser->quote = CSV_QUOTE_SINGLE;
  parser->escape = CSV_ESCAPE_CHAR;
  parser->allowNewlineInField = true;
  parser->pFile = pFile;

  // Read buffer.
  parser->bufferSize = 64 * 1024;
  parser->buffer = taosMemoryMalloc(parser->bufferSize);
  if (!parser->buffer) {
    code = terrno;
    goto _fail;
  }

  // Line buffer, reused across lines and grown on demand.
  parser->lineBufferCapacity = 64 * 1024;
  parser->lineBuffer = taosMemoryMalloc(parser->lineBufferCapacity);
  if (!parser->lineBuffer) {
    code = terrno;
    goto _fail;
  }

  // Load the first chunk so the quote style can be detected.
  code = csvParserFillBuffer(parser);
  if (code != TSDB_CODE_SUCCESS) {
    goto _fail;
  }

  for (size_t i = 0; i < parser->bufferLen; i++) {
    char ch = parser->buffer[i];

    // Everything up to and including the first '\n' is the header line.
    if (inHeaderLine) {
      if (ch == '\n') {
        inHeaderLine = false;
      }
      continue;
    }

    // The first quote character seen in the data decides the quoting style.
    if (ch == CSV_QUOTE_SINGLE || ch == CSV_QUOTE_DOUBLE) {
      parser->quote = ch;
      break;
    }
  }

  // Detection only peeked at the buffer; real parsing starts from its beginning.
  parser->bufferPos = 0;
  return TSDB_CODE_SUCCESS;

_fail:
  // Release whatever was allocated so a failed init never leaks.
  taosMemoryFree(parser->buffer);
  taosMemoryFree(parser->lineBuffer);
  memset(parser, 0, sizeof(SCsvParser));
  return code;
}
3906

3907
// Frees the parser's read buffer and line buffer and zeroes the struct.
// The SCsvParser object itself is not freed here. NULL is ignored.
static void csvParserDestroy(SCsvParser* parser) {
  if (NULL == parser) {
    return;
  }

  taosMemoryFree(parser->buffer);
  taosMemoryFree(parser->lineBuffer);
  memset(parser, 0, sizeof(SCsvParser));
}
6✔
3914

3915
// Refills the read buffer from the file.
// Any content currently in the buffer is discarded (position and length are
// reset to 0) before reading up to bufferSize bytes. A read of 0 bytes sets
// parser->eof. Does nothing for a NULL parser or one already at EOF.
// Returns TAOS_SYSTEM_ERROR(errno) when the read fails.
static int32_t csvParserFillBuffer(SCsvParser* parser) {
  if (NULL == parser || parser->eof) {
    return TSDB_CODE_SUCCESS;
  }

  // Start over with an empty buffer; nothing is carried over from the previous fill.
  parser->bufferPos = 0;
  parser->bufferLen = 0;

  size_t capacity = parser->bufferSize - parser->bufferLen;
  if (0 == capacity) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t nRead = taosReadFile(parser->pFile, parser->buffer + parser->bufferLen, capacity);
  if (nRead < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  if (0 == nRead) {
    parser->eof = true;
  } else {
    parser->bufferLen += nRead;
  }
  return TSDB_CODE_SUCCESS;
}
3942

3943
// Destroy saved CSV parser in SVnodeModifyOpStmt
3944
// Tears down the CSV parser stored on the statement, if any: releases its
// buffers, frees the parser object, and clears pStmt->pCsvParser.
static void destroySavedCsvParser(SVnodeModifyOpStmt* pStmt) {
  if (NULL == pStmt || NULL == pStmt->pCsvParser) {
    return;
  }

  csvParserDestroy(pStmt->pCsvParser);
  taosMemoryFree(pStmt->pCsvParser);
  pStmt->pCsvParser = NULL;
}
6✔
3951

3952
// Ensures the reusable line buffer can hold at least `requiredLen` bytes.
// Capacity grows by doubling until it is large enough. Nothing happens when
// parser is NULL or the buffer is already big enough.
// Returns TSDB_CODE_OUT_OF_MEMORY if reallocation fails; the existing buffer
// and capacity are left untouched in that case.
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen) {
  if (!parser || requiredLen <= parser->lineBufferCapacity) {
    return TSDB_CODE_SUCCESS;
  }

  // Doubling from 0 would never make progress (e.g. on a zeroed parser), so
  // start from at least 1.
  size_t newCapacity = (parser->lineBufferCapacity > 0) ? parser->lineBufferCapacity : 1;
  while (newCapacity < requiredLen) {
    if (newCapacity > SIZE_MAX / 2) {
      // Doubling again would wrap around size_t; ask for exactly what is needed.
      newCapacity = requiredLen;
      break;
    }
    newCapacity *= 2;
  }

  // Keep the old pointer until realloc succeeds so it is not lost on failure.
  char* newLineBuffer = taosMemoryRealloc(parser->lineBuffer, newCapacity);
  if (!newLineBuffer) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  parser->lineBuffer = newLineBuffer;
  parser->lineBufferCapacity = newCapacity;
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc