• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4416

03 Jul 2025 10:49AM UTC coverage: 61.007% (-1.2%) from 62.241%
#4416

push

travis-ci

GitHub
Merge pull request #31575 from taosdata/fix/huoh/taos_log

150735 of 316232 branches covered (47.67%)

Branch coverage included in aggregate %.

233783 of 314057 relevant lines covered (74.44%)

6782670.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.31
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
/*
 * Hand out the next slot of `buffUnit` bytes from the table buffer pool.
 *
 * Slots are carved sequentially out of `pCurBuff`. Once the current chunk is
 * used up, a chunk already stored in `pBufList` past `buffIdx` is reused if
 * one exists; otherwise a new chunk of `buffSize` bytes is allocated and
 * appended to `pBufList`.
 *
 * @param pTblBuf  buffer pool state, updated in place
 * @param pBuf     receives the address of the slot on success
 * @return TSDB_CODE_SUCCESS, or an error code if no chunk could be obtained
 */
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    // Room left in the current chunk: take the next slot.
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    // Reuse a chunk that is already recorded in the list.
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    *pBuf = pTblBuf->pCurBuff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
  } else {
    // No spare chunk: allocate a new one and register it in the list.
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
    if (NULL == buff) {
      return terrno;
    }

    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
      // The list did not record the chunk, so nothing else will free it.
      // Save terrno first so releasing the chunk cannot disturb the error.
      int32_t code = terrno;
      taosMemoryFree(buff);
      return code;
    }

    pTblBuf->buffIdx++;
    pTblBuf->pCurBuff = buff;
    *pBuf = buff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
  }

  return TSDB_CODE_SUCCESS;
}
40

41
/*
 * Take the first pending node off the statement queue.
 *
 * While the remaining-count is zero the caller first polls (ten short
 * taosUsleep calls) and afterwards waits on the queue condition variable.
 * The wait is abandoned as soon as `stopQueue` is observed.
 *
 * @param pStmt  statement owning the queue
 * @param param  receives the detached node when true is returned
 * @return true if a node was dequeued; false if the queue was stopped while
 *         empty, or the list turned out to be empty under the lock
 */
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
  SStmtQueue* pQueue = &pStmt->queue;
  int         spinCnt = 0;

  while (0 == atomic_load_64((int64_t*)&pQueue->qRemainNum)) {
    if (pQueue->stopQueue) {
      return false;
    }

    if (spinCnt < 10) {
      // Cheap polling phase before falling back to the condition variable.
      taosUsleep(1);
      spinCnt++;
      continue;
    }

    (void)taosThreadMutexLock(&pQueue->mutex);
    if (pQueue->stopQueue) {
      (void)taosThreadMutexUnlock(&pQueue->mutex);
      return false;
    }
    // Re-check under the lock so a signal sent before locking is not missed.
    if (0 == atomic_load_64((int64_t*)&pQueue->qRemainNum)) {
      (void)taosThreadCondWait(&pQueue->waitCond, &pQueue->mutex);
    }
    (void)taosThreadMutexUnlock(&pQueue->mutex);
  }

  if (pQueue->stopQueue && 0 == atomic_load_64((int64_t*)&pQueue->qRemainNum)) {
    return false;
  }

  (void)taosThreadMutexLock(&pQueue->mutex);
  if (pQueue->head == pQueue->tail) {
    // Counter and list disagree: resynchronise the counter and report empty.
    pQueue->qRemainNum = 0;
    (void)taosThreadMutexUnlock(&pQueue->mutex);
    STMT2_ELOG_E("interlace queue is empty, cannot dequeue");
    return false;
  }

  // `head` is a sentinel; the first real node hangs off head->next.
  SStmtQNode* pFirst = pQueue->head->next;
  pQueue->head->next = pFirst->next;
  if (pQueue->tail == pFirst) {
    pQueue->tail = pQueue->head;
  }
  pFirst->next = NULL;
  *param = pFirst;

  (void)atomic_sub_fetch_64((int64_t*)&pQueue->qRemainNum, 1);
  (void)taosThreadMutexUnlock(&pQueue->mutex);

  STMT2_DLOG("dequeue success, node:%p, remainNum:%" PRId64, pFirst, pStmt->queue.qRemainNum);

  return true;
}
90

91
/*
 * Append a node to the tail of the statement queue and wake a waiter.
 *
 * A NULL node is logged and ignored. The list update, the bind counter, the
 * remaining-count increment and the condition signal all happen while the
 * queue mutex is held.
 *
 * @param pStmt  statement owning the queue
 * @param param  node to append; its `next` link is reset
 */
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == param) {
    STMT2_ELOG_E("enqueue param is NULL");
    return;
  }

  SStmtQueue* pQueue = &pStmt->queue;

  param->next = NULL;

  (void)taosThreadMutexLock(&pQueue->mutex);

  pQueue->tail->next = param;
  pQueue->tail = param;
  pStmt->stat.bindDataNum++;

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
  (void)taosThreadCondSignal(&(pQueue->waitCond));

  (void)taosThreadMutexUnlock(&pQueue->mutex);

  STMT2_TLOG("enqueue param:%p, remainNum:%" PRId64 ", restoreTbCols:%d", param, pStmt->queue.qRemainNum,
             param->restoreTbCols);
}
113

114
/*
 * Make sure the statement has a request object for execution.
 *
 * When `exec.pRequest` is NULL a new request is built from the prepared SQL.
 * On success the request is marked as a synchronous stmt-bind query and, if
 * the statement carries a database name, that name is copied into it.
 * An existing request is left untouched.
 *
 * @param pStmt  statement to prepare a request for
 * @return TSDB_CODE_SUCCESS, the buildRequest error, or an out-of-memory
 *         error when the database name cannot be duplicated
 */
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
  int32_t code = 0;

  if (pStmt->exec.pRequest == NULL) {
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
                        pStmt->reqid);
    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }

    // Only touch the request when buildRequest actually produced one; a
    // failed build must not be dereferenced.
    if (TSDB_CODE_SUCCESS == code && NULL != pStmt->exec.pRequest) {
      pStmt->exec.pRequest->type = RES_TYPE__QUERY;
      pStmt->exec.pRequest->syncQuery = true;
      pStmt->exec.pRequest->isStmtBind = true;

      if (pStmt->db != NULL) {
        taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
        pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
        if (NULL == pStmt->exec.pRequest->pDb) {
          // Report the failed copy instead of running without a database.
          code = (0 != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        }
      }
    }
  }

  STMT2_TLOG("stmtCreateRequest, pRequest:%p, code:%d, db:%s", pStmt->exec.pRequest, code, pStmt->db);

  return code;
}
138

139
/*
 * Validate and apply a transition of the statement state machine.
 *
 * A statement that has already recorded an error only accepts STMT_PREPARE,
 * which clears that error. For every other target status the current status
 * is checked against the transitions allowed below.
 *
 * @param pStmt      statement whose status changes
 * @param newStatus  requested status
 * @return TSDB_CODE_SUCCESS when the status was updated, the stored error
 *         code of a failed statement, TSDB_CODE_TSC_STMT_API_ERROR for an
 *         illegal transition, or TSDB_CODE_APP_ERROR for an unknown status
 */
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;
  bool    illegal = false;  // set when the current status forbids newStatus

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT2_ELOG("stmt already failed with err:%s, please use stmt prepare", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;
      break;

    case STMT_SETTBNAME:
      // Outside interlace mode a table name cannot follow a bind directly.
      illegal = STMT_STATUS_EQ(INIT) ||
                (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)));
      break;

    case STMT_SETTAGS:
    case STMT_FETCH_FIELDS:
      illegal = STMT_STATUS_EQ(INIT);
      break;

    case STMT_BIND:
      illegal = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_BIND_COL:
      illegal = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;

    case STMT_ADD_BATCH:
      illegal = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_EXECUTE:
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
        // Queries may execute straight after a bind, without add-batch.
        illegal = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
                  STMT_STATUS_NE(BIND_COL);
      } else {
        illegal = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  if (illegal) {
    code = TSDB_CODE_TSC_STMT_API_ERROR;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}
216

217
/*
 * Callback handed to the parser: report the table name bound to the statement.
 *
 * Calling it also marks the statement as STMT_TYPE_MULTI_INSERT.
 *
 * @param stmt    statement handle (an STscStmt2)
 * @param tbName  receives a pointer to the statement's own name buffer
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_STMT_TBNAME_ERROR when no
 *         table name has been set yet
 */
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  char*      boundName = pStmt->bInfo.tbName;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if ('\0' == boundName[0]) {
    tscWarn("no table name set, OK if it is a stmt get fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = boundName;

  return TSDB_CODE_SUCCESS;
}
231

232
/*
 * Record the table that subsequent binds target.
 *
 * Copies the table name (structured and full-name form), the ids taken from
 * the table meta, the bound-tag info and the super table name into
 * `pStmt->bInfo`. Previously held bound tags are destroyed first unless they
 * are flagged as cached.
 *
 * @param stmt           statement handle (an STscStmt2)
 * @param pTableMeta     meta of the target table
 * @param tags           new bound-tag info; stored as-is in bInfo.boundTags
 * @param tbName         structured table name
 * @param sTableName     super table full name
 * @param autoCreateTbl  when true the table uid is recorded as 0
 * @param tbNameFlag     flag bits stored verbatim in bInfo.tbNameFlag
 * @return TSDB_CODE_SUCCESS, or the error from tNameExtractFullName
 */
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  char       fullName[TSDB_TABLE_FNAME_LEN];

  int32_t code = tNameExtractFullName(tbName, fullName);
  if (0 != code) {
    return code;
  }

  // Table identity: structured name plus its flattened full name.
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
  tstrncpy(pStmt->bInfo.tbFName, fullName, sizeof(pStmt->bInfo.tbFName));
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;

  // Ids from the table meta.
  if (autoCreateTbl) {
    pStmt->bInfo.tbUid = 0;
  } else {
    pStmt->bInfo.tbUid = pTableMeta->uid;
  }
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
  pStmt->bInfo.tbType = pTableMeta->tableType;

  // Tags flagged as cached are not released here.
  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;
  pStmt->bInfo.tbNameFlag = tbNameFlag;
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  return TSDB_CODE_SUCCESS;
}
262

263
/*
 * Store the vgroup hash and the data-block hash on the statement.
 *
 * @param stmt        statement handle (an STscStmt2)
 * @param pVgHash     stored in sql.pVgHash
 * @param pBlockHash  stored in exec.pBlockHash
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt2* pSelf = (STscStmt2*)stmt;

  pSelf->exec.pBlockHash = pBlockHash;
  pSelf->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}
271

272
/*
 * Callback handed to the parser: publish parse results to the statement.
 *
 * Updates the bind info first, then the exec info, and finally records the
 * auto-create flag. Stops at the first failing step.
 *
 * @return TSDB_CODE_SUCCESS or the error of the failing update step
 */
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, uint8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  int32_t bindCode = stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, tbNameFlag);
  STMT_ERR_RET(bindCode);

  int32_t execCode = stmtUpdateExecInfo(stmt, pVgHash, pBlockHash);
  STMT_ERR_RET(execCode);

  pStmt->sql.autoCreateTbl = autoCreateTbl;

  return TSDB_CODE_SUCCESS;
}
283

284
/*
 * Callback handed to the parser: hand over the statement's hashes.
 *
 * Both pointers are moved out: after the call the statement's own
 * `sql.pVgHash` and `exec.pBlockHash` fields are NULL.
 *
 * @param stmt        statement handle (an STscStmt2)
 * @param pVgHash     receives the former sql.pVgHash
 * @param pBlockHash  receives the former exec.pBlockHash
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt2* pSelf = (STscStmt2*)stmt;

  SHashObj* vgHash = pSelf->sql.pVgHash;
  pSelf->sql.pVgHash = NULL;
  *pVgHash = vgHash;

  SHashObj* blockHash = pSelf->exec.pBlockHash;
  pSelf->exec.pBlockHash = NULL;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}
295

296
/*
 * Parse the prepared SQL and classify the statement.
 *
 * Creates the request if needed, runs the parser with the stmt callbacks and
 * derives `sql.type` when the callbacks have not set it. Query statements
 * return right after classification. Insert statements additionally look up
 * their data context in `exec.pBlockHash` and allocate `sql.pBindInfo`.
 *
 * @param pStmt  statement to parse
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_STMT_API_ERROR for SQL that is
 *         neither insert nor select; TSDB_CODE_TSC_STMT_CACHE_ERROR when the
 *         data context is missing; or an error from a callee/allocation
 */
static int32_t stmtParseSql(STscStmt2* pStmt) {
  SStmtCallback parseCb = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  pStmt->exec.pCurrBlock = NULL;

  STMT2_DLOG_E("start to stmtParseSql");

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  pStmt->stat.parseSqlNum++;
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &parseCb));

  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
  pStmt->bInfo.needParse = false;

  if (0 == pStmt->sql.type) {
    // The callbacks did not classify the statement; inspect the parse tree.
    bool isInsert = pStmt->sql.pQuery->pRoot && LEGAL_INSERT(nodeType(pStmt->sql.pQuery->pRoot));
    bool isSelect =
        !isInsert && pStmt->sql.pQuery->pPrepareRoot && LEGAL_SELECT(nodeType(pStmt->sql.pQuery->pPrepareRoot));

    if (!isInsert && !isSelect) {
      pStmt->bInfo.needParse = true;
      STMT2_ELOG_E("only support select or insert sql");
      if (pStmt->exec.pRequest->msgBuf) {
        tstrncpy(pStmt->exec.pRequest->msgBuf, "stmt only support select or insert", pStmt->exec.pRequest->msgBufLen);
      }
      return TSDB_CODE_TSC_STMT_API_ERROR;
    }

    pStmt->sql.stbInterlaceMode = false;
    if (isSelect) {
      pStmt->sql.type = STMT_TYPE_QUERY;
      return TSDB_CODE_SUCCESS;
    }
    pStmt->sql.type = STMT_TYPE_INSERT;
  } else if (STMT_TYPE_QUERY == pStmt->sql.type) {
    pStmt->sql.stbInterlaceMode = false;
    return TSDB_CODE_SUCCESS;
  } else if (STMT_TYPE_INSERT == pStmt->sql.type) {
    pStmt->sql.stbInterlaceMode = false;
  }

  // Insert path: the parser must have registered a data context for the table.
  STableDataCxt** ppCxt =
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (NULL == ppCxt || NULL == *ppCxt) {
    STMT2_ELOG("fail to get exec.pBlockHash, maybe parse failed, tbFName:%s", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
  }

  STableDataCxt* pCxt = *ppCxt;
  bool           noUsingClause = (pStmt->bInfo.tbNameFlag & USING_CLAUSE) == 0;
  if (pStmt->sql.stbInterlaceMode && pCxt->pData->pCreateTbReq && noUsingClause) {
    STMT2_TLOG("destroy pCreateTbReq for no-using insert, tbFName:%s", pStmt->bInfo.tbFName);
    tdDestroySVCreateTbReq(pCxt->pData->pCreateTbReq);
    taosMemoryFreeClear(pCxt->pData->pCreateTbReq);
    pCxt->pData->pCreateTbReq = NULL;
  }

  if (NULL != pStmt->sql.pBindInfo) {
    return TSDB_CODE_SUCCESS;
  }

  // One bind-info entry per bound column.
  pStmt->sql.pBindInfo = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
  if (NULL == pStmt->sql.pBindInfo) {
    STMT2_ELOG_E("fail to malloc pBindInfo");
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
378

379
/*
 * Reset the per-table bind state so the next bind starts from scratch.
 *
 * Clears the table names and ids, marks the statement as needing a parse and
 * releases the bound-tag info unless it is flagged as cached. The super table
 * name and suid are kept when the statement auto-creates tables.
 *
 * @param pStmt  statement to reset
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  pStmt->bInfo.tbName[0] = 0;
  pStmt->bInfo.tbFName[0] = 0;

  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbVgId = -1;
  pStmt->bInfo.tbType = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  if (!pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tbSuid = 0;
    pStmt->bInfo.stbFName[0] = 0;
  }

  STMT2_TLOG("finish clean bind info, tagsCached:%d, autoCreateTbl:%d", pStmt->bInfo.tagsCached,
             pStmt->sql.autoCreateTbl);

  return TSDB_CODE_SUCCESS;
}
402

403
/*
 * Release the column array held by a table-columns entry: the columns are
 * reset via qResetStmtColumns (result ignored), then the array is destroyed.
 */
static void stmtFreeTableBlkList(STableColsData* pTb) {
  SArray* pCols = pTb->aCol;

  (void)qResetStmtColumns(pCols, true);
  taosArrayDestroy(pCols);
}
×
407

408
/*
 * Rewind the table buffer pool to its first chunk and empty the queue.
 *
 * The first slot of the first chunk becomes the queue's head/tail node, so
 * the pool offset starts just past one node. If the first chunk cannot be
 * fetched the error is logged and nothing else is changed.
 *
 * @param pTblBuf  buffer pool to rewind
 * @param pQueue   queue to reset
 */
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
  if (NULL == pTblBuf->pCurBuff) {
    tscError("QInfo:%p, fail to get buffer from list", pTblBuf);
    return;
  }

  pTblBuf->buffOffset = sizeof(*pQueue->head);
  pTblBuf->buffIdx = 1;

  pQueue->tail = pTblBuf->pCurBuff;
  pQueue->head = pQueue->tail;
  pQueue->head->next = NULL;
  pQueue->qRemainNum = 0;
}
421

422
/*
 * Release execution state after an execute or when the statement is torn down.
 *
 * Interlace mode: a deep clean drops the block hash, the current block and
 * (for non-query statements) the request; a shallow clean rewinds the table
 * buffer/queue and clears the uid hash.
 *
 * Normal mode: the request is freed for non-query statements or on a deep
 * clean, then every data block in the block hash is destroyed, except that
 * with `keepTable` the current block is reset and kept. With `keepTable` the
 * function returns at that point without cleaning the bind info.
 *
 * @param pStmt      statement to clean
 * @param keepTable  keep (reset) the current table's block in normal mode
 * @param deepClean  release everything instead of preparing for reuse
 * @return TSDB_CODE_SUCCESS or the error of a failing callee
 */
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
  if (!pStmt->sql.stbInterlaceMode) {
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
      taos_free_result(pStmt->exec.pRequest);
      pStmt->exec.pRequest = NULL;
    }

    // NOTE(review): the STMT_ERR_RET calls inside this loop return while an
    // iteration is still open - confirm whether the hash API requires the
    // iterator to be cancelled in that case.
    size_t keyLen = 0;
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
    while (pIter) {
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
      char*          key = taosHashGetKey(pIter, &keyLen);
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);  // result is not used below
      (void)pMeta;

      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
        // Keep the current table: swap in the spare table data and reset.
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
      } else {
        qDestroyStmtDataBlock(pBlocks);
        STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
      }

      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
    }

    if (keepTable) {
      STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
                 keepTable, deepClean);
      return TSDB_CODE_SUCCESS;
    }

    taosHashCleanup(pStmt->exec.pBlockHash);
    pStmt->exec.pBlockHash = NULL;

    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
  } else {
    if (!deepClean) {
      // Shallow clean: keep the buffers, just rewind them for the next round.
      pStmt->sql.siInfo.pTableColsIdx = 0;
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
      tSimpleHashClear(pStmt->sql.siInfo.pTableUidHash);
    } else {
      taosHashCleanup(pStmt->exec.pBlockHash);
      pStmt->exec.pBlockHash = NULL;

      if (NULL != pStmt->exec.pCurrBlock) {
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->boundColsInfo.pColIndex);
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
        pStmt->exec.pCurrBlock = NULL;
      }
      if (STMT_TYPE_QUERY != pStmt->sql.type) {
        taos_free_result(pStmt->exec.pRequest);
        pStmt->exec.pRequest = NULL;
      }
    }

    if (NULL != pStmt->exec.pRequest) {
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
    }
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
  STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
             keepTable, deepClean);

  return TSDB_CODE_SUCCESS;
}
494

495
/*
 * Array-element destructor: `buf` points at an element that itself holds a
 * buffer pointer; that buffer is freed.
 */
static void stmtFreeTbBuf(void* buf) {
  void** ppChunk = (void**)buf;
  taosMemoryFree(*ppChunk);
}
45✔
499

500
static void stmtFreeTbCols(void* buf) {
34,000✔
501
  SArray* pCols = *(SArray**)buf;
34,000✔
502
  taosArrayDestroy(pCols);
34,000✔
503
}
34,000✔
504

505
/*
 * Release everything hanging off `pStmt->sql` and zero the structure.
 *
 * Frees the bind info, result field arrays, SQL text, parsed query, node
 * list, vgroup hash, fixed-value create-table request and the table cache
 * (including each cached data context and bound-tag info), then performs a
 * deep exec/bind clean and releases the interlace-mode resources. Finally
 * `pStmt->sql` is zeroed and `siInfo.tableColsReady` is set back to true.
 *
 * @param pStmt  statement whose SQL info is released
 * @return TSDB_CODE_SUCCESS, or the error from the exec/bind clean steps
 */
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
  STMT2_TLOG_E("start to free SQL info");

  taosMemoryFree(pStmt->sql.pBindInfo);
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);
  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;
  if (pStmt->sql.fixValueTags) {
    // tdDestroySVCreateTbReq only releases what the request owns; the request
    // object itself must be freed too, as is done for pCreateTbReq in
    // stmtParseSql.
    pStmt->sql.fixValueTags = false;
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
    taosMemoryFreeClear(pStmt->sql.fixValueTbReq);
  }

  // The cache stores SStmtTableCache values inline; release what they own
  // before dropping the hash itself.
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
  while (pIter) {
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pCache->pDataCtx);
    qDestroyBoundColInfo(pCache->boundTags);
    taosMemoryFreeClear(pCache->boundTags);

    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  // Interlace-mode resources.
  taos_free_result(pStmt->sql.siInfo.pRequest);
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableUidHash);
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
  pStmt->sql.siInfo.pTableCols = NULL;

  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
  pStmt->sql.siInfo.tableColsReady = true;

  STMT2_TLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}
553

554
/*
 * Make sure the vgroup of the currently bound table is present in
 * `sql.pVgHash`.
 *
 * If `*vgId` is non-negative and already in the hash nothing is done.
 * Otherwise the vgroup is looked up through the catalog using the bound
 * table name, inserted into the hash, and `*vgId` is updated to its id.
 *
 * @param pStmt  statement providing catalog, connection and table name
 * @param vgId   in: candidate vgroup id (negative = unknown); out: resolved id
 * @return TSDB_CODE_SUCCESS or the catalog / hash error
 */
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
  bool known = *vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId));
  if (known) {
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo      vgroup = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgroup);
  if (code != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("fail to get vgroup info from catalog, code:%d", code);
    return code;
  }

  code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgroup.vgId, sizeof(vgroup.vgId), (char*)&vgroup,
                     sizeof(vgroup));
  if (code != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("fail to put vgroup info, code:%d", code);
    return code;
  }

  *vgId = vgroup.vgId;

  return TSDB_CODE_SUCCESS;
}
582

583
/*
 * Build a fresh data context for a table from a cached template.
 *
 * The vgroup is resolved/registered first (which may replace a negative
 * `vgId` with the real one), then the template is rebuilt for the given ids.
 *
 * @param pStmt       owning statement
 * @param pDataBlock  cached template context
 * @param newBlock    receives the rebuilt context
 * @param uid         table uid
 * @param suid        super table uid
 * @param vgId        vgroup id, or a negative value to have it looked up
 * @return TSDB_CODE_SUCCESS or the error of the failing step
 */
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                                    uint64_t suid, int32_t vgId) {
  int32_t vgCode = stmtTryAddTableVgroupInfo(pStmt, &vgId);
  STMT_ERR_RET(vgCode);

  int32_t buildCode = qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl);
  STMT_ERR_RET(buildCode);

  STMT2_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);

  return TSDB_CODE_SUCCESS;
}
592

593
/*
 * Try to satisfy the currently bound table from the statement's caches so
 * that the SQL does not have to be parsed again.
 *
 * Outcomes are reported through `bInfo.needParse` / `bInfo.inExecCache`:
 *  - interlace mode with a data context: nothing to do, no parse needed;
 *  - table already in `exec.pBlockHash`: it becomes the current block;
 *  - table (or its super table) in `sql.pTableCache`: a new block is rebuilt
 *    from the cached template, added to `exec.pBlockHash` and made current;
 *  - otherwise the bind info is cleaned and a parse is required.
 *
 * @param pStmt  statement with the table name already bound
 * @return TSDB_CODE_SUCCESS or the error of a catalog / rebuild / hash step
 */
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
    pStmt->bInfo.needParse = false;
    pStmt->bInfo.inExecCache = false;
    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (pCxtInExec) {
    pStmt->bInfo.needParse = false;
    pStmt->bInfo.inExecCache = true;

    pStmt->exec.pCurrBlock = *pCxtInExec;

    if (pStmt->sql.autoCreateTbl) {
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
  }

  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
    if (pStmt->bInfo.inExecCache) {
      pStmt->bInfo.needParse = false;
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
      return TSDB_CODE_SUCCESS;
    }

    STMT2_DLOG("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.autoCreateTbl) {
    // Auto-created tables are cached by their super table uid.
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
    if (pCache) {
      pStmt->bInfo.needParse = false;
      pStmt->bInfo.tbUid = 0;

      STableDataCxt* pNewBlock = NULL;
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));

      int32_t putCode = taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName),
                                    &pNewBlock, POINTER_BYTES);
      if (putCode) {
        // The hash did not take the block, so it would be leaked. Capture the
        // error before destroying it.
        int32_t err = (0 != terrno) ? terrno : putCode;
        qDestroyStmtDataBlock(pNewBlock);
        STMT_ERR_RET(err);
      }

      pStmt->exec.pCurrBlock = pNewBlock;

      STMT2_DLOG("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);

      return TSDB_CODE_SUCCESS;
    }

    STMT_RET(stmtCleanBindInfo(pStmt));
  }

  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);

  pStmt->stat.ctgGetTbMetaNum++;

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    STMT2_DLOG("tb %s not exist", pStmt->bInfo.tbFName);
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));

    STMT_ERR_RET(code);
  }

  STMT_ERR_RET(code);

  // Copy what is needed out of the meta, then release it right away.
  uint64_t uid = pTableMeta->uid;
  uint64_t suid = pTableMeta->suid;
  int8_t   tableType = pTableMeta->tableType;
  int32_t  vgId = pTableMeta->vgId;
  pStmt->bInfo.tbVgId = pTableMeta->vgId;

  taosMemoryFree(pTableMeta);

  // Child tables share the cache entry of their super table.
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;

  if (uid == pStmt->bInfo.tbUid) {
    pStmt->bInfo.needParse = false;

    STMT2_DLOG("tb %s is current table", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->bInfo.inExecCache) {
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
    if (NULL == pCache) {
      STMT2_ELOG("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
                 pStmt->bInfo.tbFName, uid, cacheUid);

      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.needParse = false;

    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
    pStmt->bInfo.tagsCached = true;

    STMT2_DLOG("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
  if (pCache) {
    pStmt->bInfo.needParse = false;

    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
    pStmt->bInfo.tagsCached = true;

    STableDataCxt* pNewBlock = NULL;
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));

    int32_t putCode = taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName),
                                  &pNewBlock, POINTER_BYTES);
    if (putCode) {
      // Same as above: do not leak the rebuilt block when the insert fails.
      int32_t err = (0 != terrno) ? terrno : putCode;
      qDestroyStmtDataBlock(pNewBlock);
      STMT_ERR_RET(err);
    }

    pStmt->exec.pCurrBlock = pNewBlock;

    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}
747

748
/*
 * Bring the statement back to its initial state: release all SQL info,
 * create an empty table cache and set the status to STMT_INIT.
 *
 * @param pStmt  statement to reset
 * @return TSDB_CODE_SUCCESS, the cleanup error, or terrno when the table
 *         cache cannot be created
 */
static int32_t stmtResetStmt(STscStmt2* pStmt) {
  int32_t cleanCode = stmtCleanSQLInfo(pStmt);
  STMT_ERR_RET(cleanCode);

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pStmt->sql.pTableCache == NULL) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtResetStmt:%s", tstrerror(terrno));
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
761

762
/*
 * Process one queue node on the bind thread.
 *
 * A node with `restoreTbCols` set re-creates the column arrays for the first
 * `pTableColsIdx` entries of `siInfo.pTableCols` and then flags
 * `tableColsReady`. Any other node has its table data appended to the
 * statement output, after which `siInfo.tbRemainNum` is decremented
 * (regardless of the append result).
 *
 * @param pStmt  owning statement
 * @param param  the queue node (an SStmtQNode)
 * @return TSDB_CODE_SUCCESS or the allocation / append error
 */
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
  SStmtQNode* pNode = (SStmtQNode*)param;

  if (!pNode->restoreTbCols) {
    int code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pNode->tblData, pStmt->exec.pCurrBlock,
                                      &pStmt->sql.siInfo, pNode->pCreateTbReq);
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
    if (TSDB_CODE_SUCCESS != code) {
      STMT2_ELOG("async append stmt output failed, tbname:%s, err:%s", pNode->tblData.tbName, tstrerror(code));
      STMT_ERR_RET(code);
    }
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < pStmt->sql.siInfo.pTableColsIdx; ++idx) {
    SArray** ppCols = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, idx);
    *ppCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == *ppCols) {
      STMT_ERR_RET(terrno);
    }
  }
  atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
  STMT2_TLOG_E("restore pTableCols finished");

  return TSDB_CODE_SUCCESS;
}
787

788
static void* stmtBindThreadFunc(void* param) {
45✔
789
  setThreadName("stmt2Bind");
45✔
790

791
  STscStmt2* pStmt = (STscStmt2*)param;
45✔
792
  STMT2_ILOG_E("stmt2 bind thread started");
45!
793

794
  while (true) {
155✔
795
    SStmtQNode* asyncParam = NULL;
200✔
796

797
    if (!stmtDequeue(pStmt, &asyncParam)) {
200✔
798
      if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
45!
799
        STMT2_DLOG_E("queue is empty and stopQueue is set, thread will exit");
45!
800
        break;
45✔
801
      }
802
      continue;
×
803
    }
804

805
    int ret = stmtAsyncOutput(pStmt, asyncParam);
155✔
806
    if (ret != 0) {
155!
807
      STMT2_ELOG("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
×
808
    }
809
  }
810

811
  STMT2_ILOG_E("stmt2 bind thread stopped");
45!
812
  return NULL;
45✔
813
}
814

815
/*
 * Start the joinable bind worker thread (stmtBindThreadFunc) for pStmt.
 *
 * On success pStmt->bindThread holds the new thread and bindThreadInUse is
 * set to true. Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when
 * the thread attribute cannot be prepared, or the system error reported for
 * a failed thread creation.
 *
 * The thread attribute object is destroyed on every path after it has been
 * initialised, including the error paths.
 */
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
    // Capture the error before the cleanup call gets a chance to disturb it.
    int32_t createCode = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    terrno = createCode;
    STMT_ERR_RET(createCode);
  }

  pStmt->bindThreadInUse = true;

  (void)taosThreadAttrDestroy(&thAttr);
  return TSDB_CODE_SUCCESS;
}
834

835
/*
 * Initialise the bind queue of pStmt: its condition variable, its mutex and
 * the first queue node, which is taken from the table buffer and serves as
 * both head and tail. Returns TSDB_CODE_SUCCESS or the node allocation error.
 */
static int32_t stmtInitQueue(STscStmt2* pStmt) {
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);

  int32_t allocCode = stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head);
  STMT_ERR_RET(allocCode);

  // Head and tail start out on the same node.
  pStmt->queue.tail = pStmt->queue.head;

  return TSDB_CODE_SUCCESS;
}
843

844
/*
 * Prepare the async-bind bookkeeping of pStmt: reset the counter of async
 * binds to zero and create the mutex / condition variable pair stored in
 * asyncBindParam. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
  pStmt->asyncBindParam.asyncBindNum = 0;

  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);

  return TSDB_CODE_SUCCESS;
}
851

852
/*
 * Initialise a table buffer descriptor.
 *
 * The unit size is sizeof(SStmtQNode) and one buffer holds 1000 units. A
 * buffer list is created, the first buffer is allocated, pushed onto the
 * list and made the current buffer (buffIdx = 1, buffOffset = 0).
 *
 * Returns TSDB_CODE_SUCCESS or terrno on allocation failure. If the freshly
 * allocated buffer cannot be pushed onto the list it is released here, since
 * no list entry refers to it yet. pBufList itself is left in place on failure.
 */
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    int32_t pushCode = terrno;
    taosMemoryFree(buff);
    return pushCode;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
874

875
/*
 * Allocate and initialise a stmt2 handle for the connection `taos`.
 *
 * pOptions may be NULL; when given it is copied into the handle, and
 * stb-interlace mode is enabled if both singleStbInsert and
 * singleTableBindOnce are set. In interlace mode the table hashes, table
 * column array, table buffer and bind queue are created and the bind thread
 * is started. When an async exec callback is configured, the exec semaphore
 * is initialised with a count of 1.
 *
 * Returns the new handle, or NULL on any failure (terrno carries the reason
 * where the failing call reported one).
 */
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
  STscObj*   pObj = (STscObj*)taos;
  STscStmt2* pStmt = NULL;
  int32_t    code = 0;

  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
  if (NULL == pStmt) {
    STMT2_ELOG_E("fail to allocate memory for pStmt");
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtInit2:%s", tstrerror(terrno));
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = pObj;
  if (taos->db[0] != '\0') {
    pStmt->db = taosStrdup(taos->db);
    if (NULL == pStmt->db) {
      // Only the table cache exists at this point, so release it directly.
      int32_t dupCode = terrno;
      STMT2_ELOG("fail to allocate memory for db:%s", tstrerror(dupCode));
      taosHashCleanup(pStmt->sql.pTableCache);
      taosMemoryFree(pStmt);
      terrno = dupCode;
      return NULL;
    }
  }
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->errCode = TSDB_CODE_SUCCESS;

  if (NULL != pOptions) {
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
      pStmt->stbInterlaceMode = true;
    }

    pStmt->reqid = pOptions->reqid;
  }

  if (pStmt->stbInterlaceMode) {
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = taos->acctId;
    pStmt->sql.siInfo.dbname = taos->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);

    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableHash) {
      STMT2_ELOG("fail to allocate memory for pTableHash:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableUidHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pStmt->sql.siInfo.pTableUidHash) {
      STMT2_ELOG("fail to allocate memory for pTableUidHash:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pTableCols) {
      STMT2_ELOG("fail to allocate memory for pTableCols:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    // Table buffer -> queue -> bind thread; each step runs only if the previous one succeeded.
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtInitQueue(pStmt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtStartBindThread(pStmt);
    }
    if (TSDB_CODE_SUCCESS != code) {
      terrno = code;
      STMT2_ELOG("fail to init stmt2 bind thread:%s", tstrerror(code));
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  pStmt->sql.siInfo.tableColsReady = true;
  if (pStmt->options.asyncExecFn) {
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      STMT2_ELOG("fail to init asyncExecSem:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }
  code = stmtIniAsyncBind(pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    STMT2_ELOG("fail to init async bind param:%s", tstrerror(code));

    (void)stmtClose2(pStmt);
    return NULL;
  }

  pStmt->execSemWaited = false;

  STMT2_DLOG("stmt2 initialize finished, seqId:%d, db:%s, interlaceMode:%d, asyncExec:%d", pStmt->seqId, pStmt->db,
             pStmt->stbInterlaceMode, pStmt->options.asyncExecFn != NULL);

  return pStmt;
}
979

980
/*
 * Apply a database name that was parsed out of the SQL text.
 *
 * If the handle has no database yet, a dequoted copy of dbName becomes
 * pStmt->db. The request object is created if needed and its pDb is always
 * replaced with a dequoted copy of dbName; in stb-interlace mode
 * siInfo.dbname is pointed at that copy.
 *
 * Returns TSDB_CODE_TSC_SQL_SYNTAX_ERROR for a NULL/empty name, terrno when a
 * copy cannot be allocated, the error of stmtCreateRequest(), or
 * TSDB_CODE_SUCCESS. Each copy is checked for NULL before it is dequoted.
 */
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  if (dbName == NULL || dbName[0] == '\0') {
    STMT2_ELOG_E("dbname in sql is illegal");
    return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
  }

  STMT2_DLOG("dbname is specified in sql:%s", dbName);
  if (pStmt->db == NULL || pStmt->db[0] == '\0') {
    taosMemoryFreeClear(pStmt->db);
    STMT2_DLOG("dbname:%s is by sql, not by taosconnect", dbName);
    pStmt->db = taosStrdup(dbName);
    if (pStmt->db == NULL) {
      return terrno;
    }
    (void)strdequote(pStmt->db);
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  // The SQL statement specifies a database name, overriding the previously specified database
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(dbName);
  if (pStmt->exec.pRequest->pDb == NULL) {
    return terrno;
  }
  (void)strdequote(pStmt->exec.pRequest->pDb);
  if (pStmt->sql.stbInterlaceMode) {
    pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
  }
  return TSDB_CODE_SUCCESS;
}
1008
/*
 * Rebuild the stb-interlace infrastructure of pStmt.
 *
 * If a bind thread is running it is stopped first: wait until tableColsReady
 * becomes non-zero, raise stopQueue under the queue mutex, signal the
 * condition, join the thread, then clear the queue and destroy its
 * cond/mutex. Afterwards the table hashes, the table column array, the table
 * buffer and the queue are created again and a new bind thread is started.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on an allocation failure, or the error of
 * the failing init step.
 */
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
  if (pStmt->bindThreadInUse) {
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    pStmt->queue.stopQueue = true;
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);

    pStmt->bindThreadInUse = false;
    pStmt->queue.head = NULL;
    pStmt->queue.tail = NULL;
    pStmt->queue.qRemainNum = 0;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pStmt->sql.siInfo.pTableHash) {
    return terrno;
  }

  pStmt->sql.siInfo.pTableUidHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (NULL == pStmt->sql.siInfo.pTableUidHash) {
    return terrno;
  }

  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
  if (NULL == pStmt->sql.siInfo.pTableCols) {
    return terrno;
  }

  int32_t rc = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  // stopQueue is cleared whenever the queue init was attempted, whatever its result.
  rc = stmtInitQueue(pStmt);
  pStmt->queue.stopQueue = false;
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  return stmtStartBindThread(pStmt);
}
1060

1061
/*
 * Tear down everything derived from the previously prepared SQL so the
 * handle can be prepared again.
 *
 * db, stbInterlaceMode, options and reqid are captured at entry and written
 * back at the end. If an async exec callback is configured and the exec
 * semaphore has not been waited on yet, it is waited on first. All SQL-level
 * and interlace-level objects are released, the interlace infrastructure is
 * rebuilt when interlace mode is on, a new table cache is created and the
 * status returns to STMT_INIT.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. pTableCache is
 * set to NULL right after it is cleaned up so that an early error return
 * does not leave a dangling pointer in the handle.
 */
static int32_t stmtDeepReset(STscStmt2* pStmt) {
  char*             db = pStmt->db;
  bool              stbInterlaceMode = pStmt->stbInterlaceMode;
  TAOS_STMT2_OPTION options = pStmt->options;
  uint32_t          reqid = pStmt->reqid;

  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
      STMT2_ELOG_E("bind param wait asyncExecSem failed");
    }
    pStmt->execSemWaited = true;
  }
  pStmt->sql.autoCreateTbl = false;
  taosMemoryFree(pStmt->sql.pBindInfo);
  pStmt->sql.pBindInfo = NULL;

  taosMemoryFree(pStmt->sql.queryRes.fields);
  pStmt->sql.queryRes.fields = NULL;

  taosMemoryFree(pStmt->sql.queryRes.userFields);
  pStmt->sql.queryRes.userFields = NULL;

  pStmt->sql.type = 0;
  pStmt->sql.runTimes = 0;
  taosMemoryFree(pStmt->sql.sqlStr);
  pStmt->sql.sqlStr = NULL;

  qDestroyQuery(pStmt->sql.pQuery);
  pStmt->sql.pQuery = NULL;

  taosArrayDestroy(pStmt->sql.nodeList);
  pStmt->sql.nodeList = NULL;

  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  if (pStmt->sql.fixValueTags) {
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
    pStmt->sql.fixValueTbReq = NULL;
  }
  pStmt->sql.fixValueTags = false;

  // Release what each cache entry owns before dropping the hash itself.
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
  while (pIter) {
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pCache->pDataCtx);
    qDestroyBoundColInfo(pCache->boundTags);
    taosMemoryFreeClear(pCache->boundTags);

    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  if (pStmt->sql.stbInterlaceMode) {
    pStmt->bInfo.tagsCached = false;
  }
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));

  if (pStmt->exec.pRequest) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  if (pStmt->sql.siInfo.pTableCols) {
    taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
    pStmt->sql.siInfo.pTableCols = NULL;
  }

  if (pStmt->sql.siInfo.tbBuf.pBufList) {
    taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
    pStmt->sql.siInfo.tbBuf.pBufList = NULL;
  }

  if (pStmt->sql.siInfo.pTableHash) {
    tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
    pStmt->sql.siInfo.pTableHash = NULL;
  }

  if (pStmt->sql.siInfo.pTableUidHash) {
    tSimpleHashCleanup(pStmt->sql.siInfo.pTableUidHash);
    pStmt->sql.siInfo.pTableUidHash = NULL;
  }

  if (pStmt->sql.siInfo.pVgroupHash) {
    taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
    pStmt->sql.siInfo.pVgroupHash = NULL;
  }

  if (pStmt->sql.siInfo.pVgroupList) {
    taosArrayDestroy(pStmt->sql.siInfo.pVgroupList);
    pStmt->sql.siInfo.pVgroupList = NULL;
  }

  if (pStmt->sql.siInfo.pDataCtx) {
    qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
    pStmt->sql.siInfo.pDataCtx = NULL;
  }

  if (pStmt->sql.siInfo.pTSchema) {
    taosMemoryFree(pStmt->sql.siInfo.pTSchema);
    pStmt->sql.siInfo.pTSchema = NULL;
  }

  if (pStmt->sql.siInfo.pRequest) {
    taos_free_result(pStmt->sql.siInfo.pRequest);
    pStmt->sql.siInfo.pRequest = NULL;
  }

  if (stbInterlaceMode) {
    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));
  }

  pStmt->db = db;
  pStmt->stbInterlaceMode = stbInterlaceMode;
  pStmt->options = options;
  pStmt->reqid = reqid;

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtResetStmt:%s", tstrerror(terrno));
    return terrno;
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
1189

1190
/*
 * Prepare `sql` on the stmt2 handle.
 *
 * stmt / sql: must be non-NULL, otherwise TSDB_CODE_INVALID_PARA is returned.
 * length:     number of bytes of sql to use; 0 means strlen(sql).
 *
 * A handle that was already prepared is deep-reset first. A pending error
 * code on the handle is returned unchanged. The SQL text is copied into the
 * handle, and if a database name can be parsed from it, it is applied via
 * stmtSetDbName2().
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The parsed database name is
 * released on both the success and the failure path of stmtSetDbName2().
 */
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  // Validate before logging: the debug line formats sql with %s.
  if (stmt == NULL || sql == NULL) {
    STMT2_ELOG_E("stmt or sql is NULL");
    return TSDB_CODE_INVALID_PARA;
  }

  STMT2_DLOG("start to prepare with sql:%s", sql);

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
    STMT_ERR_RET(stmtDeepReset(pStmt));
  }

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("errCode is not success before, ErrCode: 0x%x, errorsyt: %s\n. ", pStmt->errCode,
               tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // length is unsigned, so "not given" can only be expressed as 0.
  if (length == 0) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    STMT2_ELOG("fail to allocate memory for sqlStr:%s", tstrerror(terrno));
    return terrno;
  }
  pStmt->sql.sqlLen = length;
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;

  char* dbName = NULL;
  if (qParseDbName(sql, length, &dbName)) {
    code = stmtSetDbName2(stmt, dbName);
    taosMemoryFreeClear(dbName);
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
1234

1235
/*
 * Set up the per-statement interlace state from the current table.
 *
 * The data context registered in exec.pBlockHash under bInfo.tbFName is
 * cloned into siInfo.pDataCtx, STMT_TABLE_COLS_NUM column arrays are appended
 * to siInfo.pTableCols, and siInfo.boundTags is pointed at bInfo.boundTags.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the table is not in the hash or an
 * allocation fails, or the clone error. A column array that cannot be pushed
 * onto pTableCols is destroyed before returning.
 * NOTE(review): the "table not found" path returns terrno as-is; confirm
 * taosHashGet() sets it on a miss.
 */
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  SArray* pTblCols = NULL;
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      int32_t pushCode = terrno;
      taosArrayDestroy(pTblCols);
      return pushCode;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  STMT2_TLOG("init stb interlace table info, tbName:%s, pDataCtx:%p, boundTags:%p", pStmt->bInfo.tbFName,
             pStmt->sql.siInfo.pDataCtx, pStmt->sql.siInfo.boundTags);

  return TSDB_CODE_SUCCESS;
}
1264

1265
/*
 * Report through *insert whether the statement is an insert.
 *
 * When the statement type is already known, *insert is 1 for
 * STMT_TYPE_INSERT / STMT_TYPE_MULTI_INSERT and 0 otherwise. When it is not
 * known yet (type == 0) the stored SQL text is inspected with
 * qIsInsertValuesSql(). Always returns TSDB_CODE_SUCCESS.
 */
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (!pStmt->sql.type) {
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  if (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
    *insert = 1;
  } else {
    *insert = 0;
  }

  return TSDB_CODE_SUCCESS;
}
1278

1279
/*
 * Select the table that subsequent binds apply to.
 *
 * Only valid for insert statements; otherwise TSDB_CODE_TSC_STMT_API_ERROR.
 * In stb-interlace mode with an existing interlace data context only the
 * table name is recorded, the request id is bumped and needParse is cleared.
 * In every other case the request is created, the full table name is built,
 * the cache is consulted and the SQL is parsed if still needed. The interlace
 * table info is initialised afterwards if it does not exist yet. The elapsed
 * time is added to stat.setTbNameUs.
 *
 * Returns TSDB_CODE_SUCCESS, the pending error code of the handle, or the
 * error of the failing step.
 */
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  STMT2_TLOG("start to set tbName:%s", tbName);

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  STMT_ERR_RET(stmtIsInsert2(stmt, &isInsert));
  if (!isInsert) {
    STMT2_ELOG_E("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    // Interlace context already exists: just remember the name.
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  } else {
    STMT_ERR_RET(stmtCreateRequest(pStmt));

    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

      STMT_ERR_RET(stmtParseSql(pStmt));
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  int64_t endUs = taosGetTimestampUs();
  pStmt->stat.setTbNameUs += endUs - beginUs;

  return TSDB_CODE_SUCCESS;
}
1330

1331
/*
 * Bind tag values for the current table.
 *
 * tags:         tag bind array handed to qBindStmtTagsValue2().
 * pCreateTbReq: in/out create-table request. In stb-interlace mode a new
 *               zeroed SVCreateTbReq is allocated into *pCreateTbReq and its
 *               uid is set to the vgroup id obtained from
 *               stmtTryAddTableVgroupInfo(); in the other mode *pCreateTbReq
 *               is passed through as supplied by the caller.
 *
 * Returns TSDB_CODE_SUCCESS, the pending error code of the handle, terrno
 * when the request cannot be allocated, or the error of the failing step.
 * Returns early with success when the table is in the exec cache and the
 * statement does not auto-create tables.
 * NOTE(review): a request allocated here is not released when a later step
 * fails; presumably the caller owns *pCreateTbReq - confirm.
 */
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
  }

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  STMT2_TLOG_E("start to bind stmt tag values");

  void* boundTags = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    boundTags = pStmt->sql.siInfo.boundTags;
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    // Check the allocation result itself, not the out-parameter that holds it.
    if (NULL == *pCreateTbReq) {
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
  } else {
    boundTags = pStmt->bInfo.boundTags;
  }

  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));

  return TSDB_CODE_SUCCESS;
}
1403

1404
/*
 * Produce a create-table request for statements whose tag values are fixed
 * in the SQL text (stb-interlace mode only; other modes return success
 * immediately).
 *
 * If the current data block is not flagged SUBMIT_REQ_AUTO_CREATE_TABLE
 * nothing is produced. Once fixed tags have been recorded, every call clones
 * sql.fixValueTbReq into *pCreateTbReq, replaces its name with the current
 * table name and sets uid to the vgroup id from stmtTryAddTableVgroupInfo().
 * On the first call the request found on the data block is cloned into
 * sql.fixValueTbReq, cloned again into *pCreateTbReq, and uid is taken from
 * the block's table meta.
 *
 * Returns TSDB_CODE_SUCCESS, the pending error code of the handle, terrno
 * when the table name copy cannot be allocated, or the error of the failing
 * step.
 */
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to clone createTbRequest for fixed tags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (!pStmt->sql.stbInterlaceMode) {
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
    if (!pStmt->sql.autoCreateTbl) {
      STMT2_WLOG_E("don't need to create table, will not check tags");
      return TSDB_CODE_SUCCESS;
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }

  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
    STMT2_DLOG_E("don't need to create, will not check tags");
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.fixValueTags) {
    STMT2_TLOG_E("tags are fixed, use one createTbReq");
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    if ((*pCreateTbReq)->name) {
      taosMemoryFree((*pCreateTbReq)->name);
    }
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
    if (NULL == (*pCreateTbReq)->name) {
      // Do not hand back a request without a table name.
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
    return TSDB_CODE_SUCCESS;
  }

  if ((*pDataBlock)->pData->pCreateTbReq) {
    STMT2_TLOG_E("tags are fixed, set createTbReq first time");
    pStmt->sql.fixValueTags = true;
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
  }

  return TSDB_CODE_SUCCESS;
}
1481

1482
/*
 * Build the column field descriptions of an insert statement.
 *
 * The data context is siInfo.pDataCtx in stb-interlace mode, otherwise the
 * one registered in exec.pBlockHash under bInfo.tbFName. fieldNum / fields
 * are filled by qBuildStmtColFields().
 *
 * Returns TSDB_CODE_SUCCESS, the pending error code of the handle,
 * TSDB_CODE_TSC_STMT_API_ERROR for query statements, TSDB_CODE_APP_ERROR when
 * the table is not in the hash, or the error of qBuildStmtColFields().
 */
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** ppCxt = &pStmt->sql.siInfo.pDataCtx;

  if (!pStmt->sql.stbInterlaceMode) {
    ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == ppCxt) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppCxt, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1509

1510
/*
 * Build the field descriptions (including tags / table name as decided by
 * qBuildStmtStbColFields()) of an insert statement.
 *
 * The data context is siInfo.pDataCtx when stb-interlace mode already has
 * one, otherwise the entry of exec.pBlockHash for bInfo.tbFName. When that
 * hash entry belongs to a super table it is destroyed and removed afterwards
 * and the related bind info is reset.
 *
 * Every error raised inside this function leaves through the _return label,
 * which writes the entry value of errCode back to the handle, so a failure
 * here does not stick to the handle. The bind-info cleanup follows the same
 * rule.
 *
 * Returns TSDB_CODE_SUCCESS, the pending error code of the handle, or the
 * error of the failing step.
 */
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
  int32_t code = 0;
  int32_t preCode = pStmt->errCode;

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT2_ELOG_E("stmtFetchStbColFields2 only for insert statement");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** pDataBlock = NULL;
  bool            cleanStb = false;

  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  } else {
    cleanStb = true;
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  if (NULL == pDataBlock || NULL == *pDataBlock) {
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERRI_JRET(
      qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbNameFlag, fieldNum, fields));

  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
    taosMemoryFreeClear((*pDataBlock)->boundColsInfo.pColIndex);
    qDestroyStmtDataBlock(*pDataBlock);
    *pDataBlock = NULL;
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
      STMT2_ELOG("fail to remove remove stb:%s exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
    }
    pStmt->sql.autoCreateTbl = false;
    pStmt->bInfo.tagsCached = false;
    pStmt->bInfo.sname = (SName){0};
    // Leave through _return like the other failures so errCode is restored.
    STMT_ERRI_JRET(stmtCleanBindInfo(pStmt));
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
1562
/*
1563
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1564
  while (true) {
1565
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1566
      pStmt->exec.smInfo.pColIdx = 0;
1567
    }
1568

1569
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1570
      taosUsleep(1);
1571
      continue;
1572
    }
1573

1574
    *idx = pStmt->exec.smInfo.pColIdx;
1575
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1576
  }
1577
}
1578
*/
1579
/*
 * Finish preparing a queue node for one table and hand it to the bind thread.
 *
 * The vgroup hash, the vgroup list and the interlace request are created on
 * first use. tbFromHash is switched on once the current table name equals
 * the first table name seen; the first name is recorded when still empty.
 * The node is then counted in siInfo.tbRemainNum and enqueued.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the hash or list cannot be
 * allocated, the error of buildRequest(), or TSDB_CODE_TSC_STMT_API_ERROR
 * when the queue has been stopped. The stopped-queue check happens before
 * tbRemainNum is incremented, so a rejected node is never counted.
 */
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      return terrno;
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      return terrno;
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  // Reject before counting: a node that is never enqueued is never subtracted again.
  if (pStmt->queue.stopQueue) {
    STMT2_ELOG_E("bind thread already stopped, cannot enqueue");
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
1623

1624
/*
 * Hand out the next unused column array from siInfo.pTableCols.
 *
 * If pTableColsIdx has reached the end of the array, 100 more column arrays
 * are appended and the lookup is retried. On success *pTableCols receives the
 * array and pTableColsIdx is advanced by one.
 *
 * Returns TSDB_CODE_SUCCESS or terrno on allocation failure. A column array
 * that cannot be pushed onto pTableCols is destroyed before returning.
 */
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
  while (true) {
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
      break;
    } else {
      SArray* pTblCols = NULL;
      for (int32_t i = 0; i < 100; i++) {
        pTblCols = taosArrayInit(20, POINTER_BYTES);
        if (NULL == pTblCols) {
          return terrno;
        }

        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
          int32_t pushCode = terrno;
          taosArrayDestroy(pTblCols);
          return pushCode;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
1646

1647
/*
 * For multi-insert statements, store a clone of the current table's data
 * context in pStmt->sql.pTableCache.
 *
 * The cache key is the super-table uid for child tables and the table uid
 * otherwise. Nothing is done when the statement is not a multi insert or when
 * an entry for the key already exists.
 *
 * @return TSDB_CODE_SUCCESS on success or when nothing had to be cached;
 *         TSDB_CODE_TSC_STMT_CACHE_ERROR when the current table is missing
 *         from exec.pBlockHash; otherwise the error of the clone / hash put.
 */
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    STMT2_TLOG("table %s already cached, no need to cache again", pStmt->bInfo.tbFName);
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    // A plain lookup miss is not guaranteed to leave a meaningful terrno, so
    // returning terrno here could report success. Use the same explicit code
    // that stmtBindBatch2 returns for this exact lookup failure.
    return TSDB_CODE_TSC_STMT_CACHE_ERROR;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    // NOTE(review): pDst is not released on this path; it looks like the clone
    // leaks when the put fails - confirm and free it with the matching destructor.
    STMT2_ELOG("fail to put table cache:%s", tstrerror(terrno));
    return terrno;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    // the cache entry now holds the boundTags pointer; drop our reference to it
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
1687

1688
/*
 * Move the statement to STMT_ADD_BATCH and finish the current batch.
 *
 * Non-interlace mode: cache the current table block via stmtCacheBlock().
 * Interlace mode: mark the table columns as not ready and enqueue a queue node
 * flagged restoreTbCols for the bind thread.
 *
 * @return TSDB_CODE_SUCCESS, the sticky pStmt->errCode if already set,
 *         TSDB_CODE_TSC_STMT_API_ERROR when the bind queue has been stopped,
 *         or the error of a failing helper.
 */
static int stmtAddBatch2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  // interlace mode from here on
  pStmt->stat.addBatchUs += taosGetTimestampUs() - beginUs;
  pStmt->sql.siInfo.tableColsReady = false;

  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->restoreTbCols = true;
  pNode->next = NULL;

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  }

  if (pStmt->queue.stopQueue) {
    STMT2_ELOG_E("stmt bind thread is stopped,cannot enqueue bind request");
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  stmtEnqueue(pStmt, pNode);
  return TSDB_CODE_SUCCESS;
}
1730
/*
1731
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1732
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1733
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1734
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1735

1736
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1737
  pRes->fields = taosMemoryMalloc(size);
1738
  pRes->userFields = taosMemoryMalloc(size);
1739
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1740
    STMT_ERR_RET(terrno);
1741
  }
1742
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1743
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1744

1745
  return TSDB_CODE_SUCCESS;
1746
}
1747

1748
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1749
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1750
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1751

1752
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1753
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1754

1755
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1756
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1757
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1758
      STMT_ERR_RET(terrno);
1759
    }
1760
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1761
  }
1762

1763
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1764
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1765
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1766
      STMT_ERR_RET(terrno);
1767
    }
1768
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1769
  }
1770

1771
  return TSDB_CODE_SUCCESS;
1772
}
1773
*/
1774
/*
 * Bind one batch of parameter values to the statement.
 *
 * colIdx selects the bind form, as handled below:
 *   colIdx < -1  : row format (qBindStmt2RowValue); cannot be mixed with column format
 *   colIdx == -1 : all columns at once
 *   colIdx >= 0  : a single column; indexes must arrive in sequence (or restart at 0)
 *
 * Query statements only bind the placeholders and re-parse the query, then
 * return. Insert statements bind into the current table block and afterwards
 * either enqueue the data for the bind thread (interlace mode) or call
 * stmtAddBatch2().
 *
 * @param stmt          statement handle (STscStmt2*)
 * @param bind          values to bind
 * @param colIdx        bind form selector, see above
 * @param pCreateTbReq  stored on the queue node in interlace mode
 * @return TSDB_CODE_SUCCESS or an error code
 */
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  int64_t startUs = taosGetTimestampUs();

  STMT2_TLOG("start to bind data, colIdx:%d", colIdx);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // a query that already ran once gets a fresh request for this execution
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    code = stmtParseSql(pStmt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;

  cleanup_root:
    // reached only with code != TSDB_CODE_SUCCESS (also from the parse failure above)
    STMT2_ELOG("parse query statment unexpected failed code:%d, need to clean node", code);
    if (pStmt->sql.pQuery && pStmt->sql.pQuery->pRoot) {
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
      pStmt->sql.pQuery->pRoot = NULL;
    }
    STMT_ERR_RET(code);
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    // first bind for this table: look its block up by full table name
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
    if (pStmt->sql.stbInterlaceMode) {
      // interlace mode binds into per-node column arrays instead of the block's own
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      (*pDataBlock)->pData->aCol = NULL;
    }
    if (colIdx < -1) {
      pStmt->sql.bindRowFormat = true;
      taosArrayDestroy((*pDataBlock)->pData->aCol);
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
      if (NULL == (*pDataBlock)->pData->aCol) {
        // do not hand a NULL array to the binder; report the allocation failure
        STMT_ERR_RET(terrno);
      }
    }
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += startUs2 - startUs;

  SStmtQNode* param = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
    taosArrayClear(param->tblData.aCol);

    param->restoreTbCols = false;
    param->tblData.isOrdered = true;
    param->tblData.isDuplicateTs = false;
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);

    param->pCreateTbReq = pCreateTbReq;
  }

  int64_t startUs3 = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;

  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;

  if (colIdx < 0) {
    if (pStmt->sql.stbInterlaceMode) {
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                    pStmt->taos->optionInfo.charsetCxt);
      param->tblData.isOrdered = (*pDataBlock)->ordered;
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
    } else {
      if (colIdx == -1) {
        if (pStmt->sql.bindRowFormat) {
          STMT2_ELOG_E("can't mix bind row format and bind column format");
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
        }
        code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
      } else {
        code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                  pStmt->taos->optionInfo.charsetCxt);
      }
    }

    if (code) {
      STMT2_ELOG("bind cols or rows failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (pStmt->sql.stbInterlaceMode) {
      STMT2_ELOG_E("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (pStmt->sql.bindRowFormat) {
      STMT2_ELOG_E("can't mix bind row format and bind column format");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    // single columns must be bound in order; index 0 starts a new round
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      STMT2_ELOG_E("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      // the first column's row count is passed to the binder for the later columns too
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
                                    pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      STMT2_ELOG("bind single col failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t startUs4 = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
  } else {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;

  return TSDB_CODE_SUCCESS;
}
1984
/*
1985
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1986
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1987

1988
  int32_t code = 0;
1989
  int32_t finalCode = 0;
1990
  size_t  keyLen = 0;
1991
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1992
  while (pIter) {
1993
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1994
    char*          key = taosHashGetKey(pIter, &keyLen);
1995

1996
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1997
    if (pMeta->uid) {
1998
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1999
      continue;
2000
    }
2001

2002
    SSubmitBlkRsp* blkRsp = NULL;
2003
    int32_t        i = 0;
2004
    for (; i < pRsp->nBlocks; ++i) {
2005
      blkRsp = pRsp->pBlocks + i;
2006
      if (strlen(blkRsp->tblFName) != keyLen) {
2007
        continue;
2008
      }
2009

2010
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
2011
        continue;
2012
      }
2013

2014
      break;
2015
    }
2016

2017
    if (i < pRsp->nBlocks) {
2018
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
2019
               blkRsp->uid);
2020

2021
      pMeta->uid = blkRsp->uid;
2022
      pStmt->bInfo.tbUid = blkRsp->uid;
2023
    } else {
2024
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
2025
      if (NULL == pStmt->pCatalog) {
2026
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
2027
        if (code) {
2028
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2029
          finalCode = code;
2030
          continue;
2031
        }
2032
      }
2033

2034
      code = stmtCreateRequest(pStmt);
2035
      if (code) {
2036
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2037
        finalCode = code;
2038
        continue;
2039
      }
2040

2041
      STableMeta*      pTableMeta = NULL;
2042
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
2043
                               .requestId = pStmt->exec.pRequest->requestId,
2044
                               .requestObjRefId = pStmt->exec.pRequest->self,
2045
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
2046
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
2047

2048
      pStmt->stat.ctgGetTbMetaNum++;
2049

2050
      taos_free_result(pStmt->exec.pRequest);
2051
      pStmt->exec.pRequest = NULL;
2052

2053
      if (code || NULL == pTableMeta) {
2054
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2055
        finalCode = code;
2056
        taosMemoryFree(pTableMeta);
2057
        continue;
2058
      }
2059

2060
      pMeta->uid = pTableMeta->uid;
2061
      pStmt->bInfo.tbUid = pTableMeta->uid;
2062
      taosMemoryFree(pTableMeta);
2063
    }
2064

2065
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2066
  }
2067

2068
  return finalCode;
2069
}
2070
*/
2071
/*
2072
int stmtStaticModeExec(TAOS_STMT* stmt) {
2073
  STscStmt2*   pStmt = (STscStmt2*)stmt;
2074
  int32_t     code = 0;
2075
  SSubmitRsp* pRsp = NULL;
2076
  if (pStmt->sql.staticMode) {
2077
    return TSDB_CODE_TSC_STMT_API_ERROR;
2078
  }
2079

2080
  STMT_DLOG_E("start to exec");
2081

2082
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
2083

2084
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
2085
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
2086

2087
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
2088

2089
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
2090
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
2091
    if (code) {
2092
      pStmt->exec.pRequest->code = code;
2093
    } else {
2094
      tFreeSSubmitRsp(pRsp);
2095
      STMT_ERR_RET(stmtResetStmt(pStmt));
2096
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
2097
    }
2098
  }
2099

2100
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
2101

2102
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
2103
  pStmt->affectedRows += pStmt->exec.affectedRows;
2104

2105
_return:
2106

2107
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
2108

2109
  tFreeSSubmitRsp(pRsp);
2110

2111
  ++pStmt->sql.runTimes;
2112

2113
  STMT_RET(code);
2114
}
2115
*/
2116

2117
/*
 * Allocate a zeroed SParseContext and fill it from the request and its
 * connection object.
 *
 * @param pRequest  request the context is built for
 * @param pCxt      [out] receives the new context; set to NULL when allocation fails
 * @param pWrapper  stored in the context as parseSqlParam
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
  const STscObj* pTscObj = pRequest->pTscObj;

  SParseContext* pNew = taosMemoryCalloc(1, sizeof(SParseContext));
  *pCxt = pNew;
  if (NULL == pNew) {
    return terrno;
  }

  *pNew = (SParseContext){.requestId = pRequest->requestId,
                          .requestRid = pRequest->self,
                          .acctId = pTscObj->acctId,
                          .db = pRequest->pDb,
                          .topicQuery = false,
                          .pSql = pRequest->sqlstr,
                          .sqlLen = pRequest->sqlLen,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                          .pTransporter = pTscObj->pAppInfo->pTransporter,
                          .pStmtCb = NULL,
                          .pUser = pTscObj->user,
                          .pEffectiveUser = pRequest->effectiveUser,
                          .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
                          .enableSysInfo = pTscObj->sysInfo,
                          .async = true,
                          .svrVer = pTscObj->sVer,
                          .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
                          .allocatorId = pRequest->allocatorRefId,
                          .parseSqlFp = clientParseSql,
                          .parseSqlParam = pWrapper};

  // biMode is read atomically and filled in after the bulk initialisation
  pNew->biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);

  return TSDB_CODE_SUCCESS;
}
2150

2151
/*
 * Completion callback installed on the request by stmtExec2 for async execution.
 *
 * Records the affected rows, forwards the result to the user's asyncExecFn,
 * waits until the table columns are flagged ready, cleans the exec info,
 * bumps runTimes and finally posts asyncExecSem.
 *
 * @param userdata  the STscStmt2 being executed
 * @param res       result handle forwarded to the user callback
 * @param code      execution result code forwarded to the user callback
 */
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
  STscStmt2*        pStmt = userdata;
  __taos_async_fn_t userCb = pStmt->options.asyncExecFn;

  pStmt->asyncExecCb = true;

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

  userCb(pStmt->options.userdata, res, code);

  // spin until siInfo.tableColsReady becomes non-zero
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
    taosUsleep(1);
  }

  (void)stmtCleanExecInfo(pStmt, 0 == code, false);
  ++pStmt->sql.runTimes;

  if (0 != tsem_post(&pStmt->asyncExecSem)) {
    STMT2_ELOG_E("fail to post asyncExecSem");
  }
}
2171

2172
/*
 * Execute the prepared statement.
 *
 * Waits for outstanding async binds, finishes the current batch in interlace
 * mode, builds the submit output for inserts, then runs the request either
 * synchronously (no asyncExecFn configured) or asynchronously with
 * asyncQueryCb as the completion callback.
 *
 * @param stmt           statement handle (STscStmt2*)
 * @param affected_rows  optional out parameter; written only on the
 *                       synchronous path
 * @return TSDB_CODE_SUCCESS or an error code. On the async path a success
 *         return only means the query was launched.
 */
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int64_t    startUs = taosGetTimestampUs();

  STMT2_DLOG_E("start to exec");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // block until asyncBindNum has dropped to zero
  STMT_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
  pStmt->asyncExecCb = false;

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      int64_t startTs = taosGetTimestampUs();
      // wait for stmt bind thread to finish
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }

      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
  }

  SRequestObj*      pRequest = pStmt->exec.pRequest;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;

  if (!fp) {
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
      STMT2_ELOG_E("exec failed errorcode:NEED_CLIENT_HANDLE_ERROR, need to refresh meta and retry");
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
      if (code) {
        pStmt->exec.pRequest->code = code;

      } else {
        STMT_ERR_RET(stmtResetStmt(pStmt));
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
      }
    }

    STMT_ERR_JRET(pStmt->exec.pRequest->code);

    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
    if (affected_rows) {
      *affected_rows = pStmt->exec.affectedRows;
    }
    pStmt->affectedRows += pStmt->exec.affectedRows;

    // wait for stmt bind thread to finish
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

    ++pStmt->sql.runTimes;
  } else {
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
    if (NULL == pWrapper) {
      // never launch with a NULL wrapper; fall back to OOM if terrno was not set
      STMT_ERR_JRET(terrno != TSDB_CODE_SUCCESS ? terrno : TSDB_CODE_OUT_OF_MEMORY);
    }
    pWrapper->pRequest = pRequest;
    pRequest->pWrapper = pWrapper;

    // Abort before touching the request's async state when the parse context
    // cannot be created; previously the error was recorded but the query was
    // launched anyway.
    // NOTE(review): pWrapper stays attached to pRequest->pWrapper on this path;
    // presumably it is released together with the request - confirm.
    STMT_ERR_JRET(createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper));

    pRequest->syncQuery = false;
    pRequest->body.queryFp = asyncQueryCb;
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;

    pStmt->execSemWaited = false;
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
  }

_return:
  if (code) {
    STMT2_ELOG("exec failed, error:%s", tstrerror(code));
  }
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;

  STMT_RET(code);
}
2280

2281
/*
 * Shut the statement down and free it.
 *
 * Order of operations: stop and join the bind thread (if one is in use), wait
 * for outstanding async binds, wait for a pending async execution, log the
 * collected statistics, clean the SQL info and finally release the statement
 * memory.
 *
 * @param stmt  statement handle (STscStmt2*); freed on success
 * @return TSDB_CODE_SUCCESS, or the error of a failing mutex operation or of
 *         stmtCleanSQLInfo()
 */
int stmtClose2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_DLOG_E("start to close stmt");
  taosMemoryFreeClear(pStmt->db);

  if (pStmt->bindThreadInUse) {
    // wait for stmt bind thread to finish
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    // raise the stop flag under the queue mutex and signal the queue condition
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    pStmt->queue.stopQueue = true;
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);
    pStmt->bindThreadInUse = false;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  // block until asyncBindNum has dropped to zero
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);

  // async mode: take asyncExecSem unless it has already been waited on
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
    if (0 != tsem_wait(&pStmt->asyncExecSem)) {
      STMT2_ELOG_E("fail to wait asyncExecSem");
    }
  }

  STMT2_DLOG("stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
             ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
             ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
             ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
             ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
             pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
             pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
             pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
             pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
             pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);

  if (pStmt->sql.stbInterlaceMode) {
    pStmt->bInfo.tagsCached = false;
  }

  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  if (pStmt->options.asyncExecFn) {
    if (0 != tsem_destroy(&pStmt->asyncExecSem)) {
      STMT2_ELOG_E("fail to destroy asyncExecSem");
    }
  }

  taosMemoryFree(stmt);
  return TSDB_CODE_SUCCESS;
}
2345

2346
/*
 * Return a human readable message for the statement's last error.
 *
 * Without a statement or request, the message for the current terrno is
 * returned. Otherwise the request's code is first overwritten with terrno,
 * unless an async execution has already delivered its callback, in which case
 * the request's own code is kept. The request's message buffer wins when it
 * is non-empty or when the code is TSDB_CODE_RPC_FQDN_ERROR.
 */
const char* stmtErrstr2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (NULL == stmt || NULL == pStmt->exec.pRequest) {
    return (char*)tstrerror(terrno);
  }

  // if stmt async exec ,error code is pStmt->exec.pRequest->code
  if (pStmt->sql.status < STMT_EXECUTE || NULL == pStmt->options.asyncExecFn || !pStmt->asyncExecCb) {
    pStmt->exec.pRequest->code = terrno;
  }

  SRequestObj* pReq = pStmt->exec.pRequest;
  if (NULL != pReq->msgBuf) {
    if ('\0' != pReq->msgBuf[0] || TSDB_CODE_RPC_FQDN_ERROR == pReq->code) {
      return pReq->msgBuf;
    }
  }

  return (const char*)tstrerror(pReq->code);
}
2364
/*
2365
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
2366

2367
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
2368
*/
2369

2370
/*
 * Make sure an insert statement has been parsed so that its column fields can
 * be fetched afterwards. Rejects query statements.
 *
 * On failure the request is freed. pStmt->errCode is reset to the value it had
 * on entry before returning.
 *
 * @return TSDB_CODE_SUCCESS or an error code. TSDB_CODE_PAR_TABLE_NOT_EXIST is
 *         reported as TSDB_CODE_TSC_STMT_TBNAME_ERROR when the
 *         NO_DATA_USING_CLAUSE flag is not set in tbNameFlag.
 */
int stmtParseColFields2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    savedErrCode = pStmt->errCode;

  STMT2_DLOG_E("start to get col fields for insert");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT2_ELOG_E("stmtParseColFields2 only for insert");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // Parsing is skipped either for an already executed statement of a known
  // type other than multi insert, or for an interlace statement whose data
  // context already exists.
  bool parsedBefore = pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
                      STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  bool interlaceReady = pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL;
  if (parsedBefore || interlaceReady) {
    pStmt->bInfo.needParse = false;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

_return:
  // compatible with previous versions
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && 0x0 == (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE)) {
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
  }

  if (TSDB_CODE_SUCCESS != code) {
    STMT2_ELOG("stmt get fileds parse failed, code:%d", code);
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  pStmt->errCode = savedErrCode;

  return code;
}
2418

2419
/*
 * Parse the insert statement if needed, then fetch its column fields.
 *
 * @param stmt    statement handle
 * @param nums    passed through to stmtFetchStbColFields2
 * @param fields  passed through to stmtFetchStbColFields2
 * @return the parse error if parsing failed, otherwise the result of
 *         stmtFetchStbColFields2
 */
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
  int32_t parseCode = stmtParseColFields2(stmt);

  return (TSDB_CODE_SUCCESS == parseCode) ? stmtFetchStbColFields2(stmt, nums, fields) : parseCode;
}
2427

2428
/**
 * Report the number of parameters of a prepared statement.
 *
 * The statement is switched to STMT_FETCH_FIELDS, a request is created and
 * the SQL is parsed when bInfo.needParse is still set. For a query statement
 * the count is the size of sql.pQuery->pPlaceholderValues; for any other type
 * it is obtained through stmtFetchColFields2.
 *
 * @param stmt  statement handle; it is cast to STscStmt2 and dereferenced
 *              without a NULL check.
 * @param nums  output, receives the parameter count; it is dereferenced
 *              without a NULL check.
 * @return TSDB_CODE_SUCCESS on success. If the statement already carries a
 *         non-success errCode, that code is returned immediately. Otherwise
 *         the failure code of the step that failed.
 *
 * On any failure that reaches _return, exec.pRequest is released and set to
 * NULL. pStmt->errCode is written back to the value it had on entry.
 */
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    savedErrCode = pStmt->errCode;

  STMT2_DLOG_E("start to get param num for query");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // No re-parse for a statement with a non-zero run count whose type is known
  // and is not multi-insert.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes) {
    if (pStmt->sql.type > 0 && pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
      pStmt->bInfo.needParse = false;
    }
  }

  // Release the existing request of a query statement with a non-zero run
  // count before a new request is created.
  if (NULL != pStmt->exec.pRequest && pStmt->sql.type == STMT_TYPE_QUERY && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (pStmt->sql.type != STMT_TYPE_QUERY) {
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

  STMT2_TLOG("get param num success, nums:%d", *nums);

_return:
  if (TSDB_CODE_SUCCESS != code) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  // Put errCode back to its value on entry.
  pStmt->errCode = savedErrCode;

  return code;
}
2474

2475
/**
 * Return the request object held by a query statement.
 *
 * @param stmt  statement handle; it is cast to STscStmt2 and dereferenced
 *              without a NULL check.
 * @return pStmt->exec.pRequest when sql.type is STMT_TYPE_QUERY; NULL, after
 *         logging an error, for any other statement type.
 */
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  STMT2_ELOG_E("useResult only for query statement");
  return NULL;
}
2487

2488
int32_t stmtAsyncBindThreadFunc(void* args) {
×
2489
  qInfo("async stmt bind thread started");
×
2490

2491
  ThreadArgs* targs = (ThreadArgs*)args;
×
2492
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
2493

2494
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
2495
  targs->fp(targs->param, NULL, code);
×
2496
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
2497
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
2498
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
2499
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
2500
  taosMemoryFree(args);
×
2501

2502
  qInfo("async stmt bind thread stopped");
×
2503

2504
  return code;
×
2505
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc