• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4811

16 Oct 2025 11:40AM UTC coverage: 58.693% (+0.2%) from 58.518%
#4811

push

travis-ci

web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

139835 of 303532 branches covered (46.07%)

Branch coverage included in aggregate %.

211576 of 295200 relevant lines covered (71.67%)

16841075.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.3
/source/libs/parser/src/parInsertSql.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "decimal.h"
17
#include "geosWrapper.h"
18
#include "parInsertUtil.h"
19
#include "parToken.h"
20
#include "query.h"
21
#include "scalar.h"
22
#include "tglobal.h"
23
#include "ttime.h"
24

25
// CSV delimiter and quote character definitions
26
#define CSV_DEFAULT_DELIMITER ','
27
#define CSV_QUOTE_SINGLE      '\''
28
#define CSV_QUOTE_DOUBLE      '"'
29
#define CSV_ESCAPE_CHAR       '\\'
30
#define CSV_QUOTE_NONE        '\0'
31

32
typedef struct SCsvParser {
33
  char      delimiter;            // Field delimiter (default: ',')
34
  char      quote;                // Quote character (default: '"')
35
  char      escape;               // Escape character (default: '\')
36
  bool      allowNewlineInField;  // Allow newlines in quoted fields
37
  char*     buffer;               // Read buffer
38
  size_t    bufferSize;           // Buffer size
39
  size_t    bufferPos;            // Current position in buffer
40
  size_t    bufferLen;            // Valid data length in buffer
41
  bool      eof;                  // End of file reached
42
  TdFilePtr pFile;                // File pointer
43
  // Line buffer for reuse to avoid frequent memory allocation
44
  char*  lineBuffer;          // Reusable line buffer
45
  size_t lineBufferCapacity;  // Line buffer capacity
46
} SCsvParser;
47

48
typedef struct SInsertParseContext {
49
  SParseContext* pComCxt;
50
  SMsgBuf        msg;
51
  char           tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
52
  SBoundColInfo  tags;  // for stmt
53
  bool           missCache;
54
  bool           usingDuplicateTable;
55
  bool           forceUpdate;
56
  bool           needTableTagVal;
57
  bool           needRequest;  // whether or not request server
58
  // bool           isStmtBind;   // whether is stmt bind
59
  uint8_t        stmtTbNameFlag;
60
  SArray*        pParsedValues;  // <SColVal> for stmt bind col
61
} SInsertParseContext;
62

63
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
64
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt);
65
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate);
66

67
// CSV parser function declarations
68
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile);
69
static void    csvParserDestroy(SCsvParser* parser);
70
static int32_t csvParserFillBuffer(SCsvParser* parser);
71
static int32_t csvParserReadLine(SCsvParser* parser);
72
static void    destroySavedCsvParser(SVnodeModifyOpStmt* pStmt);
73
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen);
74

75

76
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
77
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
78

79
static FORCE_INLINE bool isNullValue(int8_t dataType, SToken* pToken) {
80
  return TK_NULL == pToken->type ||
2,147,483,647✔
81
         (TK_NK_STRING == pToken->type && !IS_STR_DATA_TYPE(dataType) && IS_NULL_STR(pToken->z, pToken->n));
2,147,483,647!
82
}
83

84
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
85
  SET_ERRNO(0);
86
  *value = taosStr2Double(pToken->z, endPtr);
87

88
  // not a valid integer number, return error
89
  if ((*endPtr - pToken->z) != pToken->n) {
90
    return TK_NK_ILLEGAL;
91
  }
92

93
  return pToken->type;
94
}
95

96
static int32_t skipInsertInto(const char** pSql, SMsgBuf* pMsg) {
7,986,378✔
97
  SToken token;
98
  NEXT_TOKEN(*pSql, token);
7,986,378✔
99
  if (TK_INSERT != token.type && TK_IMPORT != token.type) {
7,997,018!
100
    return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", token.z);
×
101
  }
102
  NEXT_TOKEN(*pSql, token);
7,997,018✔
103
  if (TK_INTO != token.type) {
7,996,081!
104
    return buildSyntaxErrMsg(pMsg, "keyword INTO is expected", token.z);
×
105
  }
106
  return TSDB_CODE_SUCCESS;
7,998,498✔
107
}
108

109
static int32_t skipParentheses(SInsertParseContext* pCxt, const char** pSql) {
101,170✔
110
  SToken  token;
111
  int32_t expectRightParenthesis = 1;
101,170✔
112
  while (1) {
113
    NEXT_TOKEN(*pSql, token);
725,192✔
114
    if (TK_NK_LP == token.type) {
725,201!
115
      ++expectRightParenthesis;
×
116
    } else if (TK_NK_RP == token.type && 0 == --expectRightParenthesis) {
725,201✔
117
      break;
101,179✔
118
    }
119
    if (0 == token.n) {
624,022!
120
      return buildSyntaxErrMsg(&pCxt->msg, ") expected", NULL);
×
121
    }
122
  }
123
  return TSDB_CODE_SUCCESS;
101,179✔
124
}
125

126
static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) {
442✔
127
  do {
32✔
128
    int32_t index = 0;
442✔
129
    SToken  token;
130
    NEXT_TOKEN_KEEP_SQL(*pSql, token, index);
442✔
131
    if (TK_TTL == token.type || TK_COMMENT == token.type) {
442!
132
      *pSql += index;
32✔
133
      NEXT_TOKEN_WITH_PREV(*pSql, token);
32✔
134
    } else {
135
      break;
136
    }
137
  } while (1);
138
  return TSDB_CODE_SUCCESS;
410✔
139
}
140

141
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
142
static int32_t ignoreUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
×
143
  const char** pSql = &pStmt->pSql;
×
144
  int32_t      code = TSDB_CODE_SUCCESS;
×
145
  SToken       token;
146
  NEXT_TOKEN(*pSql, token);
×
147

148
  NEXT_TOKEN(*pSql, token);
×
149
  if (TK_NK_LP == token.type) {
×
150
    code = skipParentheses(pCxt, pSql);
×
151
    if (TSDB_CODE_SUCCESS == code) {
×
152
      NEXT_TOKEN(*pSql, token);
×
153
    }
154
  }
155

156
  // pSql -> TAGS (tag1_value, ...)
157
  if (TSDB_CODE_SUCCESS == code) {
×
158
    if (TK_TAGS != token.type) {
×
159
      code = buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
×
160
    } else {
161
      NEXT_TOKEN(*pSql, token);
×
162
    }
163
  }
164
  if (TSDB_CODE_SUCCESS == code) {
×
165
    if (TK_NK_LP != token.type) {
×
166
      code = buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
×
167
    } else {
168
      code = skipParentheses(pCxt, pSql);
×
169
    }
170
  }
171

172
  if (TSDB_CODE_SUCCESS == code) {
×
173
    code = skipTableOptions(pCxt, pSql);
×
174
  }
175

176
  return code;
×
177
}
178

179
static int32_t ignoreUsingClauseAndCheckTagValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
413✔
180
  const char** pSql = &pStmt->pSql;
413✔
181
  int32_t      code = TSDB_CODE_SUCCESS;
413✔
182
  code = parseBoundTagsClause(pCxt, pStmt);
413✔
183
  if (TSDB_CODE_SUCCESS != code) {
413!
184
    return code;
×
185
  }
186
  // pSql -> TAGS (tag1_value, ...)
187
  code = parseTagsClause(pCxt, pStmt, true);
413✔
188
  if (TSDB_CODE_SUCCESS != code) {
413✔
189
    return code;
3✔
190
  }
191

192
  if (TSDB_CODE_SUCCESS == code) {
410!
193
    code = skipTableOptions(pCxt, pSql);
410✔
194
  }
195

196
  return code;
410✔
197
}
198

199
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
10,681✔
200
  int32_t code = TSDB_CODE_SUCCESS;
10,681✔
201
  *pDuplicate = false;
10,681✔
202

203
  char tbFName[TSDB_TABLE_FNAME_LEN];
204
  code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
10,681✔
205
  if (TSDB_CODE_SUCCESS != code) {
10,695!
206
    return code;
×
207
  }
208
  STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName));
10,695✔
209
  if (NULL != pMeta) {
10,690!
210
    *pDuplicate = true;
×
211
    pCxt->missCache = false;
×
212
    code = cloneTableMeta(*pMeta, &pStmt->pTableMeta);
×
213
    if (TSDB_CODE_SUCCESS != code) {
×
214
      return code;
×
215
    }
216
    return ignoreUsingClause(pCxt, pStmt);
×
217
  }
218

219
  return code;
10,692✔
220
}
221

222
typedef enum { BOUND_TAGS, BOUND_COLUMNS, BOUND_ALL_AND_TBNAME } EBoundColumnsType;
223

224
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
101,680✔
225
  return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
101,680✔
226
}
227

228
// pStmt->pSql -> field1_name, ...)
229
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType,
101,280✔
230
                                 STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
231
  SSchema* pSchema = NULL;
101,280✔
232
  if (boundColsType == BOUND_TAGS) {
101,280✔
233
    pSchema = getTableTagSchema(pTableMeta);
62✔
234
  } else if (boundColsType == BOUND_COLUMNS) {
101,218✔
235
    pSchema = getTableColumnSchema(pTableMeta);
101,164✔
236
  } else {
237
    pSchema = pTableMeta->schema;
54✔
238
    if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
54!
239
      return TSDB_CODE_PAR_INTERNAL_ERROR;
×
240
    }
241
  }
242
  int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta);
101,282✔
243

244
  bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
101,283!
245
  if (NULL == pUseCols) {
101,291!
246
    return terrno;
×
247
  }
248

249
  pBoundInfo->numOfBound = 0;
101,291✔
250
  pBoundInfo->hasBoundCols = true;
101,291✔
251

252
  bool    hasPK = pTableMeta->tableInfo.numOfPKs;
101,291✔
253
  int16_t numOfBoundPKs = 0;
101,291✔
254
  int16_t lastColIdx = -1;  // last column found
101,291✔
255
  int32_t code = TSDB_CODE_SUCCESS;
101,291✔
256
  while (TSDB_CODE_SUCCESS == code) {
725,575✔
257
    SToken token;
258
    NEXT_TOKEN(*pSql, token);
725,574✔
259

260
    if (TK_NK_RP == token.type) {
725,556✔
261
      break;
101,286✔
262
    }
263

264
    char tmpTokenBuf[TSDB_COL_NAME_LEN + 2] = {0};  // used for deleting Escape character backstick(`)
624,270✔
265
    strncpy(tmpTokenBuf, token.z, token.n);
624,270✔
266
    token.z = tmpTokenBuf;
624,270✔
267
    token.n = strdequote(token.z);
624,270✔
268

269
    if (boundColsType == BOUND_ALL_AND_TBNAME && token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
624,282!
270
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
52✔
271
      pUseCols[tbnameSchemaIndex] = true;
52✔
272
      ++pBoundInfo->numOfBound;
52✔
273
      continue;
52✔
274
    }
275
    int16_t t = lastColIdx + 1;
624,230✔
276
    int16_t end = (boundColsType == BOUND_ALL_AND_TBNAME) ? (pBoundInfo->numOfCols - 1) : pBoundInfo->numOfCols;
624,230✔
277
    int16_t index = insFindCol(&token, t, end, pSchema);
624,230✔
278
    if (index < 0 && t > 0) {
624,229!
279
      index = insFindCol(&token, 0, t, pSchema);
84,563✔
280
    }
281
    if (index < 0) {
624,229!
282
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
×
283
    } else if (pUseCols[index]) {
624,229!
284
      code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
×
285
    } else {
286
      lastColIdx = index;
624,229✔
287
      pUseCols[index] = true;
624,229✔
288
      pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
624,229✔
289
      ++pBoundInfo->numOfBound;
624,229✔
290
      if (hasPK && (pSchema[index].flags & COL_IS_KEY)) ++numOfBoundPKs;
624,229✔
291
    }
292
  }
293

294
  if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) {
101,287✔
295
    if (!pUseCols[0]) {
101,223✔
296
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary timestamp column should not be null");
5✔
297
    }
298
    if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) {
101,223!
299
      code = buildInvalidOperationMsg(&pCxt->msg, "Primary key column should not be none");
×
300
    }
301
  }
302
  if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
101,287✔
303
    code = buildInvalidOperationMsg(&pCxt->msg, "tbname column should not be null");
1✔
304
  }
305
  taosMemoryFree(pUseCols);
101,287!
306

307
  return code;
101,290✔
308
}
309

310
static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts,
692,568,187✔
311
                                        int64_t* interval, SMsgBuf* pMsgBuf, bool* isTs, timezone_t tz) {
312
  if (pToken->type == TK_NOW) {
692,568,187✔
313
    *isTs = true;
49,873✔
314
    *ts = taosGetTimestamp(timePrec);
99,746!
315
  } else if (pToken->type == TK_TODAY) {
692,518,314!
316
    *isTs = true;
×
317
    *ts = taosGetTimestampToday(timePrec, tz);
×
318
  } else if (pToken->type == TK_NK_INTEGER) {
692,518,314!
319
    *isTs = true;
692,849,274✔
320
    if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, ts)) {
692,849,274!
321
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
322
    }
323
  } else if (pToken->type == TK_NK_VARIABLE) {
×
324
    char unit = 0;
523✔
325
    *isTs = false;
523✔
326
    if (parseAbsoluteDuration(pToken->z, pToken->n, interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
523!
327
      return TSDB_CODE_TSC_INVALID_OPERATION;
×
328
    }
329
  } else {  // parse the RFC-3339/ISO-8601 timestamp format string
330
    *isTs = true;
×
331
    if (taosParseTime(pToken->z, ts, pToken->n, timePrec, tz) != TSDB_CODE_SUCCESS) {
×
332
      if ((pToken->n == 0) ||
448,514✔
333
          (pToken->type != TK_NK_STRING && pToken->type != TK_NK_HEX && pToken->type != TK_NK_BIN)) {
1,050!
334
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
447,464✔
335
      }
336
      if (IS_NOW_STR(pToken->z, pToken->n)) {
1,050!
337
        *isTs = true;
×
338
        *ts = taosGetTimestamp(timePrec);
×
339
      } else if (IS_TODAY_STR(pToken->z, pToken->n)) {
1,050!
340
        *isTs = true;
×
341
        *ts = taosGetTimestampToday(timePrec, tz);
×
342
      } else if (TSDB_CODE_SUCCESS == toIntegerPure(pToken->z, pToken->n, 10, ts)) {
1,050✔
343
        *isTs = true;
1,000✔
344
      } else {
345
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
50✔
346
      }
347
    }
348
  }
349

350
  return TSDB_CODE_SUCCESS;
692,187,699✔
351
}
352

353
static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf,
692,835,763✔
354
                     timezone_t tz) {
355
  int32_t     index = 0, i = 0;
692,835,763✔
356
  int64_t     interval = 0, tempInterval = 0;
692,835,763✔
357
  int64_t     ts = 0, tempTs = 0;
692,835,763✔
358
  bool        firstIsTS = false, secondIsTs = false;
692,835,763✔
359
  const char* pTokenEnd = *end;
692,835,763✔
360

361
  if (TSDB_CODE_SUCCESS !=
692,380,646!
362
      parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS, tz)) {
692,835,763✔
363
    return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
364
  }
365

366
  if (firstIsTS) {
692,804,150✔
367
    *time = ts;
692,514,447✔
368
  }
369

370
  for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
692,804,644!
371
    if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
693,162,826!
372
    if (pToken->z[k] == '(') {  // for insert NOW()/TODAY()
693,162,535✔
373
      if (pToken->z[k + 1] == ')') {
203!
374
        *end = pTokenEnd = &pToken->z[k + 2];
203✔
375
        ++k;
203✔
376
        continue;
203✔
377
      } else {
378
        char nc = pToken->z[k + 1];
×
379
        while (nc == ' ' || nc == '\t' || nc == '\n' || nc == '\r' || nc == '\f') {
×
380
          nc = pToken->z[(++k) + 1];
×
381
        }
382
        if (nc == ')') {
×
383
          *end = pTokenEnd = &pToken->z[k + 2];
×
384
          ++k;
×
385
          continue;
×
386
        }
387
      }
388
    }
389
    if (pToken->z[k] == ',') {
693,162,332✔
390
      *end = pTokenEnd;
691,991,356✔
391
      if (!firstIsTS) {
691,991,356!
392
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
393
      }
394
      *time = ts;
691,991,356✔
395
      return TSDB_CODE_SUCCESS;
691,991,356✔
396
    }
397
    break;
1,170,976✔
398
  }
399

400
  while (pTokenEnd[i] != '\0') {
813,097✔
401
    if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
253,586!
402
      i++;
303✔
403
      continue;
303✔
404
    } else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
253,283✔
405
      *end = pTokenEnd + i;
252,765✔
406
      if (!firstIsTS) {
252,765!
407
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
408
      }
409
      *time = ts;
252,765✔
410
      return TSDB_CODE_SUCCESS;
252,765✔
411
    } else {
412
      break;
413
    }
414
  }
415
  pTokenEnd = pTokenEnd + i;
560,029✔
416

417
  index = 0;
560,029✔
418
  SToken token = tStrGetToken(pTokenEnd, &index, false, NULL);
560,029✔
419

420
  if (token.type == TK_NK_MINUS || token.type == TK_NK_PLUS) {
2,150!
421
    pTokenEnd += index;
523✔
422
    index = 0;
523✔
423
    SToken valueToken = tStrGetToken(pTokenEnd, &index, false, NULL);
523✔
424
    pTokenEnd += index;
523✔
425
    char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
426
    if (TK_NK_STRING == valueToken.type) {
523!
427
      if (valueToken.n >= TSDB_MAX_BYTES_PER_ROW) {
×
428
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", valueToken.z);
511✔
429
      }
430
      int32_t len = trimString(valueToken.z, valueToken.n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
×
431
      valueToken.z = tmpTokenBuf;
×
432
      valueToken.n = len;
×
433
    }
434

435
    if (TSDB_CODE_SUCCESS !=
523!
436
        parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs, tz)) {
523✔
437
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
438
    }
439

440
    if (valueToken.n < 2) {
523!
441
      return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", token.z);
×
442
    }
443

444
    if (secondIsTs) {
523!
445
      // not support operator between tow timestamp, such as today() + now()
446
      if (firstIsTS) {
×
447
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
448
      }
449
      ts = tempTs;
×
450
    } else {
451
      // not support operator between tow interval, such as 2h + 3s
452
      if (!firstIsTS) {
523!
453
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
454
      }
455
      interval = tempInterval;
523✔
456
    }
457
    if (token.type == TK_NK_MINUS) {
523!
458
      // not support interval - ts,such as 2h - today()
459
      if (secondIsTs) {
×
460
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
461
      }
462
      *time = ts - interval;
×
463
    } else {
464
      *time = ts + interval;
523✔
465
    }
466

467
    for (int k = valueToken.n; valueToken.z[k] != '\0'; k++) {
523✔
468
      if (valueToken.z[k] == ' ' || valueToken.z[k] == '\t') continue;
511!
469
      if (valueToken.z[k] == '(' && valueToken.z[k + 1] == ')') {  // for insert NOW()/TODAY()
511!
470
        *end = pTokenEnd = &valueToken.z[k + 2];
×
471
        k++;
×
472
        continue;
×
473
      }
474
      if (valueToken.z[k] == ',' || valueToken.z[k] == ')') {
511!
475
        *end = pTokenEnd;
511✔
476
        return TSDB_CODE_SUCCESS;
511✔
477
      }
478
      return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
×
479
    }
480
  }
481

482
  *end = pTokenEnd;
1,639✔
483
  return TSDB_CODE_SUCCESS;
1,639✔
484
}
485

486
// need to call geosFreeBuffer(*output) later
487
static int parseGeometry(SToken* pToken, unsigned char** output, size_t* size) {
1,644,483✔
488
#ifdef USE_GEOS
489
  int32_t code = TSDB_CODE_FAILED;
1,644,483✔
490

491
  //[ToDo] support to parse WKB as well as WKT
492
  if (pToken->type == TK_NK_STRING) {
1,644,483!
493
    code = initCtxGeomFromText();
1,644,569✔
494
    if (code != TSDB_CODE_SUCCESS) {
1,644,105!
495
      return code;
×
496
    }
497

498
    code = doGeomFromText(pToken->z, output, size);
1,644,105✔
499
    if (code != TSDB_CODE_SUCCESS) {
1,646,880!
500
      return code;
×
501
    }
502
  }
503

504
  return code;
1,646,794✔
505
#else
506
  TAOS_RETURN(TSDB_CODE_OPS_NOT_SUPPORT);
507
#endif
508
}
509

510
static int32_t parseVarbinary(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
720,721✔
511
  if (pToken->type != TK_NK_STRING) {
720,721✔
512
    return TSDB_CODE_PAR_INVALID_VARBINARY;
3✔
513
  }
514

515
  if (isHex(pToken->z + 1, pToken->n - 2)) {
720,718✔
516
    if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
9✔
517
      return TSDB_CODE_PAR_INVALID_VARBINARY;
1✔
518
    }
519

520
    void*    data = NULL;
8✔
521
    uint32_t size = 0;
8✔
522
    if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
8!
523
      return TSDB_CODE_OUT_OF_MEMORY;
×
524
    }
525

526
    if (size + VARSTR_HEADER_SIZE > bytes) {
8!
527
      taosMemoryFree(data);
×
528
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
×
529
    }
530
    *pData = data;
8✔
531
    *nData = size;
8✔
532
  } else {
533
    *pData = taosMemoryCalloc(1, pToken->n);
720,707!
534
    if (!pData) return terrno;
720,708!
535
    int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
720,708✔
536
    *nData = len;
720,708✔
537

538
    if (*nData + VARSTR_HEADER_SIZE > bytes) {
720,708!
539
      return TSDB_CODE_PAR_VALUE_TOO_LONG;
×
540
    }
541
  }
542
  return TSDB_CODE_SUCCESS;
720,716✔
543
}
544
static int32_t parseBlob(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
×
545
  if (pToken->type != TK_NK_STRING) {
×
546
    return TSDB_CODE_PAR_INVALID_VARBINARY;
×
547
  }
548
  int32_t  code = 0;
×
549
  int32_t  lino = 0;
×
550
  uint32_t size = 0;
×
551
  *pData = NULL;
×
552

553
  if (isHex(pToken->z + 1, pToken->n - 2)) {
×
554
    if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
×
555
      return TSDB_CODE_PAR_INVALID_VARBINARY;
×
556
    }
557
    void* data = NULL;
×
558

559
    if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
×
560
      TSDB_CHECK_CODE(code, lino, _error);
×
561
    }
562
    *pData = data;
×
563
    *nData = size;
×
564
  } else {
565
    *pData = taosMemoryCalloc(1, pToken->n);
×
566
    if (!pData) return terrno;
×
567
    size = trimString(pToken->z, pToken->n, (char*)*pData, pToken->n);
×
568
    *nData = size;
×
569
  }
570

571
  if (size + BLOBSTR_HEADER_SIZE > TSDB_MAX_BLOB_LEN) {
×
572
    TSDB_CHECK_CODE(code = TSDB_CODE_BLOB_VALUE_TOO_LONG, lino, _error);
×
573
  }
574

575
_error:
×
576
  if (code != 0) {
×
577
    taosMemoryFree(pData);
×
578
    uError("parseBlob failed at lino %d code: %s", lino, tstrerror(code));
×
579
    return code;
×
580
  }
581
  *nData = size;
×
582
  return TSDB_CODE_SUCCESS;
×
583
}
584

585
static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, STagVal* val,
131,434✔
586
                             SMsgBuf* pMsgBuf, timezone_t tz, void* charsetCxt) {
587
  int64_t  iv;
588
  uint64_t uv;
589
  char*    endptr = NULL;
131,434✔
590
  int32_t  code = TSDB_CODE_SUCCESS;
131,434✔
591

592
#if 0
593
  if (isNullValue(pSchema->type, pToken)) {
594
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
595
      return buildSyntaxErrMsg(pMsgBuf, "Primary timestamp column can not be null", pToken->z);
596
    }
597

598
    return TSDB_CODE_SUCCESS;
599
  }
600
#endif
601

602
  //  strcpy(val->colName, pSchema->name);
603
  val->cid = pSchema->colId;
131,434✔
604
  val->type = pSchema->type;
131,434✔
605

606
  switch (pSchema->type) {
131,434!
607
    case TSDB_DATA_TYPE_BOOL: {
1,878✔
608
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
1,878!
609
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
274!
610
          *(int8_t*)(&val->i64) = TRUE_VALUE;
169✔
611
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
105!
612
          *(int8_t*)(&val->i64) = FALSE_VALUE;
105✔
613
        } else if (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv)) {
×
614
          *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
×
615
        } else {
616
          return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
×
617
        }
618
      } else if (pToken->type == TK_NK_INTEGER) {
1,604!
619
        *(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
1,604✔
620
      } else if (pToken->type == TK_NK_FLOAT) {
×
621
        *(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
×
622
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
×
623
                 (TSDB_CODE_SUCCESS == toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&iv))) {
×
624
        *(int8_t*)(&val->i64) = (*(double*)&iv == 0 ? FALSE_VALUE : TRUE_VALUE);
×
625
      } else {
626
        return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
×
627
      }
628
      break;
1,878✔
629
    }
630

631
    case TSDB_DATA_TYPE_TINYINT: {
7,952✔
632
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
7,952✔
633
      if (TSDB_CODE_SUCCESS != code) {
7,949!
634
        return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
×
635
      } else if (!IS_VALID_TINYINT(iv)) {
7,949!
636
        return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
×
637
      }
638

639
      *(int8_t*)(&val->i64) = iv;
7,952✔
640
      break;
7,952✔
641
    }
642

643
    case TSDB_DATA_TYPE_UTINYINT: {
3,037✔
644
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
3,037✔
645
      if (TSDB_CODE_SUCCESS != code) {
3,038!
646
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
×
647
      } else if (uv > UINT8_MAX) {
3,038!
648
        return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
×
649
      }
650
      *(uint8_t*)(&val->i64) = uv;
3,038✔
651
      break;
3,038✔
652
    }
653

654
    case TSDB_DATA_TYPE_SMALLINT: {
2,999✔
655
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
2,999✔
656
      if (TSDB_CODE_SUCCESS != code) {
2,999!
657
        return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
×
658
      } else if (!IS_VALID_SMALLINT(iv)) {
2,999!
659
        return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
×
660
      }
661
      *(int16_t*)(&val->i64) = iv;
2,999✔
662
      break;
2,999✔
663
    }
664

665
    case TSDB_DATA_TYPE_USMALLINT: {
2,828✔
666
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
2,828✔
667
      if (TSDB_CODE_SUCCESS != code) {
2,828!
668
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
×
669
      } else if (uv > UINT16_MAX) {
2,828!
670
        return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
×
671
      }
672
      *(uint16_t*)(&val->i64) = uv;
2,828✔
673
      break;
2,828✔
674
    }
675

676
    case TSDB_DATA_TYPE_INT: {
55,445✔
677
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
55,445✔
678
      if (TSDB_CODE_SUCCESS != code) {
55,454✔
679
        return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
1✔
680
      } else if (!IS_VALID_INT(iv)) {
55,453!
681
        return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
×
682
      }
683
      *(int32_t*)(&val->i64) = iv;
55,471✔
684
      break;
55,471✔
685
    }
686

687
    case TSDB_DATA_TYPE_UINT: {
2,830✔
688
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
2,830✔
689
      if (TSDB_CODE_SUCCESS != code) {
2,830!
690
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
×
691
      } else if (uv > UINT32_MAX) {
2,830!
692
        return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
×
693
      }
694
      *(uint32_t*)(&val->i64) = uv;
2,830✔
695
      break;
2,830✔
696
    }
697

698
    case TSDB_DATA_TYPE_BIGINT: {
2,921✔
699
      code = toIntegerEx(pToken->z, pToken->n, pToken->type, &iv);
2,921✔
700
      if (TSDB_CODE_SUCCESS != code) {
2,921✔
701
        return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
21✔
702
      }
703
      val->i64 = iv;
2,900✔
704
      break;
2,900✔
705
    }
706

707
    case TSDB_DATA_TYPE_UBIGINT: {
2,878✔
708
      code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &uv);
2,878✔
709
      if (TSDB_CODE_SUCCESS != code) {
2,877!
710
        return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
×
711
      }
712
      *(uint64_t*)(&val->i64) = uv;
2,877✔
713
      break;
2,877✔
714
    }
715

716
    case TSDB_DATA_TYPE_FLOAT: {
2,063✔
717
      double dv;
718
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
2,063✔
719
      if (TSDB_CODE_SUCCESS != code) {
2,062!
720
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
721
      }
722
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
2,062!
723
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
724
      }
725
      *(float*)(&val->i64) = dv;
2,063✔
726
      break;
2,063✔
727
    }
728

729
    case TSDB_DATA_TYPE_DOUBLE: {
2,047✔
730
      double dv;
731
      code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
2,047✔
732
      if (TSDB_CODE_SUCCESS != code) {
2,047!
733
        return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
×
734
      }
735
      if (((dv == HUGE_VAL || dv == -HUGE_VAL) && ERRNO == ERANGE) || isinf(dv) || isnan(dv)) {
2,047!
736
        return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
×
737
      }
738

739
      *(double*)(&val->i64) = dv;
2,047✔
740
      break;
2,047✔
741
    }
742

743
    case TSDB_DATA_TYPE_BINARY: {
33,640✔
744
      // Too long values will raise the invalid sql error message
745
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
33,640✔
746
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
12✔
747
      }
748
      val->pData = taosStrdup(pToken->z);
33,628!
749
      if (!val->pData) {
33,630!
750
        return terrno;
×
751
      }
752
      val->nData = pToken->n;
33,630✔
753
      break;
33,630✔
754
    }
755
    case TSDB_DATA_TYPE_VARBINARY: {
2,584✔
756
      code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
2,584✔
757
      if (code != TSDB_CODE_SUCCESS) {
2,582✔
758
        return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
3✔
759
      }
760
      break;
2,579✔
761
    }
762
    case TSDB_DATA_TYPE_GEOMETRY: {
1,574✔
763
      unsigned char* output = NULL;
1,574✔
764
      size_t         size = 0;
1,574✔
765

766
      code = parseGeometry(pToken, &output, &size);
1,574✔
767
      if (code != TSDB_CODE_SUCCESS) {
1,574!
768
        code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
×
769
      } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
1,574!
770
        // Too long values will raise the invalid sql error message
771
        code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
772
      } else {
773
        val->pData = taosMemoryMalloc(size);
1,574!
774
        if (NULL == val->pData) {
1,574!
775
          code = terrno;
×
776
        } else {
777
          memcpy(val->pData, output, size);
1,574✔
778
          val->nData = size;
1,574✔
779
        }
780
      }
781

782
      geosFreeBuffer(output);
1,574✔
783
      break;
1,574✔
784
    }
785

786
    case TSDB_DATA_TYPE_NCHAR: {
5,043✔
787
      int32_t output = 0;
5,043✔
788
      int64_t realLen = pToken->n << 2;
5,043✔
789
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
5,043!
790
      void* p = taosMemoryMalloc(realLen);
5,043!
791
      if (p == NULL) {
5,044!
792
        return terrno;
2✔
793
      }
794
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), realLen, &output, charsetCxt)) {
5,044✔
795
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
2!
796
          taosMemoryFree(p);
×
797
          return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
798
        }
799
        char buf[512] = {0};
2✔
800
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), ERRNO, EILSEQ);
2✔
801
        taosMemoryFree(p);
2!
802
        return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
2✔
803
      }
804
      val->pData = p;
5,042✔
805
      val->nData = output;
5,042✔
806
      break;
5,042✔
807
    }
808
    case TSDB_DATA_TYPE_TIMESTAMP: {
1,751✔
809
      if (parseTime(end, pToken, timePrec, &iv, pMsgBuf, tz) != TSDB_CODE_SUCCESS) {
1,751!
810
        return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
×
811
      }
812

813
      val->i64 = iv;
1,751✔
814
      break;
1,751✔
815
    }
816
    case TSDB_DATA_TYPE_MEDIUMBLOB:
×
817
    case TSDB_DATA_TYPE_BLOB: {
818
      code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_BLOB_NOT_SUPPORT_TAG, pSchema->name);
×
819
      break;
×
820
    }
821
  }
822

823
  return code;
131,423✔
824
}
825

826
// input pStmt->pSql:  [(tag1_name, ...)] TAGS (tag1_value, ...) ...
827
// output pStmt->pSql: TAGS (tag1_value, ...) ...
828
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
10,678✔
829
  int32_t code = insInitBoundColsInfo(getNumOfTags(pStmt->pTableMeta), &pCxt->tags);
10,678✔
830
  if (TSDB_CODE_SUCCESS != code) {
10,692!
831
    return code;
×
832
  }
833

834
  SToken  token;
835
  int32_t index = 0;
10,692✔
836
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
10,692✔
837
  if (TK_NK_LP != token.type) {
10,690✔
838
    return TSDB_CODE_SUCCESS;
10,628✔
839
  }
840

841
  pStmt->pSql += index;
62✔
842
  return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
62✔
843
}
844

845
int32_t parseTagValue(SMsgBuf* pMsgBuf, const char** pSql, uint8_t precision, SSchema* pTagSchema, SToken* pToken,
131,913✔
846
                      SArray* pTagName, SArray* pTagVals, STag** pTag, timezone_t tz, void* charsetCxt) {
847
  bool isNull = isNullValue(pTagSchema->type, pToken);
131,913✔
848
  if (!isNull && pTagName) {
131,913✔
849
    if (NULL == taosArrayPush(pTagName, pTagSchema->name)) {
263,122!
850
      return terrno;
×
851
    }
852
  }
853

854
  if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
131,893✔
855
    if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
165!
856
      return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
×
857
    }
858

859
    if (isNull) {
165✔
860
      return tTagNew(pTagVals, 1, true, pTag);
2✔
861
    } else {
862
      return parseJsontoTagData(pToken->z, pTagVals, pTag, pMsgBuf, charsetCxt);
163✔
863
    }
864
  }
865

866
  if (isNull) return 0;
131,728✔
867

868
  STagVal val = {0};
131,440✔
869
  int32_t code = parseTagToken(pSql, pToken, pTagSchema, precision, &val, pMsgBuf, tz, charsetCxt);
131,440✔
870
  if (TSDB_CODE_SUCCESS == code) {
131,430✔
871
    if (NULL == taosArrayPush(pTagVals, &val)) {
131,406!
872
      code = terrno;
×
873
    }
874
  }
875

876
  return code;
131,447✔
877
}
878

879
static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* pTagName) {
10,212✔
880
  if (pStmt->pCreateTblReq) {
10,212!
881
    tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
×
882
    taosMemoryFreeClear(pStmt->pCreateTblReq);
×
883
  }
884
  pStmt->pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
10,212!
885
  if (NULL == pStmt->pCreateTblReq) {
10,208!
886
    return terrno;
×
887
  }
888
  return insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
10,208✔
889
                             pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
10,208✔
890
                             TSDB_DEFAULT_TABLE_TTL);
891
}
892

893
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
2,147,483,647✔
894
  if (pToken->type == TK_NK_QUESTION) {
2,147,483,647!
895
    return buildInvalidOperationMsg(pMsgBuf, "insert into super table syntax is not supported for stmt");
×
896
  }
897
  if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
2,147,483,647!
898
       pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
925,748,227✔
899
       pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
8,438,513!
900
       pToken->type != TK_NK_VARIABLE) ||
7!
901
      (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
2,147,483,647!
902
    return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
×
903
  }
904

905
  // Remove quotation marks
906
  if (TK_NK_STRING == pToken->type && type != TSDB_DATA_TYPE_VARBINARY && !IS_STR_DATA_BLOB(type)) {
2,147,483,647!
907
    if (!IS_STR_DATA_BLOB(type)) {
31,835,170!
908
      if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
31,838,701!
909
        return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
×
910
      }
911

912
      int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BYTES_PER_ROW);
31,838,701✔
913
      pToken->z = tmpTokenBuf;
31,920,595✔
914
      pToken->n = len;
31,920,595✔
915
    } else {
916
      if (pToken->n >= TSDB_MAX_BLOB_LEN) {
×
917
        return buildSyntaxErrMsg(pMsgBuf, "too long blob", pToken->z);
×
918
      }
919

920
      int32_t len = trimString(pToken->z, pToken->n, tmpTokenBuf, TSDB_MAX_BLOB_LEN);
×
921
      pToken->z = tmpTokenBuf;
×
922
      pToken->n = len;
×
923
    }
924
  }
925

926
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
927
}
928

929
typedef struct SRewriteTagCondCxt {
930
  SArray* pTagVals;
931
  SArray* pTagName;
932
  int32_t code;
933
} SRewriteTagCondCxt;
934

935
static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
×
936
  SValueNode* pValue = NULL;
×
937
  int32_t     code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pValue);
×
938
  if (NULL == pValue) {
×
939
    return code;
×
940
  }
941

942
  pValue->node.resType = ((SColumnNode*)*pNode)->node.resType;
×
943
  nodesDestroyNode(*pNode);
×
944
  *pNode = (SNode*)pValue;
×
945

946
  switch (pVal->type) {
×
947
    case TSDB_DATA_TYPE_BOOL:
×
948
      pValue->datum.b = *(int8_t*)(&pVal->i64);
×
949
      *(bool*)&pValue->typeData = pValue->datum.b;
×
950
      break;
×
951
    case TSDB_DATA_TYPE_TINYINT:
×
952
      pValue->datum.i = *(int8_t*)(&pVal->i64);
×
953
      *(int8_t*)&pValue->typeData = pValue->datum.i;
×
954
      break;
×
955
    case TSDB_DATA_TYPE_SMALLINT:
×
956
      pValue->datum.i = *(int16_t*)(&pVal->i64);
×
957
      *(int16_t*)&pValue->typeData = pValue->datum.i;
×
958
      break;
×
959
    case TSDB_DATA_TYPE_INT:
×
960
      pValue->datum.i = *(int32_t*)(&pVal->i64);
×
961
      *(int32_t*)&pValue->typeData = pValue->datum.i;
×
962
      break;
×
963
    case TSDB_DATA_TYPE_BIGINT:
×
964
      pValue->datum.i = pVal->i64;
×
965
      pValue->typeData = pValue->datum.i;
×
966
      break;
×
967
    case TSDB_DATA_TYPE_FLOAT:
×
968
      pValue->datum.d = *(float*)(&pVal->i64);
×
969
      *(float*)&pValue->typeData = pValue->datum.d;
×
970
      break;
×
971
    case TSDB_DATA_TYPE_DOUBLE:
×
972
      pValue->datum.d = *(double*)(&pVal->i64);
×
973
      *(double*)&pValue->typeData = pValue->datum.d;
×
974
      break;
×
975
    case TSDB_DATA_TYPE_VARCHAR:
×
976
    case TSDB_DATA_TYPE_VARBINARY:
977
    case TSDB_DATA_TYPE_NCHAR:
978
      pValue->datum.p = taosMemoryCalloc(1, pVal->nData + VARSTR_HEADER_SIZE);
×
979
      if (NULL == pValue->datum.p) {
×
980
        return terrno;
×
981
      }
982
      varDataSetLen(pValue->datum.p, pVal->nData);
×
983
      memcpy(varDataVal(pValue->datum.p), pVal->pData, pVal->nData);
×
984
      break;
×
985
    case TSDB_DATA_TYPE_TIMESTAMP:
×
986
      pValue->datum.i = pVal->i64;
×
987
      pValue->typeData = pValue->datum.i;
×
988
      break;
×
989
    case TSDB_DATA_TYPE_UTINYINT:
×
990
      pValue->datum.i = *(uint8_t*)(&pVal->i64);
×
991
      *(uint8_t*)&pValue->typeData = pValue->datum.i;
×
992
      break;
×
993
    case TSDB_DATA_TYPE_USMALLINT:
×
994
      pValue->datum.i = *(uint16_t*)(&pVal->i64);
×
995
      *(uint16_t*)&pValue->typeData = pValue->datum.i;
×
996
      break;
×
997
    case TSDB_DATA_TYPE_UINT:
×
998
      pValue->datum.i = *(uint32_t*)(&pVal->i64);
×
999
      *(uint32_t*)&pValue->typeData = pValue->datum.i;
×
1000
      break;
×
1001
    case TSDB_DATA_TYPE_UBIGINT:
×
1002
      pValue->datum.i = *(uint64_t*)(&pVal->i64);
×
1003
      *(uint64_t*)&pValue->typeData = pValue->datum.i;
×
1004
      break;
×
1005
    case TSDB_DATA_TYPE_JSON:
×
1006
    case TSDB_DATA_TYPE_DECIMAL:
1007
    case TSDB_DATA_TYPE_BLOB:
1008
    case TSDB_DATA_TYPE_MEDIUMBLOB:
1009
    default:
1010
      return TSDB_CODE_FAILED;
×
1011
  }
1012
  return TSDB_CODE_SUCCESS;
×
1013
}
1014

1015
static int32_t rewriteTagCondColumn(SArray* pTagVals, SArray* pTagName, SNode** pNode) {
×
1016
  SColumnNode* pCol = (SColumnNode*)*pNode;
×
1017
  int32_t      ntags = taosArrayGetSize(pTagName);
×
1018
  for (int32_t i = 0; i < ntags; ++i) {
×
1019
    char* pTagColName = taosArrayGet(pTagName, i);
×
1020
    if (0 == strcmp(pTagColName, pCol->colName)) {
×
1021
      return rewriteTagCondColumnImpl(taosArrayGet(pTagVals, i), pNode);
×
1022
    }
1023
  }
1024
  return TSDB_CODE_PAR_PERMISSION_DENIED;
×
1025
}
1026

1027
static EDealRes rewriteTagCond(SNode** pNode, void* pContext) {
×
1028
  if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
×
1029
    SRewriteTagCondCxt* pCxt = pContext;
×
1030
    pCxt->code = rewriteTagCondColumn(pCxt->pTagVals, pCxt->pTagName, pNode);
×
1031
    return (TSDB_CODE_SUCCESS == pCxt->code ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
×
1032
  }
1033
  return DEAL_RES_CONTINUE;
×
1034
}
1035

1036
static int32_t setTagVal(SArray* pTagVals, SArray* pTagName, SNode* pCond) {
×
1037
  SRewriteTagCondCxt cxt = {.code = TSDB_CODE_SUCCESS, .pTagVals = pTagVals, .pTagName = pTagName};
×
1038
  nodesRewriteExpr(&pCond, rewriteTagCond, &cxt);
×
1039
  return cxt.code;
×
1040
}
1041

1042
static int32_t checkTagCondResult(SNode* pResult) {
×
1043
  return (QUERY_NODE_VALUE == nodeType(pResult) && ((SValueNode*)pResult)->datum.b) ? TSDB_CODE_SUCCESS
×
1044
                                                                                    : TSDB_CODE_PAR_PERMISSION_DENIED;
×
1045
}
1046

1047
static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode** pCond) {
×
1048
  int32_t code = setTagVal(pTagVals, pTagName, *pCond);
×
1049
  if (TSDB_CODE_SUCCESS == code) {
×
1050
    code = scalarCalculateConstants(*pCond, pCond);
×
1051
  }
1052
  if (TSDB_CODE_SUCCESS == code) {
×
1053
    code = checkTagCondResult(*pCond);
×
1054
  }
1055
  NODES_DESTORY_NODE(*pCond);
×
1056
  return code;
×
1057
}
1058

1059
// pSql -> tag1_value, ...)
1060
static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
10,676✔
1061
  int32_t  code = TSDB_CODE_SUCCESS;
10,676✔
1062
  SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta);
10,676✔
1063
  SArray*  pTagVals = NULL;
10,688✔
1064
  SArray*  pTagName = NULL;
10,688✔
1065
  uint8_t  precision = pStmt->pTableMeta->tableInfo.precision;
10,688✔
1066
  SToken   token;
1067
  bool     isJson = false;
10,688✔
1068
  STag*    pTag = NULL;
10,688✔
1069
  uint8_t*    pTagsIndex;
1070
  int32_t     numOfTags = 0;
10,688✔
1071

1072
  if (pCxt->pComCxt->stmtBindVersion == 2) {  // only support stmt2
10,688✔
1073
    pTagsIndex = taosMemoryCalloc(pCxt->tags.numOfBound, sizeof(uint8_t));
66!
1074
  }
1075

1076
  if (!(pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal))) ||
21,355!
1077
      !(pTagName = taosArrayInit(pCxt->tags.numOfBound, TSDB_COL_NAME_LEN))) {
10,670✔
1078
    code = terrno;
×
1079
    goto _exit;
×
1080
  }
1081

1082
  for (int i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->tags.numOfBound; ++i) {
22,600✔
1083
    NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);
11,914✔
1084

1085
    if (token.type == TK_NK_QUESTION) {
11,930✔
1086
      if (pCxt->pComCxt->stmtBindVersion == 0) {
134!
1087
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", token.z);
×
1088
        break;
×
1089
      }
1090

1091
      continue;
134✔
1092
    }
1093

1094
    SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
11,796✔
1095
    isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
11,796✔
1096
    code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
11,796✔
1097
    if (TSDB_CODE_SUCCESS == code && TK_NK_VARIABLE == token.type) {
11,795!
1098
      code = buildSyntaxErrMsg(&pCxt->msg, "not expected tags values ", token.z);
×
1099
    }
1100
    if (TSDB_CODE_SUCCESS == code) {
11,795!
1101
      code = parseTagValue(&pCxt->msg, &pStmt->pSql, precision, pTagSchema, &token, pTagName, pTagVals, &pTag,
11,799✔
1102
                           pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
11,799✔
1103
    }
1104
    if (pCxt->pComCxt->stmtBindVersion == 2) {
11,799✔
1105
      pTagsIndex[numOfTags++] = pCxt->tags.pColIndex[i];
55✔
1106
    }
1107
  }
1108

1109
  if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pTagCond) {
10,686!
1110
    code = checkSubtablePrivilege(pTagVals, pTagName, &pStmt->pTagCond);
×
1111
  }
1112

1113
  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 2) {
10,686✔
1114
    if (numOfTags > 0) {
65✔
1115
      if (pTagVals->size == pCxt->tags.numOfBound) {
30✔
1116
        pCxt->stmtTbNameFlag |= IS_FIXED_TAG;
27✔
1117
      } else {
1118
        pCxt->stmtTbNameFlag &= ~IS_FIXED_TAG;
3✔
1119
        pCxt->tags.parseredTags = taosMemoryMalloc(sizeof(STagsInfo));
3!
1120
        if (pCxt->tags.parseredTags == NULL) {
3!
1121
          code = terrno;
×
1122
          goto _exit;
×
1123
        }
1124
        pCxt->tags.parseredTags->numOfTags = numOfTags;
3✔
1125
        pCxt->tags.parseredTags->pTagIndex = pTagsIndex;
3✔
1126
        pCxt->tags.parseredTags->pTagVals = pTagVals;
3✔
1127
        pCxt->tags.parseredTags->STagNames = pTagName;
3✔
1128
      }
1129
    } else {
1130
      goto _exit;
35✔
1131
    }
1132
  }
1133

1134
  if (TSDB_CODE_SUCCESS == code && !isJson) {
10,651!
1135
    code = tTagNew(pTagVals, 1, false, &pTag);
10,632✔
1136
  }
1137

1138
  if (TSDB_CODE_SUCCESS == code && !autoCreate) {
10,653✔
1139
    code = buildCreateTbReq(pStmt, pTag, pTagName);
10,231✔
1140
    pTag = NULL;
10,228✔
1141
  }
1142

1143
_exit:
422✔
1144
  if (pCxt->tags.parseredTags == NULL) {
10,685✔
1145
    for (int32_t i = 0; i < taosArrayGetSize(pTagVals); ++i) {
22,457✔
1146
      STagVal* p = (STagVal*)TARRAY_GET_ELEM(pTagVals, i);
11,775✔
1147
      if (IS_VAR_DATA_TYPE(p->type)) {
11,775!
1148
        taosMemoryFreeClear(p->pData);
362!
1149
      }
1150
    }
1151
    taosArrayDestroy(pTagVals);
10,673✔
1152
    taosArrayDestroy(pTagName);
10,689✔
1153
    if (pCxt->pComCxt->stmtBindVersion == 2) {
10,684✔
1154
      taosMemoryFreeClear(pTagsIndex);
63!
1155
    }
1156
  }
1157

1158
  tTagFree(pTag);
10,687✔
1159
  return code;
10,679✔
1160
}
1161

1162
// input pStmt->pSql:  TAGS (tag1_value, ...) [table_options] ...
1163
// output pStmt->pSql: [table_options] ...
1164
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
10,690✔
1165
  SToken token;
1166
  NEXT_TOKEN(pStmt->pSql, token);
10,690✔
1167
  if (TK_TAGS != token.type) {
10,694✔
1168
    return buildSyntaxErrMsg(&pCxt->msg, "TAGS is expected", token.z);
1✔
1169
  }
1170

1171
  NEXT_TOKEN(pStmt->pSql, token);
10,693✔
1172
  if (TK_NK_LP != token.type) {
10,676!
1173
    return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
×
1174
  }
1175

1176
  int32_t code = parseTagsClauseImpl(pCxt, pStmt, autoCreate);
10,676✔
1177
  if (TSDB_CODE_SUCCESS == code) {
10,678✔
1178
    NEXT_VALID_TOKEN(pStmt->pSql, token);
10,665!
1179
    if (TK_NK_COMMA == token.type) {
10,672!
1180
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
×
1181
    } else if (TK_NK_RP != token.type) {
10,672✔
1182
      code = buildSyntaxErrMsg(&pCxt->msg, ") is expected", token.z);
1✔
1183
    }
1184
  }
1185
  return code;
10,685✔
1186
}
1187

1188
static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
10,265✔
1189
  pStmt->pTableMeta->suid = pStmt->pTableMeta->uid;
10,265✔
1190
  pStmt->pTableMeta->uid = pStmt->totalTbNum;
10,265✔
1191
  pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;
10,265✔
1192

1193
  STableMeta* pBackup = NULL;
10,265✔
1194
  if (TSDB_CODE_SUCCESS != cloneTableMeta(pStmt->pTableMeta, &pBackup)) {
10,265!
1195
    return TSDB_CODE_OUT_OF_MEMORY;
×
1196
  }
1197

1198
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1199
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
10,280✔
1200
  if (TSDB_CODE_SUCCESS != code) {
10,281!
1201
    taosMemoryFree(pBackup);
×
1202
    return code;
×
1203
  }
1204
  code = taosHashPut(pStmt->pSubTableHashObj, tbFName, strlen(tbFName), &pBackup, POINTER_BYTES);
10,281✔
1205
  if (TSDB_CODE_SUCCESS != code) {
10,275!
1206
    taosMemoryFree(pBackup);
×
1207
  }
1208
  return code;
10,274✔
1209
}
1210

1211
static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
10,278✔
1212
  do {
16✔
1213
    int32_t index = 0;
10,278✔
1214
    SToken  token;
1215
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
10,278✔
1216
    if (TK_TTL == token.type) {
10,279✔
1217
      pStmt->pSql += index;
30✔
1218
      NEXT_TOKEN_WITH_PREV(pStmt->pSql, token);
30✔
1219
      if (TK_NK_INTEGER != token.type) {
16!
1220
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
×
1221
      }
1222
      pStmt->pCreateTblReq->ttl = taosStr2Int32(token.z, NULL, 10);
16✔
1223
      if (pStmt->pCreateTblReq->ttl < 0) {
16!
1224
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
×
1225
      }
1226
    } else if (TK_COMMENT == token.type) {
10,249!
1227
      pStmt->pSql += index;
×
1228
      NEXT_TOKEN(pStmt->pSql, token);
×
1229
      if (TK_NK_STRING != token.type) {
×
1230
        return buildSyntaxErrMsg(&pCxt->msg, "Invalid option comment", token.z);
×
1231
      }
1232
      if (token.n >= TSDB_TB_COMMENT_LEN) {
×
1233
        return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z);
×
1234
      }
1235
      int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN);
×
1236
      pStmt->pCreateTblReq->comment = taosStrndup(pCxt->tmpTokenBuf, len);
×
1237
      if (NULL == pStmt->pCreateTblReq->comment) {
×
1238
        return terrno;
×
1239
      }
1240
      pStmt->pCreateTblReq->commentLen = len;
×
1241
    } else {
1242
      break;
10,249✔
1243
    }
1244
  } while (1);
1245
  return TSDB_CODE_SUCCESS;
10,249✔
1246
}
1247

1248
// input pStmt->pSql:
1249
//   1. [(tag1_name, ...)] ...
1250
//   2. VALUES ... | FILE ...
1251
// output pStmt->pSql:
1252
//   1. [(field1_name, ...)]
1253
//   2. VALUES ... | FILE ...
1254
static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,017,799✔
1255
  if (!pStmt->usingTableProcessing || pCxt->usingDuplicateTable) {
8,017,799!
1256
    return TSDB_CODE_SUCCESS;
8,007,524✔
1257
  }
1258

1259
  int32_t code = parseBoundTagsClause(pCxt, pStmt);
10,275✔
1260
  if (TSDB_CODE_SUCCESS == code) {
10,278✔
1261
    code = parseTagsClause(pCxt, pStmt, false);
10,277✔
1262
  }
1263
  if (TSDB_CODE_SUCCESS == code) {
10,270✔
1264
    code = parseTableOptions(pCxt, pStmt);
10,257✔
1265
  }
1266

1267
  return code;
10,265✔
1268
}
1269

1270
static void setUserAuthInfo(SParseContext* pCxt, SName* pTbName, SUserAuthInfo* pInfo) {
8,012,402✔
1271
  snprintf(pInfo->user, sizeof(pInfo->user), "%s", pCxt->pUser);
8,012,402✔
1272
  memcpy(&pInfo->tbName, pTbName, sizeof(SName));
8,012,402✔
1273
  pInfo->type = AUTH_TYPE_WRITE;
8,012,402✔
1274
}
8,012,402✔
1275

1276
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
8,011,484✔
1277
  int32_t       code = TSDB_CODE_SUCCESS;
8,011,484✔
1278
  SUserAuthInfo authInfo = {0};
8,011,484✔
1279
  setUserAuthInfo(pCxt, pTbName, &authInfo);
8,011,484✔
1280
  SUserAuthRes authRes = {0};
8,020,782✔
1281
  bool         exists = true;
8,020,782✔
1282
  if (pCxt->async) {
8,020,782!
1283
    code = catalogChkAuthFromCache(pCxt->pCatalog, &authInfo, &authRes, &exists);
8,026,791✔
1284
  } else {
1285
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
×
1286
                             .requestId = pCxt->requestId,
×
1287
                             .requestObjRefId = pCxt->requestRid,
×
1288
                             .mgmtEps = pCxt->mgmtEpSet};
1289
    code = catalogChkAuth(pCxt->pCatalog, &conn, &authInfo, &authRes);
×
1290
  }
1291
  if (TSDB_CODE_SUCCESS == code) {
8,005,273!
1292
    if (!exists) {
8,005,844✔
1293
      *pMissCache = true;
58✔
1294
    } else if (!authRes.pass[AUTH_RES_BASIC]) {
8,005,786!
1295
      code = TSDB_CODE_PAR_PERMISSION_DENIED;
×
1296
    } else if (NULL != authRes.pCond[AUTH_RES_BASIC]) {
8,005,786!
1297
      *pTagCond = authRes.pCond[AUTH_RES_BASIC];
×
1298
    }
1299
  }
1300
  return code;
8,005,273✔
1301
}
1302

1303
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, bool* pMissCache,
12,908✔
1304
                            bool bUsingTable) {
1305
  SParseContext* pComCxt = pCxt->pComCxt;
12,908✔
1306
  int32_t        code = TSDB_CODE_SUCCESS;
12,908✔
1307
  if (pComCxt->async) {
12,908✔
1308
    if (bUsingTable) {
1,083✔
1309
      code = catalogGetCachedSTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
522✔
1310
    } else {
1311
      code = catalogGetCachedTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
561✔
1312
    }
1313
  } else {
1314
    SRequestConnInfo conn = {.pTrans = pComCxt->pTransporter,
11,825✔
1315
                             .requestId = pComCxt->requestId,
11,825✔
1316
                             .requestObjRefId = pComCxt->requestRid,
11,825✔
1317
                             .mgmtEps = pComCxt->mgmtEpSet};
1318
    if (bUsingTable) {
11,825✔
1319
      code = catalogGetSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta);
10,115✔
1320
    } else {
1321
      code = catalogGetTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta);
1,710✔
1322
    }
1323
  }
1324
  if (TSDB_CODE_SUCCESS == code) {
12,916✔
1325
    if (NULL == *pTableMeta) {
12,901✔
1326
      *pMissCache = true;
193✔
1327
    } else if (bUsingTable && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) {
12,708!
1328
      code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
×
1329
    } else if (((*pTableMeta)->virtualStb) || TSDB_VIRTUAL_CHILD_TABLE == (*pTableMeta)->tableType ||
12,708!
1330
               TSDB_VIRTUAL_NORMAL_TABLE == (*pTableMeta)->tableType) {
12,721✔
1331
      code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT;
×
1332
    }
1333
  }
1334
  return code;
12,926✔
1335
}
1336

1337
static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
12,105✔
1338
  int32_t     code = TSDB_CODE_SUCCESS;
12,105✔
1339
  SVgroupInfo vg;
1340
  bool        exists = true;
12,105✔
1341
  if (pCxt->async) {
12,105✔
1342
    code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
368✔
1343
  } else {
1344
    SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
11,737✔
1345
                             .requestId = pCxt->requestId,
11,737✔
1346
                             .requestObjRefId = pCxt->requestRid,
11,737✔
1347
                             .mgmtEps = pCxt->mgmtEpSet};
1348
    code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
11,737✔
1349
  }
1350
  if (TSDB_CODE_SUCCESS == code) {
12,116!
1351
    if (exists) {
12,116!
1352
      if (isStb) {
12,116✔
1353
        pStmt->pTableMeta->vgId = vg.vgId;
10,498✔
1354
      }
1355
      code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
12,116✔
1356
    }
1357
    *pMissCache = !exists;
12,120✔
1358
  }
1359
  return code;
12,120✔
1360
}
1361

1362
static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) {
8,000,353✔
1363
  SParseContext* pComCxt = pCxt->pComCxt;
8,000,353✔
1364
  int32_t        code = TSDB_CODE_SUCCESS;
8,000,353✔
1365
  if (pComCxt->async) {
8,000,353!
1366
    {
1367
      SVgroupInfo vg;
1368
      code = catalogGetCachedTableVgMeta(pComCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
8,003,387✔
1369
      if (TSDB_CODE_SUCCESS == code) {
8,021,889✔
1370
        if (NULL != pStmt->pTableMeta) {
8,021,570✔
1371
          if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
7,990,932✔
1372
            pStmt->stbSyntax = true;
2✔
1373
          } else {
1374
            code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
7,990,930✔
1375
          }
1376
        }
1377
        *pMissCache = (NULL == pStmt->pTableMeta);
8,019,894✔
1378
      }
1379
    }
1380
  } else {
1381
    bool bUsingTable = false;
×
1382
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pStmt->pTableMeta, pMissCache, bUsingTable);
×
1383
    if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
1,674!
1384
      if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
1,670✔
1385
        pStmt->stbSyntax = true;
51✔
1386
      }
1387
      if (!pStmt->stbSyntax) {
1,670✔
1388
        code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
1,619✔
1389
      }
1390
    }
1391
  }
1392
  return code;
8,020,244✔
1393
}
1394

1395
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
11,888✔
1396
  char    fullName[TSDB_TABLE_FNAME_LEN];
1397
  int32_t code = tNameExtractFullName(pName, fullName);
11,888✔
1398
  if (TSDB_CODE_SUCCESS != code) {
11,891!
1399
    return code;
×
1400
  }
1401
  return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
11,891✔
1402
}
1403

1404
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
11,843✔
1405
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
11,843✔
1406
  (void)tNameGetFullDbName(pName, dbFName);
11,843✔
1407
  return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
11,852✔
1408
}
1409

1410
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,016,046✔
1411
  if (pCxt->forceUpdate) {
8,016,046✔
1412
    pCxt->missCache = true;
231✔
1413
    return TSDB_CODE_SUCCESS;
231✔
1414
  }
1415
  SNode*  pTagCond = NULL;
8,015,815✔
1416
  int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pTagCond);
8,015,815✔
1417
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
8,005,325!
1418
    code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
8,007,093✔
1419
  }
1420

1421
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
8,016,619!
1422
    if (TSDB_SUPER_TABLE != pStmt->pTableMeta->tableType) {
7,988,098✔
1423
      pCxt->needTableTagVal = (NULL != pTagCond);
7,986,508✔
1424
      pCxt->missCache = (NULL != pTagCond);
7,986,508✔
1425
    } else {
1426
      pStmt->pTagCond = NULL;
1,590✔
1427
      code = nodesCloneNode(pTagCond, &pStmt->pTagCond);
1,590✔
1428
    }
1429
  }
1430
  nodesDestroyNode(pTagCond);
8,015,082✔
1431

1432
  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
8,014,718!
1433
    code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
1,670✔
1434
    if (TSDB_CODE_SUCCESS == code) {
1,670!
1435
      code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
1,670✔
1436
    }
1437
  }
1438
  return code;
8,017,278✔
1439
}
1440

1441
static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
10,680✔
1442
  return insCreateSName(&pStmt->usingTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
10,680✔
1443
}
1444

1445
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* ctbCacheHit) {
10,681✔
1446
  int32_t     code = TSDB_CODE_SUCCESS;
10,681✔
1447
  STableMeta* pStableMeta = NULL;
10,681✔
1448
  STableMeta* pCtableMeta = NULL;
10,681✔
1449
  if (pCxt->forceUpdate) {
10,681!
1450
    pCxt->missCache = true;
×
1451
    return TSDB_CODE_SUCCESS;
×
1452
  }
1453
  char stableFName[TSDB_TABLE_FNAME_LEN];
1454
  code = tNameExtractFullName(&pStmt->usingTableName, stableFName);
10,681✔
1455
  if (TSDB_CODE_SUCCESS != code) {
10,695!
1456
    return code;
×
1457
  }
1458

1459
  char ctableFName[TSDB_TABLE_FNAME_LEN];
1460
  code = tNameExtractFullName(&pStmt->targetTableName, ctableFName);
10,695✔
1461
  if (TSDB_CODE_SUCCESS != code) {
10,696!
1462
    return code;
×
1463
  }
1464

1465
  if (strcmp(stableFName, ctableFName) == 0) {
10,696!
1466
    return TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
×
1467
  }
1468
  if (!pCxt->missCache) {
10,696✔
1469
    char tbFName[TSDB_TABLE_FNAME_LEN];
1470
    code = tNameExtractFullName(&pStmt->usingTableName, tbFName);
10,695✔
1471
    if (TSDB_CODE_SUCCESS != code) {
10,695!
1472
      return code;
×
1473
    }
1474
    STableMeta** ppStableMeta = taosHashGet(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName));
10,695✔
1475
    if (NULL != ppStableMeta) {
10,689✔
1476
      pStableMeta = *ppStableMeta;
48✔
1477
    }
1478
    if (NULL == pStableMeta) {
10,689✔
1479
      bool bUsingTable = true;
10,642✔
1480
      code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable);
10,642✔
1481
      if (TSDB_CODE_SUCCESS == code) {
10,647✔
1482
        code = taosHashPut(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName), &pStableMeta, POINTER_BYTES);
10,644✔
1483
      } else {
1484
        taosMemoryFreeClear(pStableMeta);
3!
1485
      }
1486
    }
1487
  }
1488
  if (pCxt->pComCxt->stmtBindVersion > 0) {
10,691✔
1489
    goto _no_ctb_cache;
10,087✔
1490
  }
1491

1492
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
604!
1493
    bool bUsingTable = false;
606✔
1494
    code = getTableMeta(pCxt, &pStmt->targetTableName, &pCtableMeta, &pCxt->missCache, bUsingTable);
606✔
1495
  }
1496

1497
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
606!
1498
    code = (pStableMeta->suid == pCtableMeta->suid) ? TSDB_CODE_SUCCESS : TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
413!
1499
    *ctbCacheHit = true;
413✔
1500
  }
1501
_no_ctb_cache:
193✔
1502
  if (TSDB_CODE_SUCCESS == code) {
10,693✔
1503
    if (*ctbCacheHit) {
10,685✔
1504
      code = cloneTableMeta(pCtableMeta, &pStmt->pTableMeta);
413✔
1505
    } else {
1506
      code = cloneTableMeta(pStableMeta, &pStmt->pTableMeta);
10,272✔
1507
    }
1508
  }
1509
  taosMemoryFree(pCtableMeta);
10,700!
1510
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
10,682✔
1511
    code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
10,487✔
1512
  }
1513
  if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
10,692✔
1514
    code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
10,130✔
1515
    if (TSDB_CODE_SUCCESS == code) {
10,132!
1516
      code = collectUseTable(&pStmt->usingTableName, pStmt->pTableNameHashObj);
10,132✔
1517
    }
1518
  }
1519
  return code;
10,697✔
1520
}
1521

1522
static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
10,687✔
1523
  SToken token;
1524
  NEXT_TOKEN(pStmt->pSql, token);
10,687✔
1525
  bool    ctbCacheHit = false;
10,697✔
1526
  int32_t code = preParseUsingTableName(pCxt, pStmt, &token);
10,697✔
1527
  if (TSDB_CODE_SUCCESS == code) {
10,692!
1528
    code = getUsingTableSchema(pCxt, pStmt, &ctbCacheHit);
10,695✔
1529
    if (TSDB_CODE_SUCCESS == code && ctbCacheHit && !pCxt->missCache) {
10,685!
1530
      pStmt->usingTableProcessing = false;
413✔
1531
      return ignoreUsingClauseAndCheckTagValues(pCxt, pStmt);
413✔
1532
    }
1533
  }
1534
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
10,269!
1535
    code = storeChildTableMeta(pCxt, pStmt);
10,084✔
1536
  }
1537
  return code;
10,278✔
1538
}
1539

1540
// input pStmt->pSql:
1541
//   1(care). [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ...
1542
//   2. VALUES ... | FILE ...
1543
// output pStmt->pSql:
1544
//   1. [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]] ...
1545
//   2. VALUES ... | FILE ...
1546
static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,027,015✔
1547
  SToken  token;
1548
  int32_t index = 0;
8,027,015✔
1549
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
8,027,015✔
1550
  if (TK_USING != token.type) {
8,032,570✔
1551
    return getTargetTableSchema(pCxt, pStmt);
8,023,694✔
1552
  }
1553
  pStmt->usingTableProcessing = true;
8,876✔
1554
  pCxt->stmtTbNameFlag |= USING_CLAUSE;
8,876✔
1555
  // pStmt->pSql -> stb_name [(tag1_name, ...)
1556
  pStmt->pSql += index;
8,876✔
1557
  int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable);
8,876✔
1558
  if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) {
10,678!
1559
    return parseUsingTableNameImpl(pCxt, pStmt);
10,692✔
1560
  }
1561
  return code;
×
1562
}
1563

1564
static int32_t preParseTargetTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
8,035,622✔
1565
  int32_t code = insCreateSName(&pStmt->targetTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
8,035,622✔
1566
  if (TSDB_CODE_SUCCESS == code) {
8,031,031!
1567
    if (IS_SYS_DBNAME(pStmt->targetTableName.dbname)) {
8,033,544!
1568
      return TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED;
×
1569
    }
1570
  }
1571

1572
  return code;
8,031,031✔
1573
}
1574

1575
// input pStmt->pSql:
1576
//   1(care). [(field1_name, ...)] ...
1577
//   2. [ USING ... ] ...
1578
//   3. VALUES ... | FILE ...
1579
// output pStmt->pSql:
1580
//   1. [ USING ... ] ...
1581
//   2. VALUES ... | FILE ...
1582
static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,036,059✔
1583
  SToken  token;
1584
  int32_t index = 0;
8,036,059✔
1585
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
8,036,059✔
1586
  if (TK_NK_LP != token.type) {
8,031,235✔
1587
    return TSDB_CODE_SUCCESS;
7,932,171✔
1588
  }
1589

1590
  // pStmt->pSql -> field1_name, ...)
1591
  pStmt->pSql += index;
99,064✔
1592
  pStmt->pBoundCols = pStmt->pSql;
99,064✔
1593
  return skipParentheses(pCxt, &pStmt->pSql);
99,064✔
1594
}
1595

1596
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt** pTableCxt) {
8,013,612✔
1597
  if (pCxt->pComCxt->async) {
8,013,612✔
1598
    return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
8,027,253✔
1599
                              pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
8,006,621✔
1600
  }
1601

1602
  char    tbFName[TSDB_TABLE_FNAME_LEN];
1603
  int32_t code = 0;
6,991✔
1604
  if ((pCxt->stmtTbNameFlag & NO_DATA_USING_CLAUSE) == USING_CLAUSE) {
6,991✔
1605
    tstrncpy(pStmt->targetTableName.tname, pStmt->usingTableName.tname, sizeof(pStmt->targetTableName.tname));
27✔
1606
    tstrncpy(pStmt->targetTableName.dbname, pStmt->usingTableName.dbname, sizeof(pStmt->targetTableName.dbname));
27✔
1607
    pStmt->targetTableName.type = TSDB_SUPER_TABLE;
27✔
1608
    pStmt->pTableMeta->tableType = TSDB_SUPER_TABLE;
27✔
1609
  }
1610

1611
  code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
6,991✔
1612

1613
  if (TSDB_CODE_SUCCESS != code) {
11,745!
1614
    return code;
×
1615
  }
1616
  if (pStmt->usingTableProcessing) {
11,745✔
1617
    pStmt->pTableMeta->uid = 0;
10,084✔
1618
  }
1619
  return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
11,745✔
1620
                            &pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false);
11,745✔
1621
}
1622

1623
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
8,028,339✔
1624
  SToken  token;
1625
  int32_t index = 0;
8,028,339✔
1626
  NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
8,028,339✔
1627
  if (TK_NK_LP == token.type) {
8,029,468✔
1628
    pStmt->pSql += index;
49✔
1629
    if (NULL != pStmt->pBoundCols) {
49!
1630
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
×
1631
    }
1632
    // pStmt->pSql -> field1_name, ...)
1633
    return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
49✔
1634
  }
1635

1636
  if (NULL != pStmt->pBoundCols) {
8,029,419✔
1637
    return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
101,118✔
1638
  } else if (pTableCxt->boundColsInfo.hasBoundCols) {
7,928,301!
1639
    insResetBoundColsInfo(&pTableCxt->boundColsInfo);
×
1640
  }
1641

1642
  return TSDB_CODE_SUCCESS;
7,925,536✔
1643
}
1644

1645
int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
8,027,095✔
1646
  if (0 == (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) {
8,027,095✔
1647
    return TSDB_CODE_SUCCESS;
8,019,931✔
1648
  }
1649

1650
  for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) {
49,455✔
1651
    SSchema*  pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
42,293✔
1652
    SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
42,293✔
1653
    if (NULL == pCol) {
42,289!
1654
      return terrno;
×
1655
    }
1656
    tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
42,289✔
1657
  }
1658

1659
  return TSDB_CODE_SUCCESS;
7,162✔
1660
}
1661

1662
int32_t initTableColSubmitDataWithBoundInfo(STableDataCxt* pTableCxt, SBoundColInfo pBoundColsInfo) {
33✔
1663
  insDestroyBoundColInfo(&(pTableCxt->boundColsInfo));
33✔
1664
  pTableCxt->boundColsInfo = pBoundColsInfo;
33✔
1665
  for (int32_t i = 0; i < pBoundColsInfo.numOfBound; ++i) {
108✔
1666
    SSchema*  pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
75✔
1667
    SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
75✔
1668
    if (NULL == pCol) {
75!
1669
      return terrno;
×
1670
    }
1671
    tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
75✔
1672
  }
1673

1674
  return TSDB_CODE_SUCCESS;
33✔
1675
}
1676

1677
// input pStmt->pSql:
1678
//   1. [(tag1_name, ...)] ...
1679
//   2. VALUES ... | FILE ...
1680
// output pStmt->pSql: VALUES ... | FILE ...
1681
static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
8,020,204✔
1682
                                       STableDataCxt** pTableCxt) {
1683
  int32_t code = parseUsingClauseBottom(pCxt, pStmt);
8,020,204✔
1684
  if (TSDB_CODE_SUCCESS == code) {
8,018,969!
1685
    code = getTableDataCxt(pCxt, pStmt, pTableCxt);
8,020,745✔
1686
  }
1687
  if (TSDB_CODE_SUCCESS == code) {
8,035,312✔
1688
    code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt);
8,032,758✔
1689
  }
1690
  if (TSDB_CODE_SUCCESS == code) {
8,031,877✔
1691
    code = initTableColSubmitData(*pTableCxt);
8,030,772✔
1692
  }
1693
  return code;
8,028,054✔
1694
}
1695

1696
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
1697
// output pStmt->pSql:
1698
//   1. [(tag1_name, ...)] ...
1699
//   2. VALUES ... | FILE ...
1700
static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
8,035,633✔
1701
  int32_t code = preParseTargetTableName(pCxt, pStmt, pTbName);
8,035,633✔
1702
  if (TSDB_CODE_SUCCESS == code) {
8,034,140!
1703
    // option: [(field1_name, ...)]
1704
    code = preParseBoundColumnsClause(pCxt, pStmt);
8,035,486✔
1705
  }
1706
  if (TSDB_CODE_SUCCESS == code) {
8,033,293✔
1707
    // option: [USING stb_name]
1708
    code = parseUsingTableName(pCxt, pStmt);
8,030,653✔
1709
  }
1710
  return code;
8,024,037✔
1711
}
1712

1713
static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
2,147,483,647✔
1714
                                   const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1715
  switch (pSchema->type) {
2,147,483,647!
1716
    case TSDB_DATA_TYPE_BOOL: {
13,629,198✔
1717
      if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
13,629,198!
1718
        if (IS_TRUE_STR(pToken->z, pToken->n)) {
10,215,853!
1719
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, TRUE_VALUE);
5,665,985✔
1720
        } else if (IS_FALSE_STR(pToken->z, pToken->n)) {
4,549,868!
1721
          VALUE_SET_TRIVIAL_DATUM(&pVal->value, FALSE_VALUE);
4,595,363✔
1722
        } else if (TSDB_CODE_SUCCESS ==
×
1723
                   toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
×
1724
          int8_t v = (*(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE);
×
1725
          valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
×
1726
        } else {
1727
          return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
×
1728
        }
1729
      } else if (pToken->type == TK_NK_INTEGER) {
3,413,345✔
1730
        int8_t v = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
3,365,985✔
1731
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
3,378,168✔
1732
      } else if (pToken->type == TK_NK_FLOAT) {
47,360!
1733
        int8_t v = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
49,000✔
1734
        VALUE_SET_TRIVIAL_DATUM(&pVal->value, v);
49,000✔
1735
      } else if ((pToken->type == TK_NK_HEX || pToken->type == TK_NK_BIN) &&
×
1736
                 (TSDB_CODE_SUCCESS ==
1737
                  toDoubleEx(pToken->z, pToken->n, pToken->type, (double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value)))) {
×
1738
        int8_t v = *(double*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value) == 0 ? FALSE_VALUE : TRUE_VALUE;
×
1739
        valueSetDatum(&pVal->value, TSDB_DATA_TYPE_BOOL, &v, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes);
×
1740
      } else {
1741
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
×
1742
      }
1743
      break;
13,688,516✔
1744
    }
1745
    case TSDB_DATA_TYPE_TINYINT: {
13,605,311✔
1746
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,605,311✔
1747
      if (TSDB_CODE_SUCCESS != code) {
13,623,713!
1748
        return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
×
1749
      } else if (!IS_VALID_TINYINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
13,623,713!
1750
        return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z);
×
1751
      }
1752
      break;
13,625,261✔
1753
    }
1754
    case TSDB_DATA_TYPE_UTINYINT: {
13,597,264✔
1755
      int32_t code =
1756
          toUIntegerEx(pToken->z, pToken->n, pToken->type, (uint64_t*)&VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,597,264✔
1757
      if (TSDB_CODE_SUCCESS != code) {
13,592,635!
1758
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
×
1759
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT8_MAX) {
13,593,538!
1760
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
×
1761
      }
1762
      break;
13,593,538✔
1763
    }
1764
    case TSDB_DATA_TYPE_SMALLINT: {
13,617,184✔
1765
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,617,184✔
1766
      if (TSDB_CODE_SUCCESS != code) {
13,640,039!
1767
        return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
×
1768
      } else if (!IS_VALID_SMALLINT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
13,640,039!
1769
        return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
3,574✔
1770
      }
1771
      break;
13,636,465✔
1772
    }
1773
    case TSDB_DATA_TYPE_USMALLINT: {
13,608,115✔
1774
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,608,115✔
1775
      if (TSDB_CODE_SUCCESS != code) {
13,608,271!
1776
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
×
1777
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT16_MAX) {
13,610,410!
1778
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
×
1779
      }
1780
      break;
13,610,410✔
1781
    }
1782
    case TSDB_DATA_TYPE_INT: {
888,162,407✔
1783
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
888,162,407✔
1784
      if (TSDB_CODE_SUCCESS != code) {
898,740,370!
1785
        return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
×
1786
      } else if (!IS_VALID_INT(VALUE_GET_TRIVIAL_DATUM(&pVal->value))) {
898,740,370!
1787
        return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
×
1788
      }
1789
      break;
902,090,138✔
1790
    }
1791
    case TSDB_DATA_TYPE_UINT: {
13,612,474✔
1792
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,612,474✔
1793
      if (TSDB_CODE_SUCCESS != code) {
13,614,997✔
1794
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
2,341✔
1795
      } else if (VALUE_GET_TRIVIAL_DATUM(&pVal->value) > UINT32_MAX) {
13,612,656!
1796
        return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
×
1797
      }
1798
      break;
13,612,656✔
1799
    }
1800
    case TSDB_DATA_TYPE_BIGINT: {
399,822,505✔
1801
      int32_t code = toIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
399,822,505✔
1802
      if (TSDB_CODE_SUCCESS != code) {
424,372,660!
1803
        return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
×
1804
      }
1805
      break;
424,568,556✔
1806
    }
1807
    case TSDB_DATA_TYPE_UBIGINT: {
13,614,121✔
1808
      int32_t code = toUIntegerEx(pToken->z, pToken->n, pToken->type, &VALUE_GET_TRIVIAL_DATUM(&pVal->value));
13,614,121✔
1809
      if (TSDB_CODE_SUCCESS != code) {
13,612,515!
1810
        return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
×
1811
      }
1812
      break;
13,613,017✔
1813
    }
1814
    case TSDB_DATA_TYPE_FLOAT: {
859,889,870✔
1815
      double  dv;
1816
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
859,889,870✔
1817
      if (TSDB_CODE_SUCCESS != code) {
860,473,022!
1818
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1819
      }
1820
      if (dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
860,473,022!
1821
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1822
      }
1823
      float f = dv;
860,482,904✔
1824
      valueSetDatum(&pVal->value, TSDB_DATA_TYPE_FLOAT, &f, sizeof(f));
860,482,904✔
1825
      break;
860,418,710✔
1826
    }
1827
    case TSDB_DATA_TYPE_DOUBLE: {
15,343,520✔
1828
      double  dv;
1829
      int32_t code = toDoubleEx(pToken->z, pToken->n, pToken->type, &dv);
15,343,520✔
1830
      if (TSDB_CODE_SUCCESS != code) {
15,408,459!
1831
        return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
×
1832
      }
1833
      if (isinf(dv) || isnan(dv)) {
15,411,440!
1834
        return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
16✔
1835
      }
1836
      VALUE_SET_TRIVIAL_DATUM(&pVal->value, (*(int64_t*)&dv));
15,411,424✔
1837
      break;
15,411,424✔
1838
    }
1839
    case TSDB_DATA_TYPE_BINARY: {
14,975,499✔
1840
      // Too long values will raise the invalid sql error message
1841
      if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
14,975,499!
1842
        return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1843
      }
1844
      pVal->value.pData = taosMemoryMalloc(pToken->n);
14,975,499!
1845
      if (NULL == pVal->value.pData) {
15,050,997!
1846
        return terrno;
×
1847
      }
1848
      memcpy(pVal->value.pData, pToken->z, pToken->n);
15,050,997✔
1849
      pVal->value.nData = pToken->n;
15,050,997✔
1850
      break;
15,050,997✔
1851
    }
1852
    case TSDB_DATA_TYPE_VARBINARY: {
718,137✔
1853
      int32_t code = parseVarbinary(pToken, &pVal->value.pData, &pVal->value.nData, pSchema->bytes);
718,137✔
1854
      if (code != TSDB_CODE_SUCCESS) {
718,137!
1855
        return generateSyntaxErrMsg(&pCxt->msg, code, pSchema->name);
×
1856
      }
1857
      break;
718,137✔
1858
    }
1859
    case TSDB_DATA_TYPE_NCHAR: {
13,860,299✔
1860
      // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
1861
      int32_t len = 0;
13,860,299✔
1862
      int64_t realLen = pToken->n << 2;
13,860,299✔
1863
      if (realLen > pSchema->bytes - VARSTR_HEADER_SIZE) realLen = pSchema->bytes - VARSTR_HEADER_SIZE;
13,860,299!
1864
      char* pUcs4 = taosMemoryMalloc(realLen);
13,860,299!
1865
      if (NULL == pUcs4) {
13,836,540!
1866
        return terrno;
2✔
1867
      }
1868
      if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, realLen, &len, pCxt->pComCxt->charsetCxt)) {
13,836,540✔
1869
        taosMemoryFree(pUcs4);
9,490!
1870
        if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
2!
1871
          return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1872
        }
1873
        char buf[512] = {0};
2✔
1874
        snprintf(buf, tListLen(buf), "%s", strerror(terrno));
2✔
1875
        return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
2✔
1876
      }
1877
      pVal->value.pData = pUcs4;
13,898,633✔
1878
      pVal->value.nData = len;
13,898,633✔
1879
      break;
13,898,633✔
1880
    }
1881
    case TSDB_DATA_TYPE_JSON: {
×
1882
      if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
×
1883
        return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z);
×
1884
      }
1885
      pVal->value.pData = taosMemoryMalloc(pToken->n);
×
1886
      if (NULL == pVal->value.pData) {
×
1887
        return terrno;
×
1888
      }
1889
      memcpy(pVal->value.pData, pToken->z, pToken->n);
×
1890
      pVal->value.nData = pToken->n;
×
1891
      break;
×
1892
    }
1893
    case TSDB_DATA_TYPE_GEOMETRY: {
1,643,418✔
1894
      int32_t        code = TSDB_CODE_FAILED;
1,643,418✔
1895
      unsigned char* output = NULL;
1,643,418✔
1896
      size_t         size = 0;
1,643,418✔
1897

1898
      code = parseGeometry(pToken, &output, &size);
1,643,418✔
1899
      if (code != TSDB_CODE_SUCCESS) {
1,644,983!
1900
        code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
×
1901
      }
1902
      // Too long values will raise the invalid sql error message
1903
      else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
1,646,242!
1904
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
1905
      } else {
1906
        pVal->value.pData = taosMemoryMalloc(size);
1,646,242!
1907
        if (NULL == pVal->value.pData) {
1,645,759!
1908
          code = terrno;
×
1909
        } else {
1910
          memcpy(pVal->value.pData, output, size);
1,645,759✔
1911
          pVal->value.nData = size;
1,645,759✔
1912
        }
1913
      }
1914

1915
      geosFreeBuffer(output);
1,645,759✔
1916
      if (code != TSDB_CODE_SUCCESS) {
1,646,093!
1917
        return code;
×
1918
      }
1919

1920
      break;
1,646,093✔
1921
    }
1922
    case TSDB_DATA_TYPE_TIMESTAMP: {
686,465,490✔
1923
      if (parseTime(pSql, pToken, timePrec, &VALUE_GET_TRIVIAL_DATUM(&pVal->value), &pCxt->msg,
691,768,576!
1924
                    pCxt->pComCxt->timezone) != TSDB_CODE_SUCCESS) {
686,465,490✔
1925
        return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
×
1926
      }
1927
      break;
692,067,070✔
1928
    }
1929
    case TSDB_DATA_TYPE_BLOB:
×
1930
    case TSDB_DATA_TYPE_MEDIUMBLOB: {
1931
      int32_t code = parseBlob(pToken, &pVal->value.pData, &pVal->value.nData, pSchema->bytes);
×
1932
      if (code != TSDB_CODE_SUCCESS) {
×
1933
        return generateSyntaxErrMsg(&pCxt->msg, code, pSchema->name);
×
1934
      }
1935
    } break;
×
1936
    case TSDB_DATA_TYPE_DECIMAL: {
918,490✔
1937
      if (!pExtSchema) {
918,490!
1938
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1939
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1940
      }
1941
      uint8_t precision = 0, scale = 0;
918,490✔
1942
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
918,490✔
1943
      Decimal128 dec = {0};
918,269✔
1944
      int32_t    code = decimal128FromStr(pToken->z, pToken->n, precision, scale, &dec);
918,269✔
1945
      if (TSDB_CODE_SUCCESS != code) {
919,938!
1946
        return code;
×
1947
      }
1948

1949
      // precision check
1950
      // scale auto fit
1951

1952
      code = decimal128ToDataVal(&dec, &pVal->value);
919,938✔
1953
      if (TSDB_CODE_SUCCESS != code) {
927,069!
1954
        return code;
×
1955
      }
1956
      break;
927,069✔
1957
    }
1958
    case TSDB_DATA_TYPE_DECIMAL64: {
2,352,524✔
1959
      if (!pExtSchema) {
2,352,524!
1960
        qError("Decimal type without ext schema info, cannot parse decimal values");
×
1961
        return TSDB_CODE_PAR_INTERNAL_ERROR;
×
1962
      }
1963
      uint8_t precision = 0, scale = 0;
2,352,524✔
1964
      decimalFromTypeMod(pExtSchema->typeMod, &precision, &scale);
2,352,524✔
1965
      Decimal64 dec = {0};
2,353,351✔
1966
      int32_t   code = decimal64FromStr(pToken->z, pToken->n, precision, scale, &dec);
2,353,351✔
1967
      if (TSDB_CODE_SUCCESS != code) {
2,353,942!
1968
        return code;
×
1969
      }
1970
      code = decimal64ToDataVal(&dec, &pVal->value);
2,353,942✔
1971
      if (TSDB_CODE_SUCCESS != code) {
2,353,390!
1972
        return code;
×
1973
      }
1974
      break;
2,353,390✔
1975
    }
1976
    default:
×
1977

1978
      return TSDB_CODE_FAILED;
×
1979
  }
1980

1981
  pVal->flag = CV_FLAG_VALUE;
2,147,483,647✔
1982
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1983
}
1984

1985
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
2,147,483,647✔
1986
                               const SSchemaExt* pExtSchema, int16_t timePrec, SColVal* pVal) {
1987
  int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
2,147,483,647✔
1988
  if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
2,147,483,647✔
1989
    if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
8,433,629!
1990
      return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
×
1991
    }
1992

1993
    pVal->flag = CV_FLAG_NULL;
8,433,629✔
1994
    return TSDB_CODE_SUCCESS;
8,433,629✔
1995
  }
1996

1997
  if (TSDB_CODE_SUCCESS == code) {
2,147,483,647✔
1998
    if (pToken->n == 0 && IS_NUMERIC_TYPE(pSchema->type)) {
2,147,483,647!
1999
      return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
×
2000
    }
2001
    code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, pExtSchema, timePrec, pVal);
2,147,483,647✔
2002
  }
2003

2004
  return code;
2,147,483,647✔
2005
}
2006

2007
static void clearColValArray(SArray* pCols) {
688,997,021✔
2008
  int32_t num = taosArrayGetSize(pCols);
688,997,021✔
2009
  for (int32_t i = 0; i < num; ++i) {
2,147,483,647✔
2010
    SColVal* pCol = taosArrayGet(pCols, i);
2,147,483,647✔
2011
    if (IS_VAR_DATA_TYPE(pCol->value.type) || pCol->value.type == TSDB_DATA_TYPE_DECIMAL) {
2,147,483,647!
2012
      taosMemoryFreeClear(pCol->value.pData);
30,582,612!
2013
    }
2014
  }
2015
}
688,421,179✔
2016

2017
typedef struct SStbRowsDataContext {
2018
  SName stbName;
2019

2020
  STableMeta*   pStbMeta;
2021
  SNode*        pTagCond;
2022
  SBoundColInfo boundColsInfo;
2023

2024
  // the following fields are for each stb row
2025
  SArray*        aTagVals;
2026
  SArray*        aColVals;
2027
  SArray*        aTagNames;
2028
  SName          ctbName;
2029
  STag*          pTag;
2030
  STableMeta*    pCtbMeta;
2031
  SVCreateTbReq* pCreateCtbReq;
2032
  bool           hasTimestampTag;
2033
  bool           isJsonTag;
2034
} SStbRowsDataContext;
2035

2036
typedef union SRowsDataContext {
2037
  STableDataCxt*       pTableDataCxt;
2038
  SStbRowsDataContext* pStbRowsCxt;
2039
} SRowsDataContext;
2040

2041
int32_t parseTbnameToken(SMsgBuf* pMsgBuf, char* tname, SToken* pToken, bool* pFoundCtbName) {
4✔
2042
  *pFoundCtbName = false;
4!
2043

2044
  if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
4!
2045
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be null value");
×
2046
  }
2047

2048
  if (pToken->n > 0) {
4!
2049
    if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
4!
2050
      for (int i = 0; i < pToken->n; ++i) {
16✔
2051
        if (pToken->z[i] == '.') {
12!
2052
          return buildInvalidOperationMsg(pMsgBuf, "tbname can not contain '.'");
×
2053
        } else {
2054
          tname[i] = pToken->z[i];
12✔
2055
        }
2056
      }
2057
      tname[pToken->n] = '\0';
4✔
2058
      *pFoundCtbName = true;
4✔
2059
    } else {
2060
      return buildInvalidOperationMsg(pMsgBuf, "tbname is too long");
×
2061
    }
2062
  } else {
2063
    return buildInvalidOperationMsg(pMsgBuf, "tbname can not be empty");
×
2064
  }
2065
  return TSDB_CODE_SUCCESS;
4✔
2066
}
2067

2068
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
37✔
2069
                                          SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, const SToken* tagTokens,
2070
                                          SSchema* const* tagSchemas, int numOfTagTokens) {
2071
  int32_t code = TSDB_CODE_SUCCESS;
37✔
2072
  uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
37✔
2073

2074
  if (code == TSDB_CODE_SUCCESS && ctbFirst) {
37!
2075
    for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
41!
2076
      SToken*  pTagToken = (SToken*)(tagTokens + i);
4✔
2077
      SSchema* pTagSchema = tagSchemas[i];
4✔
2078
      code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
4✔
2079
      if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pTagToken->type) {
4!
2080
        code = buildInvalidOperationMsg(&pCxt->msg, "not expected tag");
×
2081
      }
2082

2083
      if (code == TSDB_CODE_SUCCESS) {
4!
2084
        code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames,
4✔
2085
                             pStbRowsCxt->aTagVals, &pStbRowsCxt->pTag, pCxt->pComCxt->timezone,
4✔
2086
                             pCxt->pComCxt->charsetCxt);
4✔
2087
      }
2088
    }
2089
    if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
37!
2090
      code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
37✔
2091
    }
2092
  }
2093

2094
  if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
37!
2095
    code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
×
2096
  }
2097
  return code;
37✔
2098
}
2099

2100
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
51✔
2101
                                 SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
2102
                                 const SSchema* pSchemas, const SSchemaExt* pExtSchemas, SToken* tagTokens,
2103
                                 SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName, bool* setCtbName,
2104
                                 SBoundColInfo* ctbCols) {
2105
  int32_t code = TSDB_CODE_SUCCESS;
51✔
2106
  SArray* pTagNames = pStbRowsCxt->aTagNames;
51✔
2107
  SArray* pTagVals = pStbRowsCxt->aTagVals;
51✔
2108
  bool    canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
51!
2109
  int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
51✔
2110
  int32_t numOfTags = getNumOfTags(pStbRowsCxt->pStbMeta);
51✔
2111
  int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
51✔
2112
  uint8_t precision = getTableInfo(pStbRowsCxt->pStbMeta).precision;
51✔
2113
  int     tag_index = 0;
51✔
2114
  int     col_index = 0;
51✔
2115
  for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
306✔
2116
    const char* pTmpSql = *ppSql;
256✔
2117
    bool        ignoreComma = false;
256✔
2118
    NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
256✔
2119

2120
    if (ignoreComma) {
256!
2121
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
×
2122
      break;
×
2123
    }
2124

2125
    if (TK_NK_RP == pToken->type) {
256!
2126
      code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
×
2127
      break;
×
2128
    }
2129

2130
    // cols tags tbname
2131
    if (TK_NK_QUESTION == pToken->type) {
256✔
2132
      if (pCxt->pComCxt->stmtBindVersion != 2) {
233✔
2133
        return buildInvalidOperationMsg(&pCxt->msg,
1✔
2134
                                        "insert into stb(...tbname...)values(...,?,...) only support in stmt2");
2135
      }
2136
      if (pCols->pColIndex[i] == tbnameIdx) {
232✔
2137
        *bFoundTbName = true;
45✔
2138
        char* tbName = NULL;
45✔
2139
        if ((*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName) == TSDB_CODE_SUCCESS) {
45✔
2140
          tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
33✔
2141
          tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
33✔
2142
          tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
33✔
2143
          tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
33✔
2144
          pStmt->usingTableName.type = 1;
33✔
2145
          pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;  // set the table type to child table for parse cache
33✔
2146
          *setCtbName = true;
33✔
2147
        }
2148
      } else if (pCols->pColIndex[i] < numOfCols) {
187✔
2149
        // bind column
2150
        if (ctbCols->pColIndex == NULL) {
107✔
2151
          ctbCols->pColIndex = taosMemoryCalloc(numOfCols, sizeof(int16_t));
45!
2152
          if (NULL == ctbCols->pColIndex) {
45!
2153
            return terrno;
×
2154
          }
2155
        }
2156
        ctbCols->pColIndex[col_index++] = pCols->pColIndex[i];
107✔
2157
        ctbCols->numOfBound++;
107✔
2158
        ctbCols->numOfCols++;
107✔
2159

2160
      } else if (pCols->pColIndex[i] < tbnameIdx) {
80!
2161
        if (pCxt->tags.pColIndex == NULL) {
80✔
2162
          pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
30!
2163
          if (NULL == pCxt->tags.pColIndex) {
30!
2164
            return terrno;
×
2165
          }
2166
        }
2167
        if (!(tag_index < numOfTags)) {
80!
2168
          return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
×
2169
        }
2170
        pStmt->usingTableProcessing = true;
80✔
2171
        pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
80✔
2172
        pCxt->tags.mixTagsCols = true;
80✔
2173
        pCxt->tags.numOfBound++;
80✔
2174
        pCxt->tags.numOfCols++;
80✔
2175
      } else {
2176
        return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound");
×
2177
      }
2178
    } else {
2179
      if (pCxt->pComCxt->stmtBindVersion == 2 && pCxt->pComCxt->pStmtCb != NULL) {
23!
2180
        if (pCols->pColIndex[i] < numOfCols) {
11✔
2181
          const SSchema*    pSchema = &pSchemas[pCols->pColIndex[i]];
5✔
2182
          const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
5✔
2183
          SColVal*          pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
5✔
2184
          code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, pExtSchema, precision, pVal);
5✔
2185
          if (TK_NK_VARIABLE == pToken->type) {
5!
2186
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2187
          }
2188

2189
          if (ctbCols->pColIndex == NULL) {
5!
2190
            ctbCols->pColIndex = taosMemoryCalloc(numOfCols, sizeof(int16_t));
×
2191
            if (NULL == ctbCols->pColIndex) {
×
2192
              return terrno;
×
2193
            }
2194
          }
2195
          ctbCols->pColIndex[col_index++] = pCols->pColIndex[i];
5✔
2196
          ctbCols->numOfBound++;
5✔
2197
          ctbCols->numOfCols++;
5✔
2198

2199
          if (code == TSDB_CODE_SUCCESS && pCxt->pParsedValues == NULL) {
5!
2200
            pCxt->pParsedValues = taosArrayInit(16, sizeof(SColVal));
3✔
2201
            if (pCxt->pParsedValues == NULL) {
3!
2202
              return terrno;
×
2203
            }
2204
          }
2205

2206
          if (code == TSDB_CODE_SUCCESS) {
5!
2207
            SColVal clonedVal = *pVal;
5✔
2208
            if (COL_VAL_IS_VALUE(&clonedVal) && IS_VAR_DATA_TYPE(clonedVal.value.type)) {
5!
2209
              clonedVal.value.pData = taosMemoryMalloc(clonedVal.value.nData);
1!
2210
              if (NULL == clonedVal.value.pData) {
1!
2211
                code = terrno;
×
2212
                break;
×
2213
              }
2214
              memcpy(clonedVal.value.pData, pVal->value.pData, clonedVal.value.nData);
1✔
2215
            }
2216

2217
            clonedVal.cid = pSchema->colId;
5✔
2218
            if (taosArrayPush(pCxt->pParsedValues, &clonedVal) == NULL) {
10!
2219
              code = terrno;
×
2220
            }
2221
          }
2222
        } else if (pCols->pColIndex[i] < tbnameIdx) {
6!
2223
          if (pCxt->tags.parseredTags == NULL) {
6✔
2224
            pCxt->tags.parseredTags = taosMemoryCalloc(1, sizeof(STagsInfo));
4!
2225
            if (pCxt->tags.parseredTags == NULL) {
4!
2226
              code = terrno;
×
2227
            } else {
2228
              pCxt->tags.parseredTags->STagNames = taosArrayInit(numOfTags, sizeof(STagVal));
4✔
2229
              pCxt->tags.parseredTags->pTagVals = taosArrayInit(numOfTags, sizeof(STagVal));
4✔
2230
              pCxt->tags.parseredTags->pTagIndex = taosMemoryCalloc(numOfTags, sizeof(uint8_t));
4!
2231
              pCxt->tags.parseredTags->numOfTags = 0;
4✔
2232

2233
              if (pCxt->tags.parseredTags->STagNames == NULL || pCxt->tags.parseredTags->pTagVals == NULL ||
4!
2234
                  pCxt->tags.parseredTags->pTagIndex == NULL) {
4!
2235
                return terrno;
×
2236
              }
2237
            }
2238
          }
2239

2240
          const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
6✔
2241
          // if (canParseTagsAfter) {
2242
          //   tagTokens[(*pNumOfTagTokens)] = *pToken;
2243
          //   tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
2244
          //   ++(*pNumOfTagTokens);
2245
          // } else {
2246
          code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
6✔
2247
          if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pToken->type) {
6!
2248
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2249
          }
2250
            if (code == TSDB_CODE_SUCCESS) {
6✔
2251
              code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken,
5✔
2252
                                   pCxt->tags.parseredTags->STagNames, pCxt->tags.parseredTags->pTagVals,
5✔
2253
                                   &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
5✔
2254
            }
2255

2256
            pCxt->tags.parseredTags->pTagIndex[pCxt->tags.parseredTags->numOfTags++] = pCols->pColIndex[i] - numOfCols;
6✔
2257

2258
            if (pCxt->tags.pColIndex == NULL) {
6✔
2259
              pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
4!
2260
              if (NULL == pCxt->tags.pColIndex) {
4!
2261
                return terrno;
×
2262
              }
2263
            }
2264
            if (!(tag_index < numOfTags)) {
6!
2265
              return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
×
2266
            }
2267
            pStmt->usingTableProcessing = true;
6✔
2268
            pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
6✔
2269
            pCxt->tags.mixTagsCols = true;
6✔
2270
            pCxt->tags.numOfBound++;
6✔
2271
            pCxt->tags.numOfCols++;
6✔
2272
            // }
2273
        } else if (pCols->pColIndex[i] == tbnameIdx) {
×
2274
          return buildInvalidOperationMsg(&pCxt->msg, "STMT tbname in bound cols should not be a fixed value");
×
2275
        }
2276
      } else {
2277
        if (pCols->pColIndex[i] < numOfCols) {
12✔
2278
          const SSchema*    pSchema = &pSchemas[pCols->pColIndex[i]];
4✔
2279
          const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
4✔
2280
          SColVal*          pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
4✔
2281
          code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, pExtSchema, precision, pVal);
4✔
2282
          if (TK_NK_VARIABLE == pToken->type) {
4!
2283
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2284
          }
2285
        } else if (pCols->pColIndex[i] < tbnameIdx) {
8✔
2286
          const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
4✔
2287
          if (canParseTagsAfter) {
4!
2288
            tagTokens[(*pNumOfTagTokens)] = *pToken;
4✔
2289
            tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
4✔
2290
            ++(*pNumOfTagTokens);
4✔
2291
          } else {
2292
            code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
×
2293
            if (code == TSDB_CODE_SUCCESS && TK_NK_VARIABLE == pToken->type) {
×
2294
              code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
×
2295
            }
2296
            if (code == TSDB_CODE_SUCCESS) {
×
2297
              code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals,
×
2298
                                   &pStbRowsCxt->pTag, pCxt->pComCxt->timezone, pCxt->pComCxt->charsetCxt);
×
2299
            }
2300
          }
2301
        } else if (pCols->pColIndex[i] == tbnameIdx) {
4!
2302
          code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
4✔
2303
          if (TK_NK_VARIABLE == pToken->type) {
4!
2304
            code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
×
2305
          }
2306

2307
          if (code == TSDB_CODE_SUCCESS) {
4!
2308
            code = parseTbnameToken(&pCxt->msg, pStbRowsCxt->ctbName.tname, pToken, bFoundTbName);
4✔
2309
          }
2310
        }
2311
      }
2312
    }
2313

2314
    if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
255✔
2315
      NEXT_VALID_TOKEN(*ppSql, *pToken);
205!
2316
      if (TK_NK_COMMA != pToken->type) {
205!
2317
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
×
2318
      }
2319
    }
2320
  }
2321

2322
  return code;
50✔
2323
}
2324

2325
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
51✔
2326
                               SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken, bool* pCtbFirst,
2327
                               bool* setCtbName, SBoundColInfo* ctbCols) {
2328
  SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
51✔
2329
  SSchema*       pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
51✔
2330
  SSchemaExt*    pExtSchemas = getTableColumnExtSchema(pStbRowsCxt->pStbMeta);
51✔
2331

2332
  bool        bFoundTbName = false;
51✔
2333
  const char* pOrigSql = *ppSql;
51✔
2334

2335
  int32_t  code = TSDB_CODE_SUCCESS;
51✔
2336
  SToken   tagTokens[TSDB_MAX_TAGS] = {0};
51✔
2337
  SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
51✔
2338
  int      numOfTagTokens = 0;
51✔
2339

2340
  code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, pExtSchemas, tagTokens, tagSchemas,
51✔
2341
                           &numOfTagTokens, &bFoundTbName, setCtbName, ctbCols);
2342

2343
  if (code != TSDB_CODE_SUCCESS) {
51✔
2344
    return code;
2✔
2345
  }
2346

2347
  if (!bFoundTbName) {
49!
2348
    code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
×
2349
  }
2350

2351
  bool ctbFirst = true;
49✔
2352
  char ctbFName[TSDB_TABLE_FNAME_LEN];
2353
  if (code == TSDB_CODE_SUCCESS) {
49!
2354
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
49✔
2355
  }
2356
  if (TSDB_CODE_SUCCESS == code) {
49✔
2357
    STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
37✔
2358
    ctbFirst = (pCtbMeta == NULL);
37✔
2359
    if (!ctbFirst) {
37!
2360
      pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
×
2361
      pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
×
2362
    }
2363
    *pCtbFirst = ctbFirst;
37✔
2364
  }
2365

2366
  if (code == TSDB_CODE_SUCCESS) {
49✔
2367
    code = processCtbTagsAfterCtbName(pCxt, pStmt, pStbRowsCxt, ctbFirst, tagTokens, tagSchemas, numOfTagTokens);
37✔
2368
  }
2369

2370
  if (code == TSDB_CODE_SUCCESS) {
49✔
2371
    *pGotRow = true;
37✔
2372
  }
2373
  return code;
49✔
2374
}
2375

2376
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
37✔
2377
                                                SStbRowsDataContext* pStbRowsCxt) {
2378
  int32_t code = TSDB_CODE_SUCCESS;
37✔
2379

2380
  pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
37!
2381
  if (pStbRowsCxt->pCreateCtbReq == NULL) {
37!
2382
    code = terrno;
×
2383
  }
2384
  if (code == TSDB_CODE_SUCCESS) {
37!
2385
    code = insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
37✔
2386
                               pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
37✔
2387
                               getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
37✔
2388
    pStbRowsCxt->pTag = NULL;
37✔
2389
  }
2390

2391
  if (code == TSDB_CODE_SUCCESS) {
37!
2392
    char ctbFName[TSDB_TABLE_FNAME_LEN];
2393
    code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
37✔
2394
    SVgroupInfo      vg;
2395
    SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
37✔
2396
                             .requestId = pCxt->pComCxt->requestId,
37✔
2397
                             .requestObjRefId = pCxt->pComCxt->requestRid,
37✔
2398
                             .mgmtEps = pCxt->pComCxt->mgmtEpSet};
37✔
2399
    if (TSDB_CODE_SUCCESS == code) {
37!
2400
      code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStbRowsCxt->ctbName, &vg);
37✔
2401
    }
2402
    if (code == TSDB_CODE_SUCCESS) {
37!
2403
      code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
37✔
2404
    }
2405
    STableMeta* pBackup = NULL;
37✔
2406
    if (TSDB_CODE_SUCCESS == code) {
37!
2407
      pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
37✔
2408
      pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
37✔
2409

2410
      code = cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup);
37✔
2411
    }
2412
    if (TSDB_CODE_SUCCESS == code) {
37!
2413
      code = taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
37✔
2414
    }
2415
    if (TSDB_CODE_SUCCESS == code) {
37!
2416
      code = collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
37✔
2417
    }
2418
  }
2419
  return code;
37✔
2420
}
2421

2422
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
89✔
2423
  if (pStbRowsCxt == NULL) return;
89!
2424

2425
  taosArrayClear(pStbRowsCxt->aTagNames);
89✔
2426
  for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) {
93✔
2427
    STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i);
4✔
2428
    if (IS_VAR_DATA_TYPE(p->type)) {
4!
2429
      taosMemoryFreeClear(p->pData);
×
2430
    }
2431
  }
2432
  taosArrayClear(pStbRowsCxt->aTagVals);
89✔
2433

2434
  clearColValArray(pStbRowsCxt->aColVals);
89✔
2435

2436
  tTagFree(pStbRowsCxt->pTag);
89✔
2437
  pStbRowsCxt->pTag = NULL;
89✔
2438
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
89!
2439
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
89!
2440

2441
  if (pStbRowsCxt->boundColsInfo.parseredTags != NULL) {
89!
2442
    if (pStbRowsCxt->boundColsInfo.parseredTags->STagNames != NULL) {
×
2443
      taosArrayDestroy(pStbRowsCxt->boundColsInfo.parseredTags->STagNames);
×
2444
    }
2445
    if (pStbRowsCxt->boundColsInfo.parseredTags->pTagVals != NULL) {
×
2446
      taosArrayDestroy(pStbRowsCxt->boundColsInfo.parseredTags->pTagVals);
×
2447
    }
2448
    taosMemoryFreeClear(pStbRowsCxt->boundColsInfo.parseredTags);
×
2449
  }
2450
}
2451

2452
static void clearInsertParseContext(SInsertParseContext* pCxt) {
7,919,049✔
2453
  if (pCxt == NULL) return;
7,919,049!
2454

2455
  if (pCxt->pParsedValues != NULL) {
7,919,049✔
2456
    taosArrayDestroy(pCxt->pParsedValues);
8✔
2457
    pCxt->pParsedValues = NULL;
8✔
2458
  }
2459
}
2460

2461
static int32_t parseStbBoundInfo(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt,
13✔
2462
                                 STableDataCxt** ppTableDataCxt) {
2463
  char    tbFName[TSDB_TABLE_FNAME_LEN];
2464
  int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
13✔
2465
  if (TSDB_CODE_SUCCESS != code) {
13!
2466
    return code;
×
2467
  }
2468
  if (pStmt->usingTableProcessing) {
13✔
2469
    pStmt->pTableMeta->uid = 0;
10✔
2470
  }
2471

2472
  code = insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
13✔
2473
                            &pStmt->pCreateTblReq, ppTableDataCxt, false, true);
2474
  if (code != TSDB_CODE_SUCCESS) {
13!
2475
    return code;
×
2476
  }
2477

2478
  insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo));
13✔
2479
  (*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo;
13✔
2480

2481
  (*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t));
13!
2482
  if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) {
13!
2483
    return terrno;
×
2484
  }
2485
  (void)memcpy((*ppTableDataCxt)->boundColsInfo.pColIndex, pStbRowsCxt->boundColsInfo.pColIndex,
13✔
2486
               sizeof(int16_t) * pStmt->pStbRowsCxt->boundColsInfo.numOfBound);
13✔
2487
  return TSDB_CODE_SUCCESS;
13✔
2488
}
2489

2490
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
51✔
2491
                              SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken,
2492
                              STableDataCxt** ppTableDataCxt) {
2493
  bool          bFirstTable = false;
51✔
2494
  bool          setCtbName = false;
51✔
2495
  SBoundColInfo ctbCols = {0};
51✔
2496
  int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable, &setCtbName, &ctbCols);
51✔
2497

2498
  if (!setCtbName && pCxt->pComCxt->stmtBindVersion == 2) {
51✔
2499
    taosMemoryFreeClear(ctbCols.pColIndex);
13!
2500
    return parseStbBoundInfo(pStmt, pStbRowsCxt, ppTableDataCxt);
13✔
2501
  }
2502

2503
  if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
38!
2504
    return code;
1✔
2505
  }
2506

2507
  if (code == TSDB_CODE_SUCCESS && bFirstTable) {
37!
2508
    code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
37✔
2509
  }
2510
  if (code == TSDB_CODE_SUCCESS) {
37!
2511
    if (pCxt->pComCxt->stmtBindVersion == 2) {
37✔
2512
      char ctbFName[TSDB_TABLE_FNAME_LEN];
2513
      code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
33✔
2514
      if (code != TSDB_CODE_SUCCESS) {
33!
2515
        return code;
×
2516
      }
2517
      code = insGetTableDataCxt(pStmt->pTableBlockHashObj, ctbFName, strlen(ctbFName), pStbRowsCxt->pCtbMeta,
33✔
2518
                                &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, true, true);
2519
    } else {
2520
      code =
2521
          insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
4✔
2522
                             pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
2523
    }
2524
  }
2525
  if (code == TSDB_CODE_SUCCESS) {
37!
2526
    if (pCxt->pComCxt->stmtBindVersion == 2) {
37✔
2527
      int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
33✔
2528
      code = initTableColSubmitDataWithBoundInfo(*ppTableDataCxt, ctbCols);
33✔
2529
    } else {
2530
      code = initTableColSubmitData(*ppTableDataCxt);
4✔
2531
    }
2532
  }
2533
  if (code == TSDB_CODE_SUCCESS && pCxt->pComCxt->stmtBindVersion == 0) {
37!
2534
    SRow**            pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
4✔
2535
    SRowBuildScanInfo sinfo = {0};
4✔
2536
    code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow, &sinfo);
4✔
2537
    if (TSDB_CODE_SUCCESS == code) {
4!
2538
      SRowKey key;
2539
      tRowGetKey(*pRow, &key);
8!
2540
      insCheckTableDataOrder(*ppTableDataCxt, &key);
4✔
2541
    }
2542
  }
2543

2544
  if (code == TSDB_CODE_SUCCESS) {
37!
2545
    *pGotRow = true;
37✔
2546
  }
2547

2548
  clearStbRowsDataContext(pStbRowsCxt);
37✔
2549

2550
  return code;
37✔
2551
}
2552

2553
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
690,516,124✔
2554
                       SToken* pToken) {
2555
  SBoundColInfo*    pCols = &pTableCxt->boundColsInfo;
690,516,124✔
2556
  SSchema*          pSchemas = getTableColumnSchema(pTableCxt->pMeta);
690,516,124✔
2557
  const SSchemaExt* pExtSchemas = getTableColumnExtSchema(pTableCxt->pMeta);
688,910,266✔
2558

2559
  int32_t code = TSDB_CODE_SUCCESS;
688,018,603✔
2560
  // 1. set the parsed value from sql string
2561
  for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
2,147,483,647!
2562
    const char* pOrigSql = *pSql;
2,147,483,647✔
2563
    bool        ignoreComma = false;
2,147,483,647✔
2564
    NEXT_TOKEN_WITH_PREV_EXT(*pSql, *pToken, &ignoreComma);
2,147,483,647✔
2565
    if (ignoreComma) {
2,147,483,647!
2566
      code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
×
2567
      break;
×
2568
    }
2569

2570
    SSchema*          pSchema = &pSchemas[pCols->pColIndex[i]];
2,147,483,647✔
2571
    const SSchemaExt* pExtSchema = pExtSchemas + pCols->pColIndex[i];
2,147,483,647✔
2572
    SColVal*          pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
2,147,483,647✔
2573

2574
    if (pToken->type == TK_NK_QUESTION) {
2,147,483,647✔
2575
      if (pCxt->pComCxt->stmtBindVersion == 0) {
9,225,670!
2576
        code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z);
×
2577
        break;
×
2578
      }
2579
    } else {
2580
      if (TK_NK_RP == pToken->type) {
2,147,483,647!
2581
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
×
2582
        break;
×
2583
      }
2584

2585
      if (TSDB_CODE_SUCCESS == code) {
2,147,483,647!
2586
        code = parseValueToken(pCxt, pSql, pToken, pSchema, pExtSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
2,147,483,647✔
2587

2588
        if (TSDB_CODE_SUCCESS == code && NULL != pCxt->pComCxt->pStmtCb) {
2,147,483,647!
2589
          if (NULL == pCxt->pParsedValues) {
8✔
2590
            pCxt->pParsedValues = taosArrayInit(16, sizeof(SColVal));
5✔
2591
            if (NULL == pCxt->pParsedValues) {
5!
2592
              code = terrno;
×
2593
              break;
×
2594
            }
2595
          }
2596

2597
          SColVal clonedVal = *pVal;
8✔
2598
          if (COL_VAL_IS_VALUE(&clonedVal) && IS_VAR_DATA_TYPE(clonedVal.value.type)) {
8!
2599
            clonedVal.value.pData = taosMemoryMalloc(clonedVal.value.nData);
2!
2600
            if (NULL == clonedVal.value.pData) {
2!
2601
              code = terrno;
×
2602
              break;
×
2603
            }
2604
            memcpy(clonedVal.value.pData, pVal->value.pData, clonedVal.value.nData);
2✔
2605
          }
2606

2607
          if (taosArrayPush(pCxt->pParsedValues, &clonedVal) == NULL) {
16!
2608
            code = terrno;
×
2609
            break;
×
2610
          }
2611
        }
2612
      }
2613
    }
2614

2615
    if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
2,147,483,647✔
2616
      NEXT_VALID_TOKEN(*pSql, *pToken);
2,147,483,647!
2617
      if (TK_NK_COMMA != pToken->type) {
2,147,483,647!
2618
        code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
×
2619
      }
2620
    }
2621
  }
2622

2623
  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 0) {
680,957,004!
2624
    SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
688,220,015✔
2625
    if (pTableCxt->hasBlob) {
687,384,963!
2626
      SRowBuildScanInfo sinfo = {.hasBlob = 1, .scanType = ROW_BUILD_UPDATE};
×
2627
      if (pTableCxt->pData->pBlobSet == NULL) {
×
2628
        code = tBlobSetCreate(1024, 0, &pTableCxt->pData->pBlobSet);
×
2629
        TAOS_CHECK_RETURN(code);
×
2630
      }
2631
      code = tRowBuildWithBlob(pTableCxt->pValues, pTableCxt->pSchema, pRow, pTableCxt->pData->pBlobSet, &sinfo);
×
2632
    } else {
2633
      SRowBuildScanInfo sinfo = {0};
687,384,963✔
2634
      code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow, &sinfo);
687,384,963✔
2635
    }
2636
    if (TSDB_CODE_SUCCESS == code) {
691,619,113✔
2637
      SRowKey key;
2638
      tRowGetKey(*pRow, &key);
1,382,976,020✔
2639
      insCheckTableDataOrder(pTableCxt, &key);
690,811,258✔
2640
    }
2641
  }
2642

2643
  if (TSDB_CODE_SUCCESS == code && pCxt->pComCxt->stmtBindVersion == 0) {
682,965,145!
2644
    *pGotRow = true;
690,361,145✔
2645
  }
2646

2647
  clearColValArray(pTableCxt->pValues);
682,965,145✔
2648

2649
  return code;
685,914,291✔
2650
}
2651

2652
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
2653
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
8,016,912✔
2654
                           int32_t* pNumOfRows, SToken* pToken) {
2655
  int32_t code = TSDB_CODE_SUCCESS;
8,016,912✔
2656

2657
  (*pNumOfRows) = 0;
8,016,912✔
2658
  while (TSDB_CODE_SUCCESS == code) {
692,770,687✔
2659
    int32_t index = 0;
692,558,427✔
2660
    NEXT_TOKEN_KEEP_SQL(pStmt->pSql, *pToken, index);
692,558,427✔
2661
    if (TK_NK_LP != pToken->type) {
701,718,344✔
2662
      break;
8,038,342✔
2663
    }
2664
    pStmt->pSql += index;
693,680,002✔
2665

2666
    bool gotRow = false;
693,680,002✔
2667
    if (TSDB_CODE_SUCCESS == code) {
693,680,002!
2668
      if (!pStmt->stbSyntax) {
693,892,595!
2669
        code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
694,000,103✔
2670
      } else {
2671
        // foreach subtable
2672
        STableDataCxt* pTableDataCxt = NULL;
×
2673
        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken, &pTableDataCxt);
×
2674
      }
2675
    }
2676

2677
    if (TSDB_CODE_SUCCESS == code) {
684,840,441!
2678
      NEXT_VALID_TOKEN(pStmt->pSql, *pToken);
685,368,528!
2679
      if (TK_NK_COMMA == pToken->type) {
684,496,262✔
2680
        code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
3✔
2681
      } else if (TK_NK_RP != pToken->type) {
684,496,259✔
2682
        code = buildSyntaxErrMsg(&pCxt->msg, ") expected", pToken->z);
2✔
2683
      }
2684
    }
2685

2686
    if (TSDB_CODE_SUCCESS == code && gotRow) {
684,753,775!
2687
      (*pNumOfRows)++;
684,701,321✔
2688
    }
2689
  }
2690

2691
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
8,250,602✔
2692
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
11,649!
2693
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
×
2694
  }
2695
  return code;
8,036,959✔
2696
}
2697

2698
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
2699
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext,
8,021,877✔
2700
                                 SToken* pToken) {
2701
  int32_t numOfRows = 0;
8,021,877✔
2702
  int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken);
8,021,877✔
2703
  if (TSDB_CODE_SUCCESS == code) {
8,030,745!
2704
    pStmt->totalRowsNum += numOfRows;
8,033,036✔
2705
    pStmt->totalTbNum += 1;
8,033,036✔
2706
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
8,033,036✔
2707
  }
2708
  return code;
8,030,745✔
2709
}
2710

2711
// Simplified CSV parser - only handles newlines within quotes
2712
static int32_t csvParserReadLine(SCsvParser* parser) {
109✔
2713
  if (!parser) {
109!
2714
    return TSDB_CODE_INVALID_PARA;
×
2715
  }
2716

2717
  size_t  lineLen = 0;
109✔
2718
  bool    inQuotes = false;
109✔
2719
  char    currentQuote = '\0';  // Track which quote character we're inside
109✔
2720
  int32_t code = TSDB_CODE_SUCCESS;
109✔
2721

2722
  while (true) {
198,108✔
2723
    // Fill buffer if needed
2724
    if (parser->bufferPos >= parser->bufferLen) {
198,217✔
2725
      code = csvParserFillBuffer(parser);
5✔
2726
      if (code != TSDB_CODE_SUCCESS) {
5!
2727
        break;
×
2728
      }
2729
      if (parser->bufferPos >= parser->bufferLen && parser->eof) {
5!
2730
        // End of file
2731
        if (lineLen == 0) {
2!
2732
          code = TSDB_CODE_TSC_QUERY_CANCELLED;  // Use this to indicate EOF
2✔
2733
        }
2734
        break;
2✔
2735
      }
2736
    }
2737

2738
    char ch = parser->buffer[parser->bufferPos++];
198,215✔
2739

2740
    // Handle quotes - support both single and double quotes
2741
    if (!inQuotes && (ch == CSV_QUOTE_SINGLE || ch == CSV_QUOTE_DOUBLE)) {
198,215✔
2742
      // Starting a quoted section
2743
      inQuotes = true;
510✔
2744
      currentQuote = ch;
510✔
2745
    } else if (inQuotes && ch == currentQuote) {
197,705✔
2746
      // Check for escaped quote (double quote)
2747
      if (parser->bufferPos < parser->bufferLen && parser->buffer[parser->bufferPos] == currentQuote) {
510!
2748
        // Escaped quote - keep both quotes in line for subsequent processing
2749
        // Ensure enough space for both quote characters
2750
        code = csvParserExpandLineBuffer(parser, lineLen + 2);
×
2751
        if (code != TSDB_CODE_SUCCESS) {
×
2752
          break;
×
2753
        }
2754

2755
        // Add the first quote character to the line
2756
        parser->lineBuffer[lineLen++] = ch;
×
2757

2758
        // Consume and add the second quote character
2759
        parser->bufferPos++;
×
2760
        ch = parser->buffer[parser->bufferPos - 1];  // The second quote
×
2761
        parser->lineBuffer[lineLen++] = ch;
×
2762
        continue;
×
2763
      } else {
2764
        // End of quoted section
2765
        inQuotes = false;
510✔
2766
        currentQuote = '\0';
510✔
2767
      }
2768
    }
2769

2770
    // Handle newlines
2771
    if (ch == '\n' && !inQuotes) {
198,215!
2772
      // End of line (not inside quotes)
2773
      break;
107✔
2774
    }
2775

2776
    // Skip \r characters only when outside quotes
2777
    if (ch == '\r' && !inQuotes) {
198,108!
2778
      continue;
107✔
2779
    }
2780

2781
    // Expand buffer if needed
2782
    code = csvParserExpandLineBuffer(parser, lineLen + 1);
198,001✔
2783
    if (code != TSDB_CODE_SUCCESS) {
198,001!
2784
      break;
×
2785
    }
2786

2787
    // Add character to line
2788
    parser->lineBuffer[lineLen++] = ch;
198,001✔
2789
  }
2790

2791
  if (code == TSDB_CODE_SUCCESS) {
109✔
2792
    parser->lineBuffer[lineLen] = '\0';
107✔
2793
  }
2794

2795
  return code;
109✔
2796
}
2797

2798
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
2✔
2799
                            int32_t* pNumOfRows) {
2800
  int32_t code = TSDB_CODE_SUCCESS;
2✔
2801
  (*pNumOfRows) = 0;
2✔
2802

2803
  // Initialize or use existing CSV parser in pStmt
2804
  if (pStmt->pCsvParser == NULL) {
2!
2805
    // First time - allocate and initialize CSV parser
2806
    pStmt->pCsvParser = taosMemoryMalloc(sizeof(SCsvParser));
2!
2807
    if (!pStmt->pCsvParser) {
2!
2808
      return terrno;
×
2809
    }
2810
    code = csvParserInit(pStmt->pCsvParser, pStmt->fp);
2✔
2811
    if (code != TSDB_CODE_SUCCESS) {
2!
2812
      taosMemoryFree(pStmt->pCsvParser);
×
2813
      pStmt->pCsvParser = NULL;
×
2814
      return code;
×
2815
    }
2816
  }
2817
  // If pStmt->pCsvParser exists, we continue from where we left off
2818

2819
  bool firstLine = (pStmt->fileProcessing == false);
2✔
2820
  pStmt->fileProcessing = false;
2✔
2821

2822
  while (TSDB_CODE_SUCCESS == code) {
109!
2823
    // Read one line from CSV using the parser in pStmt
2824
    code = csvParserReadLine(pStmt->pCsvParser);
109✔
2825
    if (code == TSDB_CODE_TSC_QUERY_CANCELLED) {
109✔
2826
      // End of file
2827
      code = TSDB_CODE_SUCCESS;
2✔
2828
      break;
2✔
2829
    }
2830
    if (code != TSDB_CODE_SUCCESS) {
107!
2831
      break;
×
2832
    }
2833

2834
    // Skip empty lines
2835
    if (!pStmt->pCsvParser->lineBuffer || strlen(pStmt->pCsvParser->lineBuffer) == 0) {
107!
2836
      firstLine = false;
×
2837
      continue;
1✔
2838
    }
2839

2840
    bool   gotRow = false;
107✔
2841
    SToken token;
2842
    (void)strtolower(pStmt->pCsvParser->lineBuffer, pStmt->pCsvParser->lineBuffer);
107✔
2843
    const char* pRow = pStmt->pCsvParser->lineBuffer;
107✔
2844

2845
    if (!pStmt->stbSyntax) {
107!
2846
      code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
107✔
2847
    } else {
2848
      STableDataCxt* pTableDataCxt = NULL;
×
2849
      code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
×
2850
      if (code == TSDB_CODE_SUCCESS) {
×
2851
        SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
×
2852
        void*                pData = pTableDataCxt;
×
2853
        code = taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
×
2854
                           &pData, POINTER_BYTES);
2855
        if (TSDB_CODE_SUCCESS != code) {
×
2856
          break;
×
2857
        }
2858
      }
2859
    }
2860

2861
    if (code && firstLine) {
107!
2862
      firstLine = false;
1✔
2863
      code = 0;
1✔
2864
      continue;
1✔
2865
    }
2866

2867
    if (TSDB_CODE_SUCCESS == code && gotRow) {
106!
2868
      (*pNumOfRows)++;
106✔
2869
    }
2870

2871
    if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) >= tsMaxInsertBatchRows) {
106!
2872
      // Reached batch limit - keep the parser in pStmt for next batch
2873
      pStmt->fileProcessing = true;
×
2874
      break;
×
2875
    }
2876
    firstLine = false;
106✔
2877
  }
2878

2879
  // Don't destroy the parser here - it will be cleaned up when file processing is complete
2880

2881
  parserDebug("QID:0x%" PRIx64 ", %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);
2!
2882

2883
  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && 0 == pStmt->totalRowsNum &&
2!
2884
      (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
×
2885
    code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
×
2886
  }
2887
  return code;
2✔
2888
}
2889

2890
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
2✔
2891
                                     SRowsDataContext rowsDataCxt) {
2892
  // init only for file
2893
  if (NULL == pStmt->pTableCxtHashObj) {
2!
2894
    pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
2✔
2895
    if (!pStmt->pTableCxtHashObj) {
2!
2896
      return terrno;
×
2897
    }
2898
  }
2899
  int32_t numOfRows = 0;
2✔
2900
  int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
2✔
2901
  if (TSDB_CODE_SUCCESS == code) {
2!
2902
    pStmt->totalRowsNum += numOfRows;
2✔
2903
    pStmt->totalTbNum += 1;
2✔
2904
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
2✔
2905
    if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
2!
2906
      rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
2✔
2907
    }
2908
    if (!pStmt->fileProcessing) {
2!
2909
      // File processing is complete, clean up saved CSV parser
2910
      destroySavedCsvParser(pStmt);
2✔
2911
      code = taosCloseFile(&pStmt->fp);
2✔
2912
      if (TSDB_CODE_SUCCESS != code) {
2!
2913
        parserWarn("QID:0x%" PRIx64 ", failed to close file.", pCxt->pComCxt->requestId);
×
2914
      }
2915
    } else {
2916
      parserDebug("QID:0x%" PRIx64 ", insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
×
2917
    }
2918
    if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) {
2!
2919
      destroySavedCsvParser(pStmt);
×
2920
      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL);
×
2921
    }
2922
  } else {
2923
    // On error, also clean up saved CSV parser
2924
    destroySavedCsvParser(pStmt);
×
2925
    if (code == TSDB_CODE_TSC_SQL_SYNTAX_ERROR) {
×
2926
      return code;
×
2927
    }
2928
    return buildInvalidOperationMsg(&pCxt->msg, tstrerror(code));
×
2929
  }
2930

2931
  // just record pTableCxt whose data come from file
2932
  if (!pStmt->stbSyntax && numOfRows > 0) {
2!
2933
    void* pData = rowsDataCxt.pTableDataCxt;
2✔
2934
    code = taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
2✔
2935
                       POINTER_BYTES);
2936
  }
2937

2938
  return code;
2✔
2939
}
2940

2941
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
2✔
2942
                                 SRowsDataContext rowsDataCxt) {
2943
  char filePathStr[PATH_MAX + 16] = {0};
2✔
2944
  if (TK_NK_STRING == pFilePath->type) {
2✔
2945
    (void)trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
1✔
2946
    if (strlen(filePathStr) >= PATH_MAX) {
1!
2947
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2948
    }
2949
  } else {
2950
    if (pFilePath->n >= PATH_MAX) {
1!
2951
      return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
×
2952
    }
2953
    strncpy(filePathStr, pFilePath->z, pFilePath->n);
1✔
2954
  }
2955
  pStmt->fp = taosOpenFile(filePathStr, TD_FILE_READ);
2✔
2956
  if (NULL == pStmt->fp) {
2!
2957
    return terrno;
×
2958
  }
2959

2960
  return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
2✔
2961
}
2962

2963
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
2✔
2964
                               SToken* pToken) {
2965
  if (tsUseAdapter) {
2!
2966
    return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
×
2967
  }
2968

2969
  NEXT_TOKEN(pStmt->pSql, *pToken);
2✔
2970
  if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
2!
2971
    return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
×
2972
  }
2973
  return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt);
2✔
2974
}
2975

2976
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
2977
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
8,015,031✔
2978
  SToken token;
2979
  NEXT_TOKEN(pStmt->pSql, token);
8,015,031✔
2980
  switch (token.type) {
8,033,577!
2981
    case TK_VALUES:
8,037,071✔
2982
      if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
8,037,071!
2983
        return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", token.z);
×
2984
      }
2985
      return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
8,037,071✔
2986
    case TK_FILE:
×
2987
      return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
×
2988
    default:
×
2989
      break;
×
2990
  }
2991
  return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
×
2992
}
2993

2994
static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
16,020,718✔
2995
  if (pStbRowsCxt == NULL) return;
16,020,718!
2996
  clearStbRowsDataContext(pStbRowsCxt);
×
2997
  taosArrayDestroy(pStbRowsCxt->aColVals);
52✔
2998
  pStbRowsCxt->aColVals = NULL;
52✔
2999
  taosArrayDestroy(pStbRowsCxt->aTagVals);
52✔
3000
  pStbRowsCxt->aTagVals = NULL;
52✔
3001
  taosArrayDestroy(pStbRowsCxt->aTagNames);
52✔
3002
  pStbRowsCxt->aTagNames = NULL;
52✔
3003
  insDestroyBoundColInfo(&pStbRowsCxt->boundColsInfo);
52✔
3004
  tTagFree(pStbRowsCxt->pTag);
52✔
3005
  pStbRowsCxt->pTag = NULL;
52✔
3006
  taosMemoryFreeClear(pStbRowsCxt->pCtbMeta);
52!
3007
  tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
52!
3008
  taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
52!
3009
}
3010

3011
static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext** ppStbRowsCxt) {
53✔
3012
  SStbRowsDataContext* pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
53!
3013
  if (!pStbRowsCxt) {
53!
3014
    return terrno;
×
3015
  }
3016
  tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName);
53✔
3017
  int32_t code = collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
53✔
3018
  if (TSDB_CODE_SUCCESS == code) {
53!
3019
    code = collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
53✔
3020
  }
3021
  if (TSDB_CODE_SUCCESS == code) {
53!
3022
    pStbRowsCxt->ctbName.type = TSDB_TABLE_NAME_T;
53✔
3023
    pStbRowsCxt->ctbName.acctId = pStbRowsCxt->stbName.acctId;
53✔
3024
    memcpy(pStbRowsCxt->ctbName.dbname, pStbRowsCxt->stbName.dbname, sizeof(pStbRowsCxt->stbName.dbname));
53✔
3025

3026
    pStbRowsCxt->pTagCond = pStmt->pTagCond;
53✔
3027
    pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
53✔
3028

3029
    code = cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta);
53✔
3030
  }
3031
  if (TSDB_CODE_SUCCESS == code) {
53!
3032
    pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
53✔
3033
    pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
53✔
3034

3035
    pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
53✔
3036
    if (!pStbRowsCxt->aTagNames) {
53!
3037
      code = terrno;
×
3038
    }
3039
  }
3040
  if (TSDB_CODE_SUCCESS == code) {
53!
3041
    pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
53✔
3042
    if (!pStbRowsCxt->aTagVals) {
53!
3043
      code = terrno;
×
3044
    }
3045
  }
3046
  if (TSDB_CODE_SUCCESS == code) {
53!
3047
    // col values and bound cols info of STableDataContext is not used
3048
    pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
53✔
3049
    if (!pStbRowsCxt->aColVals) code = terrno;
53!
3050
  }
3051
  if (TSDB_CODE_SUCCESS == code) {
53!
3052
    code = insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
53✔
3053
  }
3054
  if (TSDB_CODE_SUCCESS == code) {
53!
3055
    STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
53✔
3056
    code = insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo);
53✔
3057
  }
3058
  if (TSDB_CODE_SUCCESS == code) {
53!
3059
    *ppStbRowsCxt = pStbRowsCxt;
53✔
3060
  } else {
3061
    clearStbRowsDataContext(pStbRowsCxt);
×
3062
  }
3063
  return code;
53✔
3064
}
3065

3066
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
54✔
3067
  int32_t code = TSDB_CODE_SUCCESS;
54✔
3068
  if (!pStmt->pBoundCols) {
54✔
3069
    return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion",
1✔
3070
                             pStmt->pSql);
3071
  }
3072

3073
  SStbRowsDataContext* pStbRowsCxt = NULL;
53✔
3074
  code = constructStbRowsDataContext(pStmt, &pStbRowsCxt);
53✔
3075

3076
  if (code == TSDB_CODE_SUCCESS) {
53!
3077
    code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
53✔
3078
                             &pStbRowsCxt->boundColsInfo);
53✔
3079
    pStbRowsCxt->hasTimestampTag = false;
53✔
3080
    for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
316✔
3081
      int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
263✔
3082
      if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta)) {
263✔
3083
        if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
91✔
3084
          pStbRowsCxt->hasTimestampTag = true;
3✔
3085
        }
3086
        if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_JSON) {
91!
3087
          pStbRowsCxt->isJsonTag = true;
×
3088
        }
3089
      }
3090
    }
3091
    pStmt->pStbRowsCxt = pStbRowsCxt;
53✔
3092
  }
3093

3094
  if (code == TSDB_CODE_SUCCESS) {
53✔
3095
    SRowsDataContext rowsDataCxt;
3096
    rowsDataCxt.pStbRowsCxt = pStbRowsCxt;
49✔
3097
    code = parseDataClause(pCxt, pStmt, rowsDataCxt);
49✔
3098
  }
3099

3100
  return code;
53✔
3101
}
3102

3103
// input pStmt->pSql:
3104
//   1. [(tag1_name, ...)] ...
3105
//   2. VALUES ... | FILE ...
3106
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,017,935✔
3107
  if (!pStmt->stbSyntax) {
8,017,935!
3108
    STableDataCxt*   pTableCxt = NULL;
8,022,237✔
3109
    int32_t          code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
8,022,237✔
3110
    SRowsDataContext rowsDataCxt;
3111
    rowsDataCxt.pTableDataCxt = pTableCxt;
8,024,642✔
3112
    if (TSDB_CODE_SUCCESS == code) {
8,024,642!
3113
      code = parseDataClause(pCxt, pStmt, rowsDataCxt);
8,026,584✔
3114
    }
3115
    return code;
8,002,990✔
3116
  } else {
3117
    int32_t code = parseInsertStbClauseBottom(pCxt, pStmt);
×
3118
    return code;
54✔
3119
  }
3120
}
3121

3122
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,045,886✔
3123
  insDestroyBoundColInfo(&pCxt->tags);
8,045,886✔
3124
  taosArrayDestroy(pCxt->pParsedValues);
8,044,457✔
3125
  taosMemoryFreeClear(pStmt->pTableMeta);
8,042,164!
3126
  nodesDestroyNode(pStmt->pTagCond);
8,042,167✔
3127
  taosArrayDestroy(pStmt->pTableTag);
8,040,734✔
3128
  tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
8,038,431!
3129
  taosMemoryFreeClear(pStmt->pCreateTblReq);
8,038,431!
3130
  pCxt->missCache = false;
8,038,431✔
3131
  pCxt->usingDuplicateTable = false;
8,038,431✔
3132
  pStmt->pBoundCols = NULL;
8,038,431✔
3133
  pStmt->usingTableProcessing = false;
8,038,431✔
3134
  pStmt->fileProcessing = false;
8,038,431✔
3135
  pStmt->usingTableName.type = 0;
8,038,431✔
3136

3137
  destroyStbRowsDataContext(pStmt->pStbRowsCxt);
8,038,431✔
3138
  taosMemoryFreeClear(pStmt->pStbRowsCxt);
8,036,837!
3139
  pStmt->stbSyntax = false;
8,036,837✔
3140
}
8,036,837✔
3141

3142
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
3143
static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
8,046,241✔
3144
  resetEnvPreTable(pCxt, pStmt);
8,046,241✔
3145
  int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName);
8,037,477✔
3146
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
8,021,765!
3147
    code = parseInsertTableClauseBottom(pCxt, pStmt);
7,993,449✔
3148
  }
3149

3150
  return code;
8,000,276✔
3151
}
3152

3153
static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName,
15,988,882✔
3154
                                          bool* pHasData) {
3155
  // no data in the sql string anymore.
3156
  if (0 == pTbName->n) {
15,988,882✔
3157
    if (0 != pTbName->type && '\0' != pStmt->pSql[0]) {
7,970,051!
3158
      return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pTbName->z);
×
3159
    }
3160

3161
    if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
7,970,051!
3162
      return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
×
3163
    }
3164

3165
    *pHasData = false;
7,970,051✔
3166
    return TSDB_CODE_SUCCESS;
7,970,051✔
3167
  }
3168

3169
  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && pStmt->totalTbNum > 0) {
8,018,831!
3170
    return buildInvalidOperationMsg(&pCxt->msg, "single table allowed in one stmt");
×
3171
  }
3172

3173
  if (TK_NK_QUESTION == pTbName->type) {
8,018,831✔
3174
    pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
10,178✔
3175
    if (pCxt->pComCxt->stmtBindVersion == 0) {
10,178!
3176
      return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
×
3177
    }
3178

3179
    char*   tbName = NULL;
10,178✔
3180
    int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
10,178✔
3181
    if (TSDB_CODE_SUCCESS == code) {
10,169✔
3182
      pCxt->stmtTbNameFlag |= HAS_BIND_VALUE;
10,168✔
3183
      pTbName->z = tbName;
10,168✔
3184
      pTbName->n = strlen(tbName);
10,168✔
3185
    }
3186
    if (code == TSDB_CODE_TSC_STMT_TBNAME_ERROR) {
10,169✔
3187
      pCxt->stmtTbNameFlag &= ~HAS_BIND_VALUE;
11✔
3188
      code = TSDB_CODE_SUCCESS;
11✔
3189
    }
3190
    return code;
10,169✔
3191
  }
3192

3193
  if (TK_NK_ID != pTbName->type && TK_NK_STRING != pTbName->type && TK_NK_QUESTION != pTbName->type) {
8,008,653!
3194
    return buildSyntaxErrMsg(&pCxt->msg, "table_name is expected", pTbName->z);
1✔
3195
  }
3196

3197
  // db.? situation,ensure that the only thing following the '.' mark is '?'
3198
  char* tbNameAfterDbName = strnchr(pTbName->z, '.', pTbName->n, true);
8,008,652✔
3199
  if (tbNameAfterDbName != NULL) {
8,011,364✔
3200
    if (*(tbNameAfterDbName + 1) == '?') {
8,002,368✔
3201
      pCxt->stmtTbNameFlag &= ~IS_FIXED_VALUE;
59✔
3202
      char* tbName = NULL;
59✔
3203
      if (NULL == pCxt->pComCxt->pStmtCb) {
59!
3204
        return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pTbName->z);
×
3205
      }
3206
      int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
59✔
3207
      if (TSDB_CODE_SUCCESS == code) {
59✔
3208
        pCxt->stmtTbNameFlag |= HAS_BIND_VALUE;
36✔
3209
        pTbName->z = tbName;
36✔
3210
        pTbName->n = strlen(tbName);
36✔
3211
      }
3212
      if (code == TSDB_CODE_TSC_STMT_TBNAME_ERROR) {
59✔
3213
        pCxt->stmtTbNameFlag &= ~HAS_BIND_VALUE;
23✔
3214
        code = TSDB_CODE_SUCCESS;
23✔
3215
      }
3216
    } else {
3217
      pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
8,002,309✔
3218
      parserWarn("QID:0x%" PRIx64 ", table name is specified in sql, ignore the table name in bind param",
8,002,309!
3219
                 pCxt->pComCxt->requestId);
3220
      *pHasData = true;
8,025,959✔
3221
    }
3222
    return TSDB_CODE_SUCCESS;
8,026,018✔
3223
  }
3224

3225
  if (TK_NK_ID == pTbName->type) {
8,996!
3226
    pCxt->stmtTbNameFlag |= IS_FIXED_VALUE;
10,740✔
3227
  }
3228

3229
  *pHasData = true;
8,996✔
3230
  return TSDB_CODE_SUCCESS;
8,996✔
3231
}
3232

3233
static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
11,664✔
3234
  SBoundColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
11,664!
3235
  if (NULL == tags) {
11,665!
3236
    return terrno;
×
3237
  }
3238
  memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
11,665✔
3239

3240
  SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
11,665✔
3241
  int32_t        code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, pCxt->pParsedValues,
11,665✔
3242
                                       &pStmt->targetTableName, pStmt->usingTableProcessing, pStmt->pVgroupsHashObj,
11,665✔
3243
                                       pStmt->pTableBlockHashObj, pStmt->usingTableName.tname, pCxt->stmtTbNameFlag);
11,665✔
3244

3245
  memset(&pCxt->tags, 0, sizeof(pCxt->tags));
11,672✔
3246
  pStmt->pVgroupsHashObj = NULL;
11,672✔
3247
  pStmt->pTableBlockHashObj = NULL;
11,672✔
3248
  return code;
11,672✔
3249
}
3250

3251
static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
7,980,144✔
3252
  if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
7,980,144✔
3253
    return setStmtInfo(pCxt, pStmt);
11,677✔
3254
  }
3255

3256
  // release old array alloced by merge
3257
  pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
7,968,467✔
3258
  pStmt->pVgDataBlocks = NULL;
7,949,673✔
3259

3260
  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
7,949,673✔
3261
  if (fileOnly) {
7,949,673!
3262
    // none data, skip merge & buildvgdata
3263
    if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {
×
3264
      pCxt->needRequest = false;
×
3265
      return TSDB_CODE_SUCCESS;
×
3266
    }
3267
  }
3268

3269
  // merge according to vgId
3270
  int32_t code = insMergeTableDataCxt(fileOnly ? pStmt->pTableCxtHashObj : pStmt->pTableBlockHashObj,
7,953,481✔
3271
                                      &pStmt->pVgDataBlocks, pStmt->fileProcessing);
7,953,481✔
3272
  // clear tmp hashobj only
3273
  taosHashClear(pStmt->pTableCxtHashObj);
7,964,126✔
3274

3275
  if (TSDB_CODE_SUCCESS == code) {
7,964,643!
3276
    code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
7,967,160✔
3277
  }
3278

3279
  return code;
7,952,875✔
3280
}
3281

3282
// tb_name
3283
//     [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
3284
//     [(field1_name, ...)]
3285
//     VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3286
// [...];
3287
static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,028,836✔
3288
  SToken  token;
3289
  int32_t code = TSDB_CODE_SUCCESS;
8,028,836✔
3290
  bool    hasData = true;
8,028,836✔
3291
  // for each table
3292
  while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) {
23,980,768!
3293
    // pStmt->pSql -> tb_name ...
3294
    NEXT_TOKEN(pStmt->pSql, token);
16,010,358✔
3295
    code = checkTableClauseFirstToken(pCxt, pStmt, &token, &hasData);
16,004,408✔
3296
    if (TSDB_CODE_SUCCESS == code && hasData) {
16,000,682!
3297
      code = parseInsertTableClause(pCxt, pStmt, &token);
8,046,469✔
3298
    }
3299

3300
    if( TSDB_CODE_SUCCESS == code && pStmt->pTableMeta &&
15,947,509!
3301
        (((pStmt->pTableMeta->virtualStb == 1) && (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE)) ||
15,939,427!
3302
        (pStmt->pTableMeta->tableType == TSDB_VIRTUAL_NORMAL_TABLE ||
15,939,427!
3303
          pStmt->pTableMeta->tableType == TSDB_VIRTUAL_CHILD_TABLE))) {
15,943,850!
3304
      code = buildInvalidOperationMsg(&pCxt->msg, "Virtual table can not be written");
×
3305
    }
3306
  }
3307

3308
  if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
7,970,410!
3309
    code = parseInsertBodyBottom(pCxt, pStmt);
7,985,226✔
3310
  }
3311
  return code;
7,981,569✔
3312
}
3313

3314
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
20,958!
3315

3316
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
7,984,570✔
3317
  SVnodeModifyOpStmt* pStmt = NULL;
7,984,570✔
3318
  int32_t             code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&pStmt);
7,984,570✔
3319
  if (NULL == pStmt) {
7,989,714!
3320
    return code;
×
3321
  }
3322

3323
  if (pCxt->pComCxt->pStmtCb) {
7,989,714✔
3324
    TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
11,702✔
3325
  }
3326
  pStmt->pSql = pCxt->pComCxt->pSql;
7,989,714✔
3327

3328
  pStmt->freeHashFunc = insDestroyTableDataCxtHashMap;
7,989,714✔
3329
  pStmt->freeArrayFunc = insDestroyVgroupDataCxtList;
7,989,714✔
3330
  pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
7,989,714✔
3331
  pStmt->pCsvParser = NULL;
7,989,714✔
3332

3333
  if (!reentry) {
7,989,714✔
3334
    pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
7,988,742✔
3335
    if (pCxt->pComCxt->pStmtCb) {
7,991,940✔
3336
      pStmt->pTableBlockHashObj =
11,686✔
3337
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
11,684✔
3338
    } else {
3339
      pStmt->pTableBlockHashObj =
7,980,754✔
3340
          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
7,980,256✔
3341
    }
3342
  }
3343
  pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
7,993,412✔
3344
  pStmt->pSuperTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
7,993,181✔
3345
  pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
7,993,270✔
3346
  pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
7,993,297✔
3347
  if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) ||
7,993,453!
3348
      NULL == pStmt->pSubTableHashObj || NULL == pStmt->pTableNameHashObj || NULL == pStmt->pDbFNameHashObj) {
7,994,111!
3349
    nodesDestroyNode((SNode*)pStmt);
×
3350
    return TSDB_CODE_OUT_OF_MEMORY;
×
3351
  }
3352

3353
  taosHashSetFreeFp(pStmt->pSubTableHashObj, destroySubTableHashElem);
7,994,069✔
3354
  taosHashSetFreeFp(pStmt->pSuperTableHashObj, destroySubTableHashElem);
7,992,275✔
3355

3356
  *pOutput = (SNode*)pStmt;
7,990,490✔
3357
  return TSDB_CODE_SUCCESS;
7,990,490✔
3358
}
3359

3360
static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
7,994,273✔
3361
  SQuery* pQuery = NULL;
7,994,273✔
3362
  int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
7,994,273✔
3363
  if (NULL == pQuery) {
7,988,148!
3364
    return code;
×
3365
  }
3366

3367
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
7,988,148✔
3368
  pQuery->haveResultSet = false;
7,988,148✔
3369
  pQuery->msgType = TDMT_VND_SUBMIT;
7,988,148✔
3370

3371
  code = createVnodeModifOpStmt(pCxt, false, &pQuery->pRoot);
7,988,148✔
3372
  if (TSDB_CODE_SUCCESS == code) {
7,989,528✔
3373
    *pOutput = pQuery;
7,988,948✔
3374
  } else {
3375
    nodesDestroyNode((SNode*)pQuery);
580✔
3376
  }
3377
  return code;
7,989,141✔
3378
}
3379

3380
static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) {
32,524✔
3381
  if (1 != taosArrayGetSize(pUsers)) {
32,524!
3382
    return TSDB_CODE_FAILED;
×
3383
  }
3384

3385
  SMetaRes* pRes = taosArrayGet(pUsers, 0);
32,525✔
3386
  if (TSDB_CODE_SUCCESS == pRes->code) {
32,525!
3387
    SUserAuthRes* pAuth = pRes->pRes;
32,525✔
3388
    pRes->code = nodesCloneNode(pAuth->pCond[AUTH_RES_BASIC], pTagCond);
32,525✔
3389
    if (TSDB_CODE_SUCCESS == pRes->code) {
32,524!
3390
      return pAuth->pass[AUTH_RES_BASIC] ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
32,524!
3391
    }
3392
  }
3393
  return pRes->code;
×
3394
}
3395

3396
static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) {
32,523✔
3397
  if (1 != taosArrayGetSize(pTables) && 2 != taosArrayGetSize(pTables)) {
32,523!
3398
    return TSDB_CODE_FAILED;
×
3399
  }
3400

3401
  taosMemoryFreeClear(*pMeta);
32,524!
3402
  SMetaRes* pRes = taosArrayGet(pTables, 0);
32,524✔
3403
  if (TSDB_CODE_SUCCESS == pRes->code) {
32,523✔
3404
    *pMeta = tableMetaDup((const STableMeta*)pRes->pRes);
32,505✔
3405
    if (NULL == *pMeta) {
32,498!
3406
      return TSDB_CODE_OUT_OF_MEMORY;
×
3407
    }
3408
  }
3409
  return pRes->code;
32,516✔
3410
}
3411

3412
static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
32,498✔
3413
  if (1 != taosArrayGetSize(pTables)) {
32,498!
3414
    return TSDB_CODE_FAILED;
×
3415
  }
3416

3417
  SMetaRes* pRes = taosArrayGet(pTables, 0);
32,499✔
3418
  if (TSDB_CODE_SUCCESS != pRes->code) {
32,500!
3419
    return pRes->code;
×
3420
  }
3421

3422
  SVgroupInfo* pVg = pRes->pRes;
32,500✔
3423
  if (isStb) {
32,500✔
3424
    pStmt->pTableMeta->vgId = pVg->vgId;
193✔
3425
  }
3426
  return taosHashPut(pStmt->pVgroupsHashObj, (const char*)&pVg->vgId, sizeof(pVg->vgId), (char*)pVg,
32,500✔
3427
                     sizeof(SVgroupInfo));
3428
}
3429

3430
static int32_t buildTagNameFromMeta(STableMeta* pMeta, SArray** pTagName) {
×
3431
  *pTagName = taosArrayInit(pMeta->tableInfo.numOfTags, TSDB_COL_NAME_LEN);
×
3432
  if (NULL == *pTagName) {
×
3433
    return terrno;
×
3434
  }
3435
  SSchema* pSchema = getTableTagSchema(pMeta);
×
3436
  int32_t  code = 0;
×
3437
  for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
×
3438
    if (NULL == taosArrayPush(*pTagName, pSchema[i].name)) {
×
3439
      code = terrno;
×
3440
      taosArrayDestroy(*pTagName);
×
3441
      *pTagName = NULL;
×
3442
      break;
×
3443
    }
3444
  }
3445
  return code;
×
3446
}
3447

3448
static int32_t checkSubtablePrivilegeForTable(const SArray* pTables, SVnodeModifyOpStmt* pStmt) {
×
3449
  if (1 != taosArrayGetSize(pTables)) {
×
3450
    return TSDB_CODE_FAILED;
×
3451
  }
3452

3453
  SMetaRes* pRes = taosArrayGet(pTables, 0);
×
3454
  if (TSDB_CODE_SUCCESS != pRes->code) {
×
3455
    return pRes->code;
×
3456
  }
3457

3458
  SArray* pTagName = NULL;
×
3459
  int32_t code = buildTagNameFromMeta(pStmt->pTableMeta, &pTagName);
×
3460
  if (TSDB_CODE_SUCCESS == code) {
×
3461
    code = checkSubtablePrivilege((SArray*)pRes->pRes, pTagName, &pStmt->pTagCond);
×
3462
  }
3463
  taosArrayDestroy(pTagName);
×
3464
  return code;
×
3465
}
3466

3467
static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
32,498✔
3468
                                              SVnodeModifyOpStmt* pStmt, bool isStb) {
3469
  int32_t code = TSDB_CODE_SUCCESS;
32,498✔
3470
  if (!isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
32,498!
3471
    code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
×
3472
  }
3473

3474
  if (TSDB_CODE_SUCCESS == code && isStb) {
32,498✔
3475
    code = storeChildTableMeta(pCxt, pStmt);
193✔
3476
  }
3477
  if (TSDB_CODE_SUCCESS == code) {
32,498!
3478
    code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
32,499✔
3479
  }
3480
  if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
32,503!
3481
    code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
×
3482
  }
3483
  return code;
32,504✔
3484
}
3485

3486
static void destoryTablesReq(void* p) {
65,044✔
3487
  STablesReq* pRes = (STablesReq*)p;
65,044✔
3488
  taosArrayDestroy(pRes->pTables);
65,044✔
3489
}
65,043✔
3490

3491
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
32,524✔
3492
  if (NULL == pCatalogReq) {
32,524!
3493
    return;
×
3494
  }
3495

3496
  taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
32,524✔
3497
  pCatalogReq->pTableMeta = NULL;
32,523✔
3498
  taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
32,523✔
3499
  pCatalogReq->pTableHash = NULL;
32,525✔
3500
  taosArrayDestroy(pCatalogReq->pUser);
32,525✔
3501
  pCatalogReq->pUser = NULL;
32,525✔
3502
  taosArrayDestroy(pCatalogReq->pTableTag);
32,525✔
3503
  pCatalogReq->pTableTag = NULL;
32,525✔
3504
}
3505

3506
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
32,524✔
3507
                                   SVnodeModifyOpStmt* pStmt) {
3508
  clearCatalogReq(pCatalogReq);
32,524✔
3509
  int32_t code = checkAuthFromMetaData(pMetaData->pUser, &pStmt->pTagCond);
32,524✔
3510
  if (code == TSDB_CODE_SUCCESS) {
32,524!
3511
    code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
32,524✔
3512
  }
3513
  if (code == TSDB_CODE_SUCCESS) {
32,516✔
3514
    if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
32,498✔
3515
      pStmt->stbSyntax = true;
1✔
3516
    }
3517
    if (!pStmt->stbSyntax) {
32,498✔
3518
      if (pStmt->usingTableProcessing) {
32,497✔
3519
        return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
193✔
3520
      }
3521
      return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
32,304✔
3522
    }
3523
  }
3524
  return code;
19✔
3525
}
3526

3527
static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) {
16✔
3528
  nodesDestroyNode(pQuery->pRoot);
16✔
3529

3530
  int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
16✔
3531
  if (TSDB_CODE_SUCCESS == code) {
16!
3532
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
16✔
3533

3534
    code = (*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj,
16✔
3535
                                                    &pStmt->pTableBlockHashObj);
3536
    if (TSDB_CODE_SUCCESS == code) {
16!
3537
      if (NULL == pStmt->pVgroupsHashObj) {
16!
3538
        pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
3539
      }
3540
      if (NULL == pStmt->pTableBlockHashObj) {
16!
3541
        pStmt->pTableBlockHashObj =
×
3542
            taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
3543
      }
3544
      if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) {
16!
3545
        code = TSDB_CODE_OUT_OF_MEMORY;
×
3546
      }
3547
    }
3548
  }
3549

3550
  return code;
16✔
3551
}
3552

3553
static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
8,028,185✔
3554
                               SQuery** pQuery) {
3555
  if (NULL == *pQuery) {
8,028,185✔
3556
    return createInsertQuery(pCxt, pQuery);
7,998,128✔
3557
  }
3558

3559
  if (NULL != pCxt->pComCxt->pStmtCb) {
30,057✔
3560
    return resetVnodeModifOpStmt(pCxt, *pQuery);
16✔
3561
  }
3562

3563
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(*pQuery)->pRoot;
30,041✔
3564

3565
  if (!pStmt->fileProcessing) {
30,041!
3566
    return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, pStmt);
32,524✔
3567
  }
3568

3569
  return TSDB_CODE_SUCCESS;
×
3570
}
3571

3572
static int32_t setRefreshMeta(SQuery* pQuery) {
7,936,861✔
3573
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
7,936,861✔
3574
  int32_t             code = 0;
7,936,861✔
3575

3576
  if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
7,936,861✔
3577
    taosArrayDestroy(pQuery->pTableList);
11,745✔
3578
    pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
11,748✔
3579
    if (!pQuery->pTableList) {
11,742!
3580
      code = terrno;
×
3581
    } else {
3582
      SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
11,742✔
3583
      while (NULL != pTable) {
23,579✔
3584
        if (NULL == taosArrayPush(pQuery->pTableList, pTable)) {
23,635!
3585
          code = terrno;
×
3586
          taosHashCancelIterate(pStmt->pTableNameHashObj, pTable);
×
3587
          break;
×
3588
        }
3589
        pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
11,816✔
3590
      }
3591
    }
3592
  }
3593

3594
  if (TSDB_CODE_SUCCESS == code && taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
7,908,654✔
3595
    taosArrayDestroy(pQuery->pDbList);
11,756✔
3596
    pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
11,757✔
3597
    if (!pQuery->pDbList) {
11,740!
3598
      code = terrno;
×
3599
    } else {
3600
      char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
11,740✔
3601
      while (NULL != pDb) {
20,777✔
3602
        if (NULL == taosArrayPush(pQuery->pDbList, pDb)) {
23,524!
3603
          code = terrno;
×
3604
          taosHashCancelIterate(pStmt->pDbFNameHashObj, pDb);
×
3605
          break;
×
3606
        }
3607
        pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
11,763✔
3608
      }
3609
    }
3610
  }
3611

3612
  return code;
7,890,830✔
3613
}
3614

3615
// INSERT INTO
3616
//   tb_name
3617
//       [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) [table_options]]
3618
//       [(field1_name, ...)]
3619
//       VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
3620
//   [...];
3621
static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
7,987,226✔
3622
  int32_t code = skipInsertInto(&pStmt->pSql, &pCxt->msg);
7,987,226✔
3623
  if (TSDB_CODE_SUCCESS == code) {
7,998,634!
3624
    code = parseInsertBody(pCxt, pStmt);
7,998,986✔
3625
  }
3626
  return code;
7,942,338✔
3627
}
3628

3629
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
×
3630
  int32_t          code = TSDB_CODE_SUCCESS;
×
3631
  SRowsDataContext rowsDataCxt;
3632

3633
  if (!pStmt->stbSyntax) {
×
3634
    STableDataCxt* pTableCxt = NULL;
×
3635
    code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
×
3636
    rowsDataCxt.pTableDataCxt = pTableCxt;
×
3637
  } else {
3638
    rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
×
3639
  }
3640
  if (TSDB_CODE_SUCCESS == code) {
×
3641
    code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
×
3642
  }
3643

3644
  if (TSDB_CODE_SUCCESS == code) {
×
3645
    if (pStmt->fileProcessing) {
×
3646
      code = parseInsertBodyBottom(pCxt, pStmt);
×
3647
    } else {
3648
      code = parseInsertBody(pCxt, pStmt);
×
3649
    }
3650
  }
3651

3652
  return code;
×
3653
}
3654

3655
static int32_t parseInsertSqlFromTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
32,505✔
3656
  int32_t code = parseInsertTableClauseBottom(pCxt, pStmt);
32,505✔
3657
  if (TSDB_CODE_SUCCESS == code) {
32,500✔
3658
    code = parseInsertBody(pCxt, pStmt);
32,482✔
3659
  }
3660
  return code;
32,502✔
3661
}
3662

3663
static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
8,019,741✔
3664
  if (pStmt->pSql == pCxt->pComCxt->pSql || NULL != pCxt->pComCxt->pStmtCb) {
8,019,741!
3665
    return parseInsertSqlFromStart(pCxt, pStmt);
7,987,235✔
3666
  }
3667

3668
  if (pStmt->fileProcessing) {
32,506!
3669
    return parseInsertSqlFromCsv(pCxt, pStmt);
×
3670
  }
3671

3672
  return parseInsertSqlFromTable(pCxt, pStmt);
32,506✔
3673
}
3674

3675
static int32_t buildUsingInsertTableReq(SName* pSName, SName* pCName, SArray** pTables) {
193✔
3676
  if (NULL == *pTables) {
193!
3677
    *pTables = taosArrayInit(2, sizeof(SName));
193✔
3678
    if (NULL == *pTables) {
193!
3679
      goto _err;
×
3680
    }
3681
  }
3682
  if (NULL == taosArrayPush(*pTables, pSName)) {
386!
3683
    goto _err;
×
3684
  }
3685
  if (NULL == taosArrayPush(*pTables, pCName)) {
386!
3686
    goto _err;
×
3687
  }
3688
  return TSDB_CODE_SUCCESS;
193✔
3689

3690
_err:
×
3691
  if (NULL != *pTables) {
×
3692
    taosArrayDestroy(*pTables);
×
3693
    *pTables = NULL;
×
3694
  }
3695
  return terrno;
×
3696
}
3697

3698
static int32_t buildInsertTableReq(SName* pName, SArray** pTables) {
65,666✔
3699
  *pTables = taosArrayInit(1, sizeof(SName));
65,666✔
3700
  if (NULL == *pTables) {
65,663!
3701
    return terrno;
×
3702
  }
3703

3704
  if (NULL == taosArrayPush(*pTables, pName)) {
131,332!
3705
    taosArrayDestroy(*pTables);
×
3706
    *pTables = NULL;
×
3707
    return terrno;
×
3708
  }
3709
  return TSDB_CODE_SUCCESS;
65,669✔
3710
}
3711

3712
static int32_t buildInsertUsingDbReq(SName* pSName, SName* pCName, SArray** pDbs) {
193✔
3713
  if (NULL == *pDbs) {
193!
3714
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
193✔
3715
    if (NULL == *pDbs) {
193!
3716
      return terrno;
×
3717
    }
3718
  }
3719

3720
  STablesReq req = {0};
193✔
3721
  req.autoCreate = 1;
193✔
3722
  (void)tNameGetFullDbName(pSName, req.dbFName);
193✔
3723
  (void)tNameGetFullDbName(pCName, req.dbFName);
193✔
3724

3725
  int32_t code = buildUsingInsertTableReq(pSName, pCName, &req.pTables);
193✔
3726
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
386!
3727
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3728
  }
3729
  return code;
193✔
3730
}
3731

3732
static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) {
65,664✔
3733
  if (NULL == *pDbs) {
65,664!
3734
    *pDbs = taosArrayInit(1, sizeof(STablesReq));
65,665✔
3735
    if (NULL == *pDbs) {
65,664!
3736
      return terrno;
×
3737
    }
3738
  }
3739

3740
  STablesReq req = {0};
65,663✔
3741
  (void)tNameGetFullDbName(pName, req.dbFName);
65,663✔
3742
  int32_t code = buildInsertTableReq(pName, &req.pTables);
65,670✔
3743
  if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
131,338!
3744
    code = TSDB_CODE_OUT_OF_MEMORY;
×
3745
  }
3746

3747
  return code;
65,668✔
3748
}
3749

3750
static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray** pUserAuth) {
32,923✔
3751
  *pUserAuth = taosArrayInit(1, sizeof(SUserAuthInfo));
32,923✔
3752
  if (NULL == *pUserAuth) {
32,928!
3753
    return terrno;
×
3754
  }
3755

3756
  SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE};
32,928✔
3757
  snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser);
32,928✔
3758
  memcpy(&userAuth.tbName, pName, sizeof(SName));
32,928✔
3759
  if (NULL == taosArrayPush(*pUserAuth, &userAuth)) {
65,857!
3760
    taosArrayDestroy(*pUserAuth);
×
3761
    *pUserAuth = NULL;
×
3762
    return terrno;
×
3763
  }
3764

3765
  return TSDB_CODE_SUCCESS;
32,929✔
3766
}
3767

3768
static int32_t buildInsertTableTagReq(SName* pName, SArray** pTables) { return buildInsertTableReq(pName, pTables); }
×
3769

3770
static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SCatalogReq* pCatalogReq) {
32,923✔
3771
  int32_t code = buildInsertUserAuthReq(
32,923✔
3772
      pCxt->pComCxt->pUser, (0 == pStmt->usingTableName.type ? &pStmt->targetTableName : &pStmt->usingTableName),
65,846✔
3773
      &pCatalogReq->pUser);
3774
  if (TSDB_CODE_SUCCESS == code && pCxt->needTableTagVal) {
32,929!
3775
    code = buildInsertTableTagReq(&pStmt->targetTableName, &pCatalogReq->pTableTag);
×
3776
  }
3777
  if (TSDB_CODE_SUCCESS == code) {
32,929!
3778
    if (0 == pStmt->usingTableName.type) {
32,931✔
3779
      code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
32,738✔
3780
    } else {
3781
      code = buildInsertUsingDbReq(&pStmt->usingTableName, &pStmt->targetTableName, &pCatalogReq->pTableMeta);
193✔
3782
    }
3783
  }
3784
  if (TSDB_CODE_SUCCESS == code) {
32,929!
3785
    code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableHash);
32,931✔
3786
  }
3787
  return code;
32,931✔
3788
}
3789

3790
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
7,977,861✔
3791
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
7,977,861✔
3792
  if (pCxt->missCache) {
7,977,861✔
3793
    parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted before obtain the cache",
32,924✔
3794
                pCxt->pComCxt->requestId, pStmt->totalRowsNum, pStmt->totalTbNum);
3795

3796
    pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
32,924✔
3797
    return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
32,924✔
3798
  }
3799

3800
  parserDebug("QID:0x%" PRIx64 ", %d rows of %d tables will be inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
7,944,937✔
3801
              pStmt->totalTbNum);
3802

3803
  pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
7,959,147✔
3804
  return TSDB_CODE_SUCCESS;
7,959,147✔
3805
}
3806

3807
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
8,024,348✔
3808
  SInsertParseContext context = {.pComCxt = pCxt,
16,048,696✔
3809
                                 .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
8,024,348✔
3810
                                 .missCache = false,
3811
                                 .usingDuplicateTable = false,
3812
                                 .needRequest = true,
3813
                                 .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
8,024,348✔
3814

3815
  int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
8,024,348✔
3816
  if (TSDB_CODE_SUCCESS == code) {
8,020,687✔
3817
    code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)((*pQuery)->pRoot));
8,020,427✔
3818
  }
3819

3820
  if (TSDB_CODE_SUCCESS == code) {
7,970,362!
3821
    code = setNextStageInfo(&context, *pQuery, pCatalogReq);
7,982,047✔
3822
  }
3823
  if ((TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) &&
7,979,123!
3824
      QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
7,979,012✔
3825
    code = setRefreshMeta(*pQuery);
7,941,540✔
3826
  }
3827

3828
  insDestroyBoundColInfo(&context.tags);
7,928,336✔
3829
  clearInsertParseContext(&context);
7,921,667✔
3830
  // if no data to insert, set emptyMode to avoid request server
3831
  if (!context.needRequest) {
7,924,972!
3832
    (*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
×
3833
  }
3834
  return code;
7,924,972✔
3835
}
3836

3837
// CSV Parser Implementation
3838
static int32_t csvParserInit(SCsvParser* parser, TdFilePtr pFile) {
2✔
3839
  if (!parser || !pFile) {
2!
3840
    return TSDB_CODE_INVALID_PARA;
×
3841
  }
3842

3843
  memset(parser, 0, sizeof(SCsvParser));
2✔
3844

3845
  // Set default CSV format
3846
  parser->delimiter = CSV_DEFAULT_DELIMITER;
2✔
3847
  parser->quote = CSV_QUOTE_SINGLE;  // Default to single quote for TDengine compatibility
2✔
3848
  parser->escape = CSV_ESCAPE_CHAR;
2✔
3849
  parser->allowNewlineInField = true;
2✔
3850

3851
  // Initialize buffer
3852
  parser->bufferSize = 64 * 1024;  // 64KB buffer
2✔
3853
  parser->buffer = taosMemoryMalloc(parser->bufferSize);
2!
3854
  if (!parser->buffer) {
2!
3855
    return terrno;
×
3856
  }
3857

3858
  // Initialize line buffer for reuse
3859
  parser->lineBufferCapacity = 64 * 1024;  // Initial 64KB line buffer
2✔
3860
  parser->lineBuffer = taosMemoryMalloc(parser->lineBufferCapacity);
2!
3861
  if (!parser->lineBuffer) {
2!
3862
    return terrno;
×
3863
  }
3864

3865
  parser->bufferPos = 0;
2✔
3866
  parser->bufferLen = 0;
2✔
3867
  parser->eof = false;
2✔
3868
  parser->pFile = pFile;
2✔
3869

3870
  // Fill initial buffer to detect quote type
3871
  int32_t code = csvParserFillBuffer(parser);
2✔
3872
  if (code != TSDB_CODE_SUCCESS) {
2!
3873
    return code;
×
3874
  }
3875

3876
  // Auto-detect quote character by finding the first quote in the file
3877
  // Skip the header line and look for the first quote character in data
3878
  bool foundFirstQuote = false;
2✔
3879
  bool inFirstLine = true;
2✔
3880

3881
  for (size_t i = 0; i < parser->bufferLen && !foundFirstQuote; i++) {
1,806!
3882
    char ch = parser->buffer[i];
1,804✔
3883

3884
    // Skip the first line (header)
3885
    if (inFirstLine) {
1,804✔
3886
      if (ch == '\n') {
1,802✔
3887
        inFirstLine = false;
2✔
3888
      }
3889
      continue;
1,802✔
3890
    }
3891

3892
    // Look for the first quote character in data lines
3893
    if (ch == CSV_QUOTE_SINGLE) {
2✔
3894
      parser->quote = CSV_QUOTE_SINGLE;
1✔
3895
      foundFirstQuote = true;
1✔
3896
    } else if (ch == CSV_QUOTE_DOUBLE) {
1!
3897
      parser->quote = CSV_QUOTE_DOUBLE;
1✔
3898
      foundFirstQuote = true;
1✔
3899
    }
3900
  }
3901

3902
  // If no quotes found, keep default (single quote for TDengine compatibility)
3903

3904
  // Reset buffer position for actual parsing
3905
  parser->bufferPos = 0;
2✔
3906

3907
  return TSDB_CODE_SUCCESS;
2✔
3908
}
3909

3910
static void csvParserDestroy(SCsvParser* parser) {
2✔
3911
  if (parser) {
2!
3912
    taosMemoryFree(parser->buffer);
2!
3913
    taosMemoryFree(parser->lineBuffer);
2!
3914
    memset(parser, 0, sizeof(SCsvParser));
2✔
3915
  }
3916
}
2✔
3917

3918
static int32_t csvParserFillBuffer(SCsvParser* parser) {
7✔
3919
  if (!parser || parser->eof) {
7!
3920
    return TSDB_CODE_SUCCESS;
×
3921
  }
3922

3923
  // Move remaining data to beginning of buffer
3924
  // Since this function is only called when bufferPos >= bufferLen,
3925
  // we can simplify by always resetting the buffer
3926
  parser->bufferLen = 0;
7✔
3927
  parser->bufferPos = 0;
7✔
3928

3929
  // Read more data
3930
  size_t spaceLeft = parser->bufferSize - parser->bufferLen;
7✔
3931
  if (spaceLeft > 0) {
7!
3932
    int64_t bytesRead = taosReadFile(parser->pFile, parser->buffer + parser->bufferLen, spaceLeft);
7✔
3933
    if (bytesRead < 0) {
7!
3934
      return TAOS_SYSTEM_ERROR(errno);
×
3935
    }
3936
    if (bytesRead == 0) {
7✔
3937
      parser->eof = true;
2✔
3938
    } else {
3939
      parser->bufferLen += bytesRead;
5✔
3940
    }
3941
  }
3942

3943
  return TSDB_CODE_SUCCESS;
7✔
3944
}
3945

3946
// Destroy saved CSV parser in SVnodeModifyOpStmt
3947
static void destroySavedCsvParser(SVnodeModifyOpStmt* pStmt) {
2✔
3948
  if (pStmt && pStmt->pCsvParser) {
2!
3949
    csvParserDestroy(pStmt->pCsvParser);
2✔
3950
    taosMemoryFree(pStmt->pCsvParser);
2!
3951
    pStmt->pCsvParser = NULL;
2✔
3952
  }
3953
}
2✔
3954

3955
static int32_t csvParserExpandLineBuffer(SCsvParser* parser, size_t requiredLen) {
198,001✔
3956
  if (!parser || requiredLen <= parser->lineBufferCapacity) {
198,001!
3957
    return TSDB_CODE_SUCCESS;
198,001✔
3958
  }
3959

3960
  size_t newCapacity = parser->lineBufferCapacity;
×
3961
  while (newCapacity < requiredLen) {
×
3962
    newCapacity *= 2;
×
3963
  }
3964

3965
  char* newLineBuffer = taosMemoryRealloc(parser->lineBuffer, newCapacity);
×
3966
  if (!newLineBuffer) {
×
3967
    return TSDB_CODE_OUT_OF_MEMORY;
×
3968
  }
3969

3970
  parser->lineBuffer = newLineBuffer;
×
3971
  parser->lineBufferCapacity = newCapacity;
×
3972
  return TSDB_CODE_SUCCESS;
×
3973
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc