• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4069

12 May 2025 05:35AM UTC coverage: 63.048% (+0.5%) from 62.547%
#4069

push

travis-ci

web-flow
Merge pull request #31053 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

157521 of 317858 branches covered (49.56%)

Branch coverage included in aggregate %.

374 of 573 new or added lines in 31 files covered. (65.27%)

4949 existing lines in 87 files now uncovered.

242707 of 316936 relevant lines covered (76.58%)

18229906.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.47
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
12
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
9,569✔
13
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
9,565✔
14
    pTblBuf->buffOffset += pTblBuf->buffUnit;
9,565✔
15
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
4!
16
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
×
17
    if (NULL == pTblBuf->pCurBuff) {
×
18
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
19
    }
20
    *pBuf = pTblBuf->pCurBuff;
×
21
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
22
  } else {
23
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
×
24
    if (NULL == buff) {
×
25
      return terrno;
×
26
    }
27

28
    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
×
29
      return terrno;
×
30
    }
31

32
    pTblBuf->buffIdx++;
×
33
    pTblBuf->pCurBuff = buff;
×
34
    *pBuf = buff;
×
35
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
36
  }
37

38
  return TSDB_CODE_SUCCESS;
9,565✔
39
}
40

41
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
9,574✔
42
  int i = 0;
9,574✔
43
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
53,968✔
44
    if (pStmt->queue.stopQueue) {
44,342!
45
      STMT2_DLOG_E("stmt stopQueue,but remainNum is 0");
×
46
      return false;
×
47
    }
48
    if (i < 10) {
44,342✔
49
      taosUsleep(1);
41,912✔
50
      i++;
41,936✔
51
    } else {
52
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
2,430✔
53
      if (pStmt->queue.stopQueue) {
2,456!
54
        (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
×
55
        STMT2_DLOG_E("stmt stopQueue,but remainNum is 0");
×
56
        return false;
×
57
      }
58
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
2,456✔
59
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
2,442✔
60
      }
61
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
2,456✔
62
    }
63
  }
64
  if (pStmt->queue.stopQueue) {
9,452✔
65
    STMT2_DLOG_E("stmt stopQueue");
92!
66
    return false;
92✔
67
  }
68
  SStmtQNode* orig = pStmt->queue.head;
9,360✔
69
  SStmtQNode* node = pStmt->queue.head->next;
9,360✔
70
  pStmt->queue.head = pStmt->queue.head->next;
9,360✔
71
  *param = node;
9,360✔
72

73
  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
9,360✔
74

75
  return true;
9,487✔
76
}
77

78
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
9,478✔
79
  pStmt->queue.tail->next = param;
9,478✔
80
  pStmt->queue.tail = param;
9,478✔
81

82
  pStmt->stat.bindDataNum++;
9,478✔
83

84
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
9,478✔
85
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
9,480✔
86
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
9,485✔
87
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
9,478✔
88
}
9,483✔
89

90
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
6,018✔
91
  int32_t code = 0;
6,018✔
92

93
  if (pStmt->exec.pRequest == NULL) {
6,018✔
94
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
189✔
95
                        pStmt->reqid);
96
    if (pStmt->reqid != 0) {
189!
97
      pStmt->reqid++;
×
98
    }
99
    pStmt->exec.pRequest->type = RES_TYPE__QUERY;
189✔
100
    if (pStmt->db != NULL) {
189✔
101
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
128!
102
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
128!
103
    }
104
    if (TSDB_CODE_SUCCESS == code) {
189✔
105
      pStmt->exec.pRequest->syncQuery = true;
188✔
106
      pStmt->exec.pRequest->isStmtBind = true;
188✔
107
    }
108
  }
109

110
  return code;
6,018✔
111
}
112

113
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
19,695✔
114
  int32_t code = 0;
19,695✔
115

116
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
19,695!
117
    STMT_LOG_SEQ(newStatus);
19,697!
118
  }
119

120
  if (pStmt->errCode && newStatus != STMT_PREPARE) {
19,729!
121
    STMT_DLOG("stmt already failed with err:%s", tstrerror(pStmt->errCode));
×
122
    return pStmt->errCode;
×
123
  }
124

125
  switch (newStatus) {
19,729!
126
    case STMT_PREPARE:
163✔
127
      pStmt->errCode = 0;
163✔
128
      break;
163✔
129
    case STMT_SETTBNAME:
5,363✔
130
      if (STMT_STATUS_EQ(INIT)) {
5,363!
131
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
132
      }
133
      if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
5,363!
134
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
135
      }
136
      break;
5,363✔
137
    case STMT_SETTAGS:
146✔
138
      if (STMT_STATUS_EQ(INIT)) {
146!
139
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
140
      }
141
      break;
146✔
142
    case STMT_FETCH_FIELDS:
68✔
143
      if (STMT_STATUS_EQ(INIT)) {
68!
144
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
145
      }
146
      break;
68✔
147
    case STMT_BIND:
5,373✔
148
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
5,373!
149
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
150
      }
151
      /*
152
            if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
153
              code = TSDB_CODE_TSC_STMT_API_ERROR;
154
            }
155
      */
156
      break;
5,373✔
157
    case STMT_BIND_COL:
×
158
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
×
159
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
160
      }
161
      break;
×
162
    case STMT_ADD_BATCH:
4,337✔
163
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
4,337!
164
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
165
      }
166
      break;
4,337✔
167
    case STMT_EXECUTE:
4,279✔
168
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
4,279✔
169
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
5!
170
            STMT_STATUS_NE(BIND_COL)) {
×
171
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
172
        }
173
      } else {
174
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
4,274!
175
          code = TSDB_CODE_TSC_STMT_API_ERROR;
1✔
176
        }
177
      }
178
      break;
4,279✔
179
    default:
×
180
      code = TSDB_CODE_APP_ERROR;
×
181
      break;
×
182
  }
183

184
  STMT_ERR_RET(code);
19,729✔
185

186
  pStmt->sql.status = newStatus;
19,728✔
187

188
  return TSDB_CODE_SUCCESS;
19,728✔
189
}
190

191
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
138✔
192
  STscStmt2* pStmt = (STscStmt2*)stmt;
138✔
193

194
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
138✔
195

196
  if ('\0' == pStmt->bInfo.tbName[0]) {
138✔
197
    tscWarn("no table name set, OK if it is a stmt get fields");
33!
198
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
33!
199
  }
200

201
  *tbName = pStmt->bInfo.tbName;
105✔
202

203
  return TSDB_CODE_SUCCESS;
105✔
204
}
205

206
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
143✔
207
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
208
  STscStmt2* pStmt = (STscStmt2*)stmt;
143✔
209
  char       tbFName[TSDB_TABLE_FNAME_LEN];
210
  int32_t    code = tNameExtractFullName(tbName, tbFName);
143✔
211
  if (code != 0) {
144!
212
    return code;
×
213
  }
214

215
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
144✔
216
  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));
144✔
217
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
144✔
218

219
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
144✔
220
  pStmt->bInfo.tbSuid = pTableMeta->suid;
144✔
221
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
144✔
222
  pStmt->bInfo.tbType = pTableMeta->tableType;
144✔
223

224
  if (!pStmt->bInfo.tagsCached) {
144✔
225
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
126✔
226
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
123!
227
  }
228

229
  pStmt->bInfo.boundTags = tags;
140✔
230
  pStmt->bInfo.tagsCached = false;
140✔
231
  pStmt->bInfo.tbNameFlag = tbNameFlag;
140✔
232
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
140✔
233

234
  return TSDB_CODE_SUCCESS;
140✔
235
}
236

237
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
140✔
238
  STscStmt2* pStmt = (STscStmt2*)stmt;
140✔
239

240
  pStmt->sql.pVgHash = pVgHash;
140✔
241
  pStmt->exec.pBlockHash = pBlockHash;
140✔
242

243
  return TSDB_CODE_SUCCESS;
140✔
244
}
245

246
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
143✔
247
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, uint8_t tbNameFlag) {
248
  STscStmt2* pStmt = (STscStmt2*)stmt;
143✔
249

250
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, tbNameFlag));
143!
251
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
140!
252

253
  pStmt->sql.autoCreateTbl = autoCreateTbl;
141✔
254

255
  return TSDB_CODE_SUCCESS;
141✔
256
}
257

258
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
11✔
259
  STscStmt2* pStmt = (STscStmt2*)stmt;
11✔
260

261
  *pVgHash = pStmt->sql.pVgHash;
11✔
262
  pStmt->sql.pVgHash = NULL;
11✔
263

264
  *pBlockHash = pStmt->exec.pBlockHash;
11✔
265
  pStmt->exec.pBlockHash = NULL;
11✔
266

267
  return TSDB_CODE_SUCCESS;
11✔
268
}
269

270
static int32_t stmtParseSql(STscStmt2* pStmt) {
171✔
271
  pStmt->exec.pCurrBlock = NULL;
171✔
272

273
  SStmtCallback stmtCb = {
171✔
274
      .pStmt = pStmt,
275
      .getTbNameFn = stmtGetTbName,
276
      .setInfoFn = stmtUpdateInfo,
277
      .getExecInfoFn = stmtGetExecInfo,
278
  };
279

280
  STMT_ERR_RET(stmtCreateRequest(pStmt));
171!
281

282
  pStmt->stat.parseSqlNum++;
170✔
283
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
170✔
284
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
149✔
285

286
  pStmt->bInfo.needParse = false;
149✔
287

288
  if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
149✔
289
    pStmt->sql.type = STMT_TYPE_INSERT;
11✔
290
    pStmt->sql.stbInterlaceMode = false;
11✔
291
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
138✔
292
    pStmt->sql.type = STMT_TYPE_QUERY;
6✔
293
    pStmt->sql.stbInterlaceMode = false;
6✔
294

295
    return TSDB_CODE_SUCCESS;
6✔
296
  }
297

298
  STableDataCxt** pSrc =
299
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
143✔
300
  if (NULL == pSrc || NULL == *pSrc) {
144!
301
    return terrno;
×
302
  }
303

304
  STableDataCxt* pTableCtx = *pSrc;
144✔
305
  // if (pStmt->sql.stbInterlaceMode) {
306
  //   int16_t lastIdx = -1;
307

308
  //   for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
309
  //     if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
310
  //       pStmt->sql.stbInterlaceMode = false;
311
  //       break;
312
  //     }
313

314
  //     lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
315
  //   }
316
  // }
317

318
  if (NULL == pStmt->sql.pBindInfo) {
144✔
319
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
133!
320
    if (NULL == pStmt->sql.pBindInfo) {
133!
321
      return terrno;
×
322
    }
323
  }
324

325
  return TSDB_CODE_SUCCESS;
144✔
326
}
327

328
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
4,554✔
329
  pStmt->bInfo.tbUid = 0;
4,554✔
330
  pStmt->bInfo.tbVgId = -1;
4,554✔
331
  pStmt->bInfo.tbType = 0;
4,554✔
332
  pStmt->bInfo.needParse = true;
4,554✔
333
  pStmt->bInfo.inExecCache = false;
4,554✔
334

335
  pStmt->bInfo.tbName[0] = 0;
4,554✔
336
  pStmt->bInfo.tbFName[0] = 0;
4,554✔
337
  if (!pStmt->bInfo.tagsCached) {
4,554✔
338
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
4,450✔
339
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
4,446!
340
  }
341
  if (!pStmt->sql.autoCreateTbl) {
4,552✔
342
    pStmt->bInfo.stbFName[0] = 0;
4,446✔
343
    pStmt->bInfo.tbSuid = 0;
4,446✔
344
  }
345

346
  return TSDB_CODE_SUCCESS;
4,552✔
347
}
348

349
static void stmtFreeTableBlkList(STableColsData* pTb) {
×
350
  (void)qResetStmtColumns(pTb->aCol, true);
×
351
  taosArrayDestroy(pTb->aCol);
×
352
}
×
353

354
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
4,220✔
355
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
4,220✔
356
  if (NULL == pTblBuf->pCurBuff) {
4,221✔
357
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
2!
358
    return;
×
359
  }
360
  pTblBuf->buffIdx = 1;
4,219✔
361
  pTblBuf->buffOffset = sizeof(*pQueue->head);
4,219✔
362

363
  pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
4,219✔
364
  pQueue->qRemainNum = 0;
4,219✔
365
  pQueue->head->next = NULL;
4,219✔
366
}
367

368
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
4,438✔
369
  if (pStmt->sql.stbInterlaceMode) {
4,438✔
370
    if (deepClean) {
4,311✔
371
      taosHashCleanup(pStmt->exec.pBlockHash);
89✔
372
      pStmt->exec.pBlockHash = NULL;
89✔
373

374
      if (NULL != pStmt->exec.pCurrBlock) {
89✔
375
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
85!
376
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
85✔
377
        pStmt->exec.pCurrBlock = NULL;
85✔
378
      }
379
    } else {
380
      pStmt->sql.siInfo.pTableColsIdx = 0;
4,222✔
381
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
4,222✔
382
    }
383
    if (NULL != pStmt->exec.pRequest) {
4,309✔
384
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
4,308✔
385
    }
386
  } else {
387
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
127✔
388
      // if (!pStmt->options.asyncExecFn) {
389
      taos_free_result(pStmt->exec.pRequest);
122✔
390
      pStmt->exec.pRequest = NULL;
122✔
391
      //}
392
    }
393

394
    size_t keyLen = 0;
127✔
395
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
127✔
396
    while (pIter) {
269✔
397
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
142✔
398
      char*          key = taosHashGetKey(pIter, &keyLen);
142✔
399
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);
142✔
400

401
      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
142✔
402
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
47✔
403
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
99!
404

405
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
47✔
406
        continue;
47✔
407
      }
408

409
      qDestroyStmtDataBlock(pBlocks);
95✔
410
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
95!
411

412
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
95✔
413
    }
414

415
    if (keepTable) {
127✔
416
      return TSDB_CODE_SUCCESS;
52✔
417
    }
418

419
    taosHashCleanup(pStmt->exec.pBlockHash);
75✔
420
    pStmt->exec.pBlockHash = NULL;
75✔
421

422
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
75✔
423
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
75!
424
  }
425

426
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
4,384!
427

428
  return TSDB_CODE_SUCCESS;
4,381✔
429
}
430

431
static void stmtFreeTbBuf(void* buf) {
96✔
432
  void* pBuf = *(void**)buf;
96✔
433
  taosMemoryFree(pBuf);
96!
434
}
96✔
435

436
static void stmtFreeTbCols(void* buf) {
86,000✔
437
  SArray* pCols = *(SArray**)buf;
86,000✔
438
  taosArrayDestroy(pCols);
86,000✔
439
}
86,000✔
440

441
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
143✔
442
  STMT2_DLOG_E("start to free SQL info");
143!
443

444
  taosMemoryFreeClear(pStmt->db);
143!
445
  taosMemoryFree(pStmt->sql.pBindInfo);
143!
446
  taosMemoryFree(pStmt->sql.queryRes.fields);
143!
447
  taosMemoryFree(pStmt->sql.queryRes.userFields);
143!
448
  taosMemoryFree(pStmt->sql.sqlStr);
143!
449
  qDestroyQuery(pStmt->sql.pQuery);
143✔
450
  taosArrayDestroy(pStmt->sql.nodeList);
143✔
451
  taosHashCleanup(pStmt->sql.pVgHash);
143✔
452
  pStmt->sql.pVgHash = NULL;
143✔
453
  if (pStmt->sql.fixValueTags) {
143✔
454
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
5!
455
  }
456

457
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
143✔
458
  while (pIter) {
158✔
459
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
15✔
460

461
    qDestroyStmtDataBlock(pCache->pDataCtx);
15✔
462
    qDestroyBoundColInfo(pCache->boundTags);
15✔
463
    taosMemoryFreeClear(pCache->boundTags);
15!
464

465
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
15✔
466
  }
467
  taosHashCleanup(pStmt->sql.pTableCache);
143✔
468
  pStmt->sql.pTableCache = NULL;
143✔
469

470
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
143!
471
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
143!
472

473
  taos_free_result(pStmt->sql.siInfo.pRequest);
143✔
474
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
143✔
475
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
143✔
476
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
143✔
477
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
143!
478
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
143✔
479
  if (pStmt->sql.siInfo.tableColsReady) {
143!
480
    taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
143✔
481
    pStmt->sql.siInfo.pTableCols = NULL;
143✔
482
  } else {
483
    STMT2_DLOG_E("stmt close tableCols not ready, skip free");
×
484
  }
485
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
143✔
486
  pStmt->sql.siInfo.tableColsReady = true;
143✔
487

488
  STMT_DLOG_E("end to free SQL info");
143!
489

490
  return TSDB_CODE_SUCCESS;
143✔
491
}
492

493
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
113✔
494
  if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
113✔
495
    return TSDB_CODE_SUCCESS;
15✔
496
  }
497

498
  SVgroupInfo      vgInfo = {0};
98✔
499
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
98✔
500
                           .requestId = pStmt->exec.pRequest->requestId,
98✔
501
                           .requestObjRefId = pStmt->exec.pRequest->self,
98✔
502
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
98✔
503

504
  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
98✔
505
  if (TSDB_CODE_SUCCESS != code) {
98!
506
    return code;
×
507
  }
508

509
  code =
510
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
98✔
511
  if (TSDB_CODE_SUCCESS != code) {
98!
512
    return code;
×
513
  }
514

515
  *vgId = vgInfo.vgId;
98✔
516

517
  return TSDB_CODE_SUCCESS;
98✔
518
}
519

520
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
66✔
521
                                    uint64_t suid, int32_t vgId) {
522
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
66!
523
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
66!
524

525
  STMT_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);
66!
526

527
  return TSDB_CODE_SUCCESS;
66✔
528
}
529

530
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
192✔
531
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
192!
532
    pStmt->bInfo.needParse = false;
×
533
    pStmt->bInfo.inExecCache = false;
×
534
    return TSDB_CODE_SUCCESS;
×
535
  }
536

537
  pStmt->bInfo.needParse = true;
192✔
538
  pStmt->bInfo.inExecCache = false;
192✔
539

540
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
192✔
541
  if (pCxtInExec) {
194✔
542
    pStmt->bInfo.needParse = false;
24✔
543
    pStmt->bInfo.inExecCache = true;
24✔
544

545
    pStmt->exec.pCurrBlock = *pCxtInExec;
24✔
546

547
    if (pStmt->sql.autoCreateTbl) {
24✔
548
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
18!
549
      return TSDB_CODE_SUCCESS;
18✔
550
    }
551
  }
552

553
  if (NULL == pStmt->pCatalog) {
176✔
554
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
87!
555
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
89✔
556
  }
557

558
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
178✔
559
    if (pStmt->bInfo.inExecCache) {
104!
560
      pStmt->bInfo.needParse = false;
×
561
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
562
      return TSDB_CODE_SUCCESS;
×
563
    }
564

565
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
104!
566
    return TSDB_CODE_SUCCESS;
104✔
567
  }
568

569
  if (pStmt->sql.autoCreateTbl) {
73✔
570
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
48✔
571
    if (pCache) {
48!
572
      pStmt->bInfo.needParse = false;
48✔
573
      pStmt->bInfo.tbUid = 0;
48✔
574

575
      STableDataCxt* pNewBlock = NULL;
48✔
576
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
48!
577

578
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
48!
579
                      POINTER_BYTES)) {
580
        STMT_ERR_RET(terrno);
×
581
      }
582

583
      pStmt->exec.pCurrBlock = pNewBlock;
48✔
584

585
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
48!
586

587
      return TSDB_CODE_SUCCESS;
48✔
588
    }
589

590
    STMT_RET(stmtCleanBindInfo(pStmt));
×
591
  }
592

593
  uint64_t uid, suid;
594
  int32_t  vgId;
595
  int8_t   tableType;
596

597
  STableMeta*      pTableMeta = NULL;
25✔
598
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
25✔
599
                           .requestId = pStmt->exec.pRequest->requestId,
25✔
600
                           .requestObjRefId = pStmt->exec.pRequest->self,
25✔
601
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
25✔
602
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
24✔
603

604
  pStmt->stat.ctgGetTbMetaNum++;
24✔
605

606
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
24!
607
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
608
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
609

610
    STMT_ERR_RET(code);
×
611
  }
612

613
  STMT_ERR_RET(code);
24!
614

615
  uid = pTableMeta->uid;
24✔
616
  suid = pTableMeta->suid;
24✔
617
  tableType = pTableMeta->tableType;
24✔
618
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
24✔
619
  vgId = pTableMeta->vgId;
24✔
620

621
  taosMemoryFree(pTableMeta);
24!
622

623
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
24!
624

625
  if (uid == pStmt->bInfo.tbUid) {
24!
626
    pStmt->bInfo.needParse = false;
×
627

628
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
629

630
    return TSDB_CODE_SUCCESS;
×
631
  }
632

633
  if (pStmt->bInfo.inExecCache) {
24✔
634
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
6✔
635
    if (NULL == pCache) {
6!
636
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
637
               pStmt->bInfo.tbFName, uid, cacheUid);
638

639
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
640
    }
641

642
    pStmt->bInfo.needParse = false;
6✔
643

644
    pStmt->bInfo.tbUid = uid;
6✔
645
    pStmt->bInfo.tbSuid = suid;
6✔
646
    pStmt->bInfo.tbType = tableType;
6✔
647
    pStmt->bInfo.boundTags = pCache->boundTags;
6✔
648
    pStmt->bInfo.tagsCached = true;
6✔
649

650
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
6!
651

652
    return TSDB_CODE_SUCCESS;
6✔
653
  }
654

655
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
18✔
656
  if (pCache) {
18!
657
    pStmt->bInfo.needParse = false;
18✔
658

659
    pStmt->bInfo.tbUid = uid;
18✔
660
    pStmt->bInfo.tbSuid = suid;
18✔
661
    pStmt->bInfo.tbType = tableType;
18✔
662
    pStmt->bInfo.boundTags = pCache->boundTags;
18✔
663
    pStmt->bInfo.tagsCached = true;
18✔
664

665
    STableDataCxt* pNewBlock = NULL;
18✔
666
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
18!
667

668
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
18!
669
                    POINTER_BYTES)) {
670
      STMT_ERR_RET(terrno);
×
671
    }
672

673
    pStmt->exec.pCurrBlock = pNewBlock;
18✔
674

675
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
18!
676

677
    return TSDB_CODE_SUCCESS;
18✔
678
  }
679

680
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
681

682
  return TSDB_CODE_SUCCESS;
×
683
}
684

685
static int32_t stmtResetStmt(STscStmt2* pStmt) {
×
686
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
×
687

688
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
689
  if (NULL == pStmt->sql.pTableCache) {
×
690
    STMT_ERR_RET(terrno);
×
691
  }
692

693
  pStmt->sql.status = STMT_INIT;
×
694

695
  return TSDB_CODE_SUCCESS;
×
696
}
697

698
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
9,482✔
699
  SStmtQNode* pParam = (SStmtQNode*)param;
9,482✔
700

701
  if (pParam->restoreTbCols) {
9,482✔
702
    for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
9,479✔
703
      SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
5,262✔
704
      *p = taosArrayInit(20, POINTER_BYTES);
5,262✔
705
      if (*p == NULL) {
5,262!
706
        STMT_ERR_RET(terrno);
×
707
      }
708
    }
709

710
    atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
4,217✔
711
  } else {
712
    int code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
5,257✔
713
                                      &pStmt->sql.siInfo, pParam->pCreateTbReq);
714
    // taosMemoryFree(pParam->pTbData);
715
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
5,249✔
716
    STMT_ERR_RET(code);
5,265!
717
  }
718
  return TSDB_CODE_SUCCESS;
9,487✔
719
}
720

721
static void* stmtBindThreadFunc(void* param) {
96✔
722
  setThreadName("stmt2Bind");
96✔
723
  qInfo("stmt bind thread started");
96!
724

725
  STscStmt2* pStmt = (STscStmt2*)param;
96✔
726

727
  while (true) {
9,579✔
728
    if (pStmt->queue.stopQueue) {
9,675✔
729
      STMT2_DLOG_E("stmt bind thread received stop signal");
96!
730
      break;
96✔
731
    }
732

733
    SStmtQNode* asyncParam = NULL;
9,579✔
734
    if (!stmtDequeue(pStmt, &asyncParam)) {
9,579✔
735
      STMT2_DLOG_E("stmt dequeue failed, continue");
92!
736
      continue;
92✔
737
    }
738

739
    int ret = stmtAsyncOutput(pStmt, asyncParam);
9,483✔
740
    if (ret != 0) {
9,486!
741
      STMT2_ELOG("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
×
742
    }
743
  }
744

745
  qInfo("stmt bind thread stopped");
96!
746
  return NULL;
96✔
747
}
748

749
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
96✔
750
  TdThreadAttr thAttr;
751
  if (taosThreadAttrInit(&thAttr) != 0) {
96!
752
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
753
  }
754
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
96!
755
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
756
  }
757

758
  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
96!
759
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
760
    STMT_ERR_RET(terrno);
×
761
  }
762

763
  pStmt->bindThreadInUse = true;
96✔
764

765
  (void)taosThreadAttrDestroy(&thAttr);
96✔
766
  return TSDB_CODE_SUCCESS;
96✔
767
}
768

769
static int32_t stmtInitQueue(STscStmt2* pStmt) {
96✔
770
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
96✔
771
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
96✔
772
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
192!
773
  pStmt->queue.tail = pStmt->queue.head;
96✔
774

775
  return TSDB_CODE_SUCCESS;
96✔
776
}
777

778
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
145✔
779
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
145✔
780
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
145✔
781
  pStmt->asyncBindParam.asyncBindNum = 0;
145✔
782

783
  return TSDB_CODE_SUCCESS;
145✔
784
}
785

786
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
96✔
787
  pTblBuf->buffUnit = sizeof(SStmtQNode);
96✔
788
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
96✔
789
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
96✔
790
  if (NULL == pTblBuf->pBufList) {
96!
791
    return terrno;
×
792
  }
793
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
96!
794
  if (NULL == buff) {
96!
795
    return terrno;
×
796
  }
797

798
  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
192!
799
    return terrno;
×
800
  }
801

802
  pTblBuf->pCurBuff = buff;
96✔
803
  pTblBuf->buffIdx = 1;
96✔
804
  pTblBuf->buffOffset = 0;
96✔
805

806
  return TSDB_CODE_SUCCESS;
96✔
807
}
808

809
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
144✔
810
  STscObj*   pObj = (STscObj*)taos;
144✔
811
  STscStmt2* pStmt = NULL;
144✔
812
  int32_t    code = 0;
144✔
813

814
  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
144!
815
  if (NULL == pStmt) {
144!
816
    return NULL;
×
817
  }
818

819
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
144✔
820
  if (NULL == pStmt->sql.pTableCache) {
145!
821
    taosMemoryFree(pStmt);
×
822
    return NULL;
×
823
  }
824

825
  pStmt->taos = pObj;
145✔
826
  if (taos->db[0] != '\0') {
145✔
827
    pStmt->db = taosStrdup(taos->db);
51!
828
  }
829
  pStmt->bInfo.needParse = true;
145✔
830
  pStmt->sql.status = STMT_INIT;
145✔
831
  pStmt->errCode = TSDB_CODE_SUCCESS;
145✔
832

833
  if (NULL != pOptions) {
145!
834
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
145✔
835
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
145✔
836
      pStmt->stbInterlaceMode = true;
79✔
837
    }
838

839
    pStmt->reqid = pOptions->reqid;
145✔
840
  }
841

842
  if (pStmt->stbInterlaceMode) {
145✔
843
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
79✔
844
    pStmt->sql.siInfo.acctId = taos->acctId;
79✔
845
    pStmt->sql.siInfo.dbname = taos->db;
79✔
846
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
79✔
847
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
79✔
848
    if (NULL == pStmt->sql.siInfo.pTableHash) {
79!
849
      (void)stmtClose2(pStmt);
×
850
      return NULL;
×
851
    }
852
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
79✔
853
    if (NULL == pStmt->sql.siInfo.pTableCols) {
79!
854
      terrno = terrno;
×
855
      (void)stmtClose2(pStmt);
×
856
      return NULL;
×
857
    }
858

859
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
79✔
860
    if (TSDB_CODE_SUCCESS == code) {
79!
861
      code = stmtInitQueue(pStmt);
79✔
862
    }
863
    if (TSDB_CODE_SUCCESS == code) {
79!
864
      code = stmtStartBindThread(pStmt);
79✔
865
    }
866
    if (TSDB_CODE_SUCCESS != code) {
79!
867
      terrno = code;
×
868
      (void)stmtClose2(pStmt);
×
869
      return NULL;
×
870
    }
871
  }
872

873
  pStmt->sql.siInfo.tableColsReady = true;
145✔
874
  if (pStmt->options.asyncExecFn) {
145✔
875
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
5!
876
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
877
      (void)stmtClose2(pStmt);
×
878
      return NULL;
×
879
    }
880
  }
881
  code = stmtIniAsyncBind(pStmt);
145✔
882
  if (TSDB_CODE_SUCCESS != code) {
145!
883
    terrno = code;
×
884
    (void)stmtClose2(pStmt);
×
885
    return NULL;
×
886
  }
887

888
  pStmt->execSemWaited = false;
145✔
889

890
  STMT_LOG_SEQ(STMT_INIT);
145!
891

892
  tscDebug("stmt:%p initialized", pStmt);
145!
893

894
  return pStmt;
145✔
895
}
896

897
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
64✔
898
  STscStmt2* pStmt = (STscStmt2*)stmt;
64✔
899

900
  STMT2_DLOG("dbname is specified in sql:%s", dbName);
64!
901

902
  pStmt->db = taosStrdup(dbName);
64!
903
  (void)strdequote(pStmt->db);
64✔
904
  STMT_ERR_RET(stmtCreateRequest(pStmt));
64!
905

906
  // The SQL statement specifies a database name, overriding the previously specified database
907
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
64!
908
  pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
64!
909
  if (pStmt->exec.pRequest->pDb == NULL) {
64!
910
    return terrno;
×
911
  }
912
  if (pStmt->sql.stbInterlaceMode) {
64✔
913
    pStmt->sql.siInfo.dbname = pStmt->db;
22✔
914
  }
915
  return TSDB_CODE_SUCCESS;
64✔
916
}
917
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
17✔
918
  int32_t code = TSDB_CODE_SUCCESS;
17✔
919

920
  if (pStmt->bindThreadInUse) {
17!
921
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
17✔
922
    pStmt->queue.stopQueue = true;
17✔
923
    (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
17✔
924
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
17✔
925
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
17✔
926

927
    (void)taosThreadJoin(pStmt->bindThread, NULL);
17✔
928
    pStmt->bindThreadInUse = false;
17✔
929
    pStmt->queue.head = NULL;
17✔
930
    pStmt->queue.tail = NULL;
17✔
931
    pStmt->queue.qRemainNum = 0;
17✔
932

933
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
17✔
934
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
17✔
935
  }
936

937
  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
17✔
938
  if (NULL == pStmt->sql.siInfo.pTableHash) {
17!
939
    return terrno;
×
940
  }
941

942
  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
17✔
943
  if (NULL == pStmt->sql.siInfo.pTableCols) {
17!
944
    return terrno;
×
945
  }
946

947
  code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
17✔
948

949
  if (TSDB_CODE_SUCCESS == code) {
17!
950
    code = stmtInitQueue(pStmt);
17✔
951
    pStmt->queue.stopQueue = false;
17✔
952
  }
953
  if (TSDB_CODE_SUCCESS == code) {
17!
954
    code = stmtStartBindThread(pStmt);
17✔
955
  }
956
  if (TSDB_CODE_SUCCESS != code) {
17!
957
    return code;
×
958
  }
959

960
  return TSDB_CODE_SUCCESS;
17✔
961
}
962

963
static int32_t stmtResetStmtForPrepare(STscStmt2* pStmt) {
21✔
964
  char*             db = pStmt->db;
21✔
965
  bool              stbInterlaceMode = pStmt->stbInterlaceMode;
21✔
966
  TAOS_STMT2_OPTION options = pStmt->options;
21✔
967
  uint32_t          reqid = pStmt->reqid;
21✔
968

969
  taosMemoryFree(pStmt->sql.pBindInfo);
21!
970
  pStmt->sql.pBindInfo = NULL;
21✔
971

972
  taosMemoryFree(pStmt->sql.queryRes.fields);
21!
973
  pStmt->sql.queryRes.fields = NULL;
21✔
974

975
  taosMemoryFree(pStmt->sql.queryRes.userFields);
21!
976
  pStmt->sql.queryRes.userFields = NULL;
21✔
977

978
  pStmt->sql.type = 0;
21✔
979
  pStmt->sql.runTimes = 0;
21✔
980
  taosMemoryFree(pStmt->sql.sqlStr);
21!
981
  pStmt->sql.sqlStr = NULL;
21✔
982

983
  qDestroyQuery(pStmt->sql.pQuery);
21✔
984
  pStmt->sql.pQuery = NULL;
21✔
985

986
  taosArrayDestroy(pStmt->sql.nodeList);
21✔
987
  pStmt->sql.nodeList = NULL;
21✔
988

989
  taosHashCleanup(pStmt->sql.pVgHash);
21✔
990
  pStmt->sql.pVgHash = NULL;
21✔
991

992
  if (pStmt->sql.fixValueTags) {
21✔
993
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
12!
994
    pStmt->sql.fixValueTbReq = NULL;
12✔
995
  }
996
  pStmt->sql.fixValueTags = false;
21✔
997

998
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
21✔
999
  while (pIter) {
24✔
1000
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
3✔
1001

1002
    qDestroyStmtDataBlock(pCache->pDataCtx);
3✔
1003
    qDestroyBoundColInfo(pCache->boundTags);
3✔
1004
    taosMemoryFreeClear(pCache->boundTags);
3!
1005

1006
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
3✔
1007
  }
1008
  taosHashCleanup(pStmt->sql.pTableCache);
21✔
1009
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
21✔
1010
  if (NULL == pStmt->sql.pTableCache) {
21!
1011
    return terrno;
×
1012
  }
1013

1014
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
21!
1015

1016
  if (pStmt->exec.pRequest) {
21✔
1017
    taos_free_result(pStmt->exec.pRequest);
15✔
1018
    pStmt->exec.pRequest = NULL;
15✔
1019
  }
1020

1021
  if (pStmt->sql.siInfo.pTableCols) {
21✔
1022
    taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
17✔
1023
    pStmt->sql.siInfo.pTableCols = NULL;
17✔
1024
  }
1025

1026
  if (pStmt->sql.siInfo.tbBuf.pBufList) {
21✔
1027
    taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
17✔
1028
    pStmt->sql.siInfo.tbBuf.pBufList = NULL;
17✔
1029
  }
1030

1031
  if (pStmt->sql.siInfo.pTableHash) {
21✔
1032
    tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
17✔
1033
    pStmt->sql.siInfo.pTableHash = NULL;
17✔
1034
  }
1035

1036
  if (pStmt->sql.siInfo.pVgroupHash) {
21!
1037
    taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
×
1038
    pStmt->sql.siInfo.pVgroupHash = NULL;
×
1039
  }
1040

1041
  if (pStmt->sql.siInfo.pVgroupList) {
21!
1042
    taosArrayDestroy(pStmt->sql.siInfo.pVgroupList);
×
1043
    pStmt->sql.siInfo.pVgroupList = NULL;
×
1044
  }
1045

1046
  if (pStmt->sql.siInfo.pDataCtx) {
21✔
1047
    qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
15✔
1048
    pStmt->sql.siInfo.pDataCtx = NULL;
15✔
1049
  }
1050

1051
  if (pStmt->sql.siInfo.pTSchema) {
21✔
1052
    taosMemoryFree(pStmt->sql.siInfo.pTSchema);
15!
1053
    pStmt->sql.siInfo.pTSchema = NULL;
15✔
1054
  }
1055

1056
  if (pStmt->sql.siInfo.pRequest) {
21✔
1057
    taos_free_result(pStmt->sql.siInfo.pRequest);
15✔
1058
    pStmt->sql.siInfo.pRequest = NULL;
15✔
1059
  }
1060

1061
  if (stbInterlaceMode) {
21✔
1062
    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));
17!
1063
  }
1064

1065
  pStmt->db = db;
21✔
1066
  pStmt->stbInterlaceMode = stbInterlaceMode;
21✔
1067
  pStmt->options = options;
21✔
1068
  pStmt->reqid = reqid;
21✔
1069

1070
  pStmt->sql.status = STMT_INIT;
21✔
1071

1072
  return TSDB_CODE_SUCCESS;
21✔
1073
}
1074

1075
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
162✔
1076
  STscStmt2* pStmt = (STscStmt2*)stmt;
162✔
1077
  int32_t    code = 0;
162✔
1078

1079
  if (stmt == NULL || sql == NULL) {
162!
1080
    STMT2_ELOG_E("stmt or sql is NULL");
×
1081
    return TSDB_CODE_INVALID_PARA;
×
1082
  }
1083

1084
  if (pStmt->sql.status >= STMT_PREPARE) {
162✔
1085
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
21!
1086
    STMT_ERR_RET(stmtResetStmtForPrepare(pStmt));
21!
1087
  }
1088

1089
  STMT2_DLOG("start to prepare with sql:%s", sql);
162!
1090

1091
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
162✔
1092
    STMT2_ELOG("stmt errCode is not success, ErrCode: 0x%x, ErrMessage: %s\n. ", pStmt->errCode,
1!
1093
               strerror(pStmt->errCode));
1094
    return pStmt->errCode;
1✔
1095
  }
1096

1097
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
161!
1098

1099
  if (length <= 0) {
162✔
1100
    length = strlen(sql);
103✔
1101
  }
1102

1103
  pStmt->sql.sqlStr = taosStrndup(sql, length);
162!
1104
  if (!pStmt->sql.sqlStr) {
163!
1105
    return terrno;
×
1106
  }
1107
  pStmt->sql.sqlLen = length;
163✔
1108
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
163✔
1109

1110
  char* dbName = NULL;
163✔
1111
  if (qParseDbName(sql, length, &dbName)) {
163✔
1112
    STMT_ERR_RET(stmtSetDbName2(stmt, dbName));
64!
1113
    taosMemoryFreeClear(dbName);
64!
1114
  }
1115

1116
  return TSDB_CODE_SUCCESS;
163✔
1117
}
1118

1119
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
86✔
1120
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
86✔
1121
  if (!pSrc) {
86!
1122
    return terrno;
×
1123
  }
1124
  STableDataCxt* pDst = NULL;
86✔
1125

1126
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
86!
1127
  pStmt->sql.siInfo.pDataCtx = pDst;
83✔
1128

1129
  SArray* pTblCols = NULL;
83✔
1130
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
74,822✔
1131
    pTblCols = taosArrayInit(20, POINTER_BYTES);
74,722✔
1132
    if (NULL == pTblCols) {
77,320!
1133
      return terrno;
×
1134
    }
1135

1136
    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
152,059!
1137
      return terrno;
×
1138
    }
1139
  }
1140

1141
  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
100✔
1142

1143
  return TSDB_CODE_SUCCESS;
100✔
1144
}
1145

1146
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
10,791✔
1147
  STscStmt2* pStmt = (STscStmt2*)stmt;
10,791✔
1148

1149
  STMT_DLOG_E("start is insert");
10,791!
1150

1151
  if (pStmt->sql.type) {
10,792✔
1152
    *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
10,630✔
1153
  } else {
1154
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
162✔
1155
  }
1156

1157
  return TSDB_CODE_SUCCESS;
10,792✔
1158
}
1159

1160
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
5,365✔
1161
  STscStmt2* pStmt = (STscStmt2*)stmt;
5,365✔
1162

1163
  int64_t startUs = taosGetTimestampUs();
5,362✔
1164

1165
  STMT_DLOG("start to set tbName:%s", tbName);
5,362!
1166

1167
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
5,364!
1168
    return pStmt->errCode;
×
1169
  }
1170

1171
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
5,364!
1172

1173
  int32_t insert = 0;
5,363✔
1174
  STMT_ERR_RET(stmtIsInsert2(stmt, &insert));
5,363!
1175
  if (0 == insert) {
5,364!
1176
    tscError("set tb name not available for none insert statement");
×
1177
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1178
  }
1179

1180
  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
5,364✔
1181
    STMT_ERR_RET(stmtCreateRequest(pStmt));
195!
1182

1183
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
195!
1184
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
1185
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
195✔
1186

1187
    STMT_ERR_RET(stmtGetFromCache(pStmt));
194!
1188

1189
    if (pStmt->bInfo.needParse) {
194✔
1190
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
105✔
1191
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
105✔
1192

1193
      STMT_ERR_RET(stmtParseSql(pStmt));
105!
1194
    }
1195
  } else {
1196
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
5,169✔
1197
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
5,169✔
1198
    pStmt->exec.pRequest->requestId++;
5,169✔
1199
    pStmt->bInfo.needParse = false;
5,169✔
1200
  }
1201

1202
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
5,363✔
1203
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
86!
1204
  }
1205

1206
  int64_t startUs2 = taosGetTimestampUs();
5,364✔
1207
  pStmt->stat.setTbNameUs += startUs2 - startUs;
5,364✔
1208

1209
  return TSDB_CODE_SUCCESS;
5,364✔
1210
}
1211

1212
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
120✔
1213
  STscStmt2* pStmt = (STscStmt2*)stmt;
120✔
1214

1215
  STMT_DLOG_E("start to set tbTags");
120!
1216

1217
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
120!
1218
    return pStmt->errCode;
×
1219
  }
1220

1221
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
120!
1222

1223
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
120!
1224
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1225
    pStmt->bInfo.needParse = false;
×
1226
  }
1227
  STMT_ERR_RET(stmtCreateRequest(pStmt));
120!
1228

1229
  if (pStmt->bInfo.needParse) {
120!
1230
    STMT_ERR_RET(stmtParseSql(pStmt));
×
1231
  }
1232
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
120!
1233
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
1234
  }
1235

1236
  SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
120✔
1237
  // if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
1238
  //   tscWarn("no tags or cols bound in sql, will not bound tags");
1239
  //   return TSDB_CODE_SUCCESS;
1240
  // }
1241
  if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
120!
1242
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
38!
1243
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
1244
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
38!
1245
  }
1246

1247
  STableDataCxt** pDataBlock = NULL;
120✔
1248
  if (pStmt->exec.pCurrBlock) {
120✔
1249
    pDataBlock = &pStmt->exec.pCurrBlock;
95✔
1250
  } else {
1251
    pDataBlock =
1252
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
25✔
1253
    if (NULL == pDataBlock) {
25!
1254
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1255
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
1256
    }
1257
    // pStmt->exec.pCurrBlock = *pDataBlock;
1258
    // if (pStmt->sql.stbInterlaceMode) {
1259
    //   taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
1260
    //   (*pDataBlock)->pData->aCol = NULL;
1261
    // }
1262
  }
1263
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
120!
1264
    return TSDB_CODE_SUCCESS;
×
1265
  }
1266

1267
  tscDebug("start to bind stmt tag values");
120!
1268

1269
  void* boundTags = NULL;
120✔
1270
  if (pStmt->sql.stbInterlaceMode) {
120✔
1271
    boundTags = pStmt->sql.siInfo.boundTags;
38✔
1272
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
38!
1273
    if (NULL == pCreateTbReq) {
38!
1274
      return terrno;
×
1275
    }
1276
    int32_t vgId = -1;
38✔
1277
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
38!
1278
    (*pCreateTbReq)->uid = vgId;
38✔
1279
  } else {
1280
    boundTags = pStmt->bInfo.boundTags;
82✔
1281
  }
1282

1283
  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
120✔
1284
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
1285
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));
1286

1287
  return TSDB_CODE_SUCCESS;
119✔
1288
}
1289

1290
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
26✔
1291
  STscStmt2* pStmt = (STscStmt2*)stmt;
26✔
1292

1293
  STMT_DLOG_E("start to set tbTags");
26!
1294

1295
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
26!
1296
    return pStmt->errCode;
×
1297
  }
1298

1299
  if (!pStmt->sql.stbInterlaceMode) {
26!
1300
    return TSDB_CODE_SUCCESS;
×
1301
  }
1302

1303
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
26!
1304

1305
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
26!
1306
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1307
    pStmt->bInfo.needParse = false;
×
1308
  }
1309
  STMT_ERR_RET(stmtCreateRequest(pStmt));
26!
1310

1311
  if (pStmt->bInfo.needParse) {
26!
1312
    STMT_ERR_RET(stmtParseSql(pStmt));
×
1313
    if (!pStmt->sql.autoCreateTbl) {
×
1314
      return TSDB_CODE_SUCCESS;
×
1315
    }
1316
  }
1317

1318
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
26!
1319
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
1320
  }
1321

1322
  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
26!
1323
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
1324
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
26!
1325

1326
  STableDataCxt** pDataBlock = NULL;
26✔
1327
  if (pStmt->exec.pCurrBlock) {
26✔
1328
    pDataBlock = &pStmt->exec.pCurrBlock;
9✔
1329
  } else {
1330
    pDataBlock =
1331
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
17✔
1332
    if (NULL == pDataBlock) {
17!
1333
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1334
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
1335
    }
1336
  }
1337

1338
  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
26!
1339
    return TSDB_CODE_SUCCESS;
×
1340
  }
1341

1342
  if (pStmt->sql.fixValueTags) {
26✔
1343
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
9!
1344
    if ((*pCreateTbReq)->name) {
9!
1345
      taosMemoryFree((*pCreateTbReq)->name);
9!
1346
    }
1347
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
9!
1348
    int32_t vgId = -1;
9✔
1349
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
9!
1350
    (*pCreateTbReq)->uid = vgId;
9✔
1351
    return TSDB_CODE_SUCCESS;
9✔
1352
  }
1353

1354
  if ((*pDataBlock)->pData->pCreateTbReq) {
17!
1355
    pStmt->sql.fixValueTags = true;
17✔
1356
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
17!
1357
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
17!
1358
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
17✔
1359
  }
1360

1361
  return TSDB_CODE_SUCCESS;
17✔
1362
}
1363

1364
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
1365
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
×
1366
    return pStmt->errCode;
×
1367
  }
1368

1369
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
×
1370
    tscError("invalid operation to get query column fileds");
×
1371
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1372
  }
1373

1374
  STableDataCxt** pDataBlock = NULL;
×
1375

1376
  if (pStmt->sql.stbInterlaceMode) {
×
1377
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
×
1378
  } else {
1379
    pDataBlock =
1380
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
×
1381
    if (NULL == pDataBlock) {
×
1382
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1383
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1384
    }
1385
  }
1386

1387
  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
×
1388

1389
  return TSDB_CODE_SUCCESS;
×
1390
}
1391

1392
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
42✔
1393
  int32_t code = 0;
42✔
1394
  int32_t preCode = pStmt->errCode;
42✔
1395

1396
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
42!
1397
    return pStmt->errCode;
×
1398
  }
1399

1400
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
42!
1401
    tscError("invalid operation to get query column fileds");
×
1402
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1403
  }
1404

1405
  STableDataCxt** pDataBlock = NULL;
42✔
1406
  bool            cleanStb = false;
42✔
1407

1408
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
42✔
1409
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
7✔
1410
  } else {
1411
    cleanStb = true;
35✔
1412
    pDataBlock =
1413
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
35✔
1414
  }
1415

1416
  if (NULL == pDataBlock || NULL == *pDataBlock) {
42!
1417
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1418
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
1419
  }
1420

1421
  STMT_ERRI_JRET(
42!
1422
      qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbNameFlag, fieldNum, fields));
1423

1424
  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
42!
1425
    qDestroyStmtDataBlock(*pDataBlock);
28✔
1426
    *pDataBlock = NULL;
28✔
1427
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
28!
1428
      tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
×
1429
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
1430
    }
1431
    pStmt->sql.autoCreateTbl = false;
28✔
1432
    pStmt->bInfo.tagsCached = false;
28✔
1433
    pStmt->bInfo.sname = (SName){0};
28✔
1434
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
28!
1435
  }
1436

1437
_return:
14✔
1438

1439
  pStmt->errCode = preCode;
42✔
1440

1441
  return code;
42✔
1442
}
1443
/*
1444
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1445
  while (true) {
1446
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1447
      pStmt->exec.smInfo.pColIdx = 0;
1448
    }
1449

1450
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1451
      taosUsleep(1);
1452
      continue;
1453
    }
1454

1455
    *idx = pStmt->exec.smInfo.pColIdx;
1456
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1457
  }
1458
}
1459
*/
1460
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
5,257✔
1461
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
5,257✔
1462
    pStmt->sql.siInfo.pVgroupHash =
4,218✔
1463
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
4,221✔
1464
  }
1465
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
5,254✔
1466
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
4,220✔
1467
  }
1468

1469
  if (NULL == pStmt->sql.siInfo.pRequest) {
5,256✔
1470
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
84!
1471
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));
1472

1473
    if (pStmt->reqid != 0) {
84!
1474
      pStmt->reqid++;
×
1475
    }
1476
    pStmt->exec.pRequest->syncQuery = true;
84✔
1477

1478
    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
84✔
1479
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
84✔
1480
  }
1481

1482
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
5,256✔
1483
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
91✔
1484
    pStmt->sql.siInfo.tbFromHash = true;
33✔
1485
  }
1486

1487
  if (0 == pStmt->sql.siInfo.firstName[0]) {
5,256✔
1488
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
70✔
1489
  }
1490

1491
  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
5,256✔
1492
  param->next = NULL;
5,256✔
1493

1494
  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
5,256✔
1495

1496
  stmtEnqueue(pStmt, param);
5,265✔
1497

1498
  return TSDB_CODE_SUCCESS;
5,260✔
1499
}
1500

1501
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
1502
  while (true) {
1503
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
5,252!
1504
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
5,253✔
1505
      break;
5,255✔
1506
    } else {
1507
      SArray* pTblCols = NULL;
×
1508
      for (int32_t i = 0; i < 100; i++) {
×
1509
        pTblCols = taosArrayInit(20, POINTER_BYTES);
×
1510
        if (NULL == pTblCols) {
×
1511
          return terrno;
×
1512
        }
1513

1514
        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
×
1515
          return terrno;
×
1516
        }
1517
      }
1518
    }
1519
  }
1520

1521
  return TSDB_CODE_SUCCESS;
5,255✔
1522
}
1523

1524
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
116✔
1525
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
116✔
1526
    return TSDB_CODE_SUCCESS;
9✔
1527
  }
1528

1529
  uint64_t uid = pStmt->bInfo.tbUid;
107✔
1530
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
107!
1531

1532
  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
107✔
1533
    return TSDB_CODE_SUCCESS;
89✔
1534
  }
1535

1536
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
18✔
1537
  if (!pSrc) {
18!
1538
    return terrno;
×
1539
  }
1540
  STableDataCxt* pDst = NULL;
18✔
1541

1542
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
18!
1543

1544
  SStmtTableCache cache = {
18✔
1545
      .pDataCtx = pDst,
1546
      .boundTags = pStmt->bInfo.boundTags,
18✔
1547
  };
1548

1549
  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
18!
1550
    return terrno;
×
1551
  }
1552

1553
  if (pStmt->sql.autoCreateTbl) {
18✔
1554
    pStmt->bInfo.tagsCached = true;
15✔
1555
  } else {
1556
    pStmt->bInfo.boundTags = NULL;
3✔
1557
  }
1558

1559
  return TSDB_CODE_SUCCESS;
18✔
1560
}
1561

1562
static int stmtAddBatch2(TAOS_STMT2* stmt) {
4,338✔
1563
  STscStmt2* pStmt = (STscStmt2*)stmt;
4,338✔
1564

1565
  int64_t startUs = taosGetTimestampUs();
4,339✔
1566

1567
  STMT_DLOG_E("start to add batch");
4,339!
1568

1569
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
4,337!
1570
    return pStmt->errCode;
×
1571
  }
1572

1573
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
4,337!
1574

1575
  if (pStmt->sql.stbInterlaceMode) {
4,336✔
1576
    int64_t startUs2 = taosGetTimestampUs();
4,220✔
1577
    pStmt->stat.addBatchUs += startUs2 - startUs;
4,220✔
1578

1579
    pStmt->sql.siInfo.tableColsReady = false;
4,220✔
1580

1581
    SStmtQNode* param = NULL;
4,220✔
1582
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
8,437!
1583
    param->restoreTbCols = true;
4,217✔
1584
    param->next = NULL;
4,217✔
1585

1586
    if (pStmt->sql.autoCreateTbl) {
4,217✔
1587
      pStmt->bInfo.tagsCached = true;
36✔
1588
    }
1589

1590
    stmtEnqueue(pStmt, param);
4,217✔
1591

1592
    return TSDB_CODE_SUCCESS;
4,226✔
1593
  }
1594

1595
  STMT_ERR_RET(stmtCacheBlock(pStmt));
116!
1596

1597
  return TSDB_CODE_SUCCESS;
116✔
1598
}
1599
/*
1600
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1601
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1602
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1603
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1604

1605
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1606
  pRes->fields = taosMemoryMalloc(size);
1607
  pRes->userFields = taosMemoryMalloc(size);
1608
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1609
    STMT_ERR_RET(terrno);
1610
  }
1611
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1612
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1613

1614
  return TSDB_CODE_SUCCESS;
1615
}
1616

1617
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1618
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1619
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1620

1621
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1622
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1623

1624
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1625
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1626
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1627
      STMT_ERR_RET(terrno);
1628
    }
1629
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1630
  }
1631

1632
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1633
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1634
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1635
      STMT_ERR_RET(terrno);
1636
    }
1637
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1638
  }
1639

1640
  return TSDB_CODE_SUCCESS;
1641
}
1642
*/
1643
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
5,375✔
1644
  STscStmt2* pStmt = (STscStmt2*)stmt;
5,375✔
1645
  int32_t    code = 0;
5,375✔
1646

1647
  int64_t startUs = taosGetTimestampUs();
5,375✔
1648

1649
  STMT_DLOG("start to bind stmt data, colIdx:%d", colIdx);
5,375!
1650

1651
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
5,373!
1652
    return pStmt->errCode;
×
1653
  }
1654

1655
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
5,373!
1656

1657
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
5,375!
1658
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1659
    pStmt->bInfo.needParse = false;
×
1660
  }
1661

1662
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
5,375✔
1663
    taos_free_result(pStmt->exec.pRequest);
1✔
1664
    pStmt->exec.pRequest = NULL;
1✔
1665
  }
1666

1667
  STMT_ERR_RET(stmtCreateRequest(pStmt));
5,375!
1668
  if (pStmt->bInfo.needParse) {
5,375✔
1669
    code = stmtParseSql(pStmt);
6✔
1670
    if (code != TSDB_CODE_SUCCESS) {
6!
1671
      goto cleanup_root;
×
1672
    }
1673
  }
1674

1675
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
5,375✔
1676
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
5✔
1677
    if (code != TSDB_CODE_SUCCESS) {
5!
1678
      goto cleanup_root;
×
1679
    }
1680
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
5✔
1681
                         .acctId = pStmt->taos->acctId,
5✔
1682
                         .db = pStmt->exec.pRequest->pDb,
5✔
1683
                         .topicQuery = false,
1684
                         .pSql = pStmt->sql.sqlStr,
5✔
1685
                         .sqlLen = pStmt->sql.sqlLen,
5✔
1686
                         .pMsg = pStmt->exec.pRequest->msgBuf,
5✔
1687
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1688
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
5✔
1689
                         .pStmtCb = NULL,
1690
                         .pUser = pStmt->taos->user};
5✔
1691
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
5✔
1692
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
5✔
1693
    if (code != TSDB_CODE_SUCCESS) {
5!
1694
      goto cleanup_root;
×
1695
    }
1696
    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery);
5✔
1697
    if (code != TSDB_CODE_SUCCESS) {
5!
1698
      goto cleanup_root;
×
1699
    }
1700

1701
    if (pStmt->sql.pQuery->haveResultSet) {
5!
1702
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
10!
1703
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
1704
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
5!
1705
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
5!
1706
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
5✔
1707
    }
1708

1709
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
5✔
1710
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
5✔
1711
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
5✔
1712

1713
    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
1714
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
1715
    // }
1716

1717
    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));
1718

1719
    return TSDB_CODE_SUCCESS;
5✔
1720

1721
  cleanup_root:
×
1722
    if (pStmt->sql.pQuery->pRoot) {
×
1723
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
×
1724
      pStmt->sql.pQuery->pRoot = NULL;
×
1725
    }
1726
    STMT_ERR_RET(code);
×
1727
  }
1728

1729
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
5,370!
1730
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
1731
  }
1732

1733
  STableDataCxt** pDataBlock = NULL;
5,368✔
1734

1735
  if (pStmt->exec.pCurrBlock) {
5,368✔
1736
    pDataBlock = &pStmt->exec.pCurrBlock;
5,261✔
1737
  } else {
1738
    pDataBlock =
1739
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
107✔
1740
    if (NULL == pDataBlock) {
107!
1741
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1742
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
1743
    }
1744
    pStmt->exec.pCurrBlock = *pDataBlock;
107✔
1745
    if (pStmt->sql.stbInterlaceMode) {
107✔
1746
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
85✔
1747
      (*pDataBlock)->pData->aCol = NULL;
85✔
1748
    }
1749
    if (colIdx < -1) {
107✔
1750
      pStmt->sql.bindRowFormat = true;
1✔
1751
      taosArrayDestroy((*pDataBlock)->pData->aCol);
1✔
1752
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
1✔
1753
    }
1754
  }
1755

1756
  int64_t startUs2 = taosGetTimestampUs();
5,366✔
1757
  pStmt->stat.bindDataUs1 += startUs2 - startUs;
5,366✔
1758

1759
  SStmtQNode* param = NULL;
5,366✔
1760
  if (pStmt->sql.stbInterlaceMode) {
5,366✔
1761
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
10,505!
1762
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
10,507!
1763
    taosArrayClear(param->tblData.aCol);
5,255✔
1764

1765
    // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
1766

1767
    param->restoreTbCols = false;
5,249✔
1768
    param->tblData.isOrdered = true;
5,249✔
1769
    param->tblData.isDuplicateTs = false;
5,249✔
1770
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
5,249✔
1771

1772
    param->pCreateTbReq = pCreateTbReq;
5,249✔
1773
  }
1774

1775
  int64_t startUs3 = taosGetTimestampUs();
5,365✔
1776
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;
5,365✔
1777

1778
  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
5,365✔
1779

1780
  if (colIdx < 0) {
5,365✔
1781
    if (pStmt->sql.stbInterlaceMode) {
5,362✔
1782
      // (*pDataBlock)->pData->flags = 0;
1783
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
5,252✔
1784
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
5,252✔
1785
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
5,252✔
1786
                                    pStmt->taos->optionInfo.charsetCxt);
5,252✔
1787
      param->tblData.isOrdered = (*pDataBlock)->ordered;
5,257✔
1788
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
5,257✔
1789
    } else {
1790
      if (colIdx == -1) {
111✔
1791
        if (pStmt->sql.bindRowFormat) {
109✔
1792
          tscError("can't mix bind row format and bind column format");
1!
1793
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
1!
1794
        }
1795
        code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
108✔
1796
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
108✔
1797
      } else {
1798
        code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
2✔
1799
                                  pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
2✔
1800
                                  pStmt->taos->optionInfo.charsetCxt);
2✔
1801
      }
1802
    }
1803

1804
    if (code) {
5,367✔
1805
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
1!
1806
      STMT_ERR_RET(code);
1!
1807
    }
1808
  } else {
1809
    if (pStmt->sql.stbInterlaceMode) {
6!
1810
      tscError("bind single column not allowed in stb insert mode");
×
1811
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1812
    }
1813

1814
    if (pStmt->sql.bindRowFormat) {
6!
1815
      tscError("can't mix bind row format and bind column format");
×
1816
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1817
    }
1818

1819
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
6!
1820
      tscError("bind column index not in sequence");
×
1821
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1822
    }
1823

1824
    pStmt->bInfo.sBindLastIdx = colIdx;
6✔
1825

1826
    if (0 == colIdx) {
6✔
1827
      pStmt->bInfo.sBindRowNum = bind->num;
3✔
1828
    }
1829

1830
    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
6✔
1831
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
6✔
1832
                                    pStmt->taos->optionInfo.charsetCxt);
6✔
1833
    if (code) {
6!
1834
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
×
1835
      STMT_ERR_RET(code);
×
1836
    }
1837
  }
1838

1839
  int64_t startUs4 = taosGetTimestampUs();
5,372✔
1840
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;
5,372✔
1841

1842
  if (pStmt->sql.stbInterlaceMode) {
5,372✔
1843
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
5,258!
1844
  } else {
1845
    STMT_ERR_RET(stmtAddBatch2(pStmt));
116!
1846
  }
1847

1848
  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
5,375✔
1849

1850
  return TSDB_CODE_SUCCESS;
5,375✔
1851
}
1852
/*
1853
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1854
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1855

1856
  int32_t code = 0;
1857
  int32_t finalCode = 0;
1858
  size_t  keyLen = 0;
1859
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1860
  while (pIter) {
1861
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1862
    char*          key = taosHashGetKey(pIter, &keyLen);
1863

1864
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1865
    if (pMeta->uid) {
1866
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1867
      continue;
1868
    }
1869

1870
    SSubmitBlkRsp* blkRsp = NULL;
1871
    int32_t        i = 0;
1872
    for (; i < pRsp->nBlocks; ++i) {
1873
      blkRsp = pRsp->pBlocks + i;
1874
      if (strlen(blkRsp->tblFName) != keyLen) {
1875
        continue;
1876
      }
1877

1878
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1879
        continue;
1880
      }
1881

1882
      break;
1883
    }
1884

1885
    if (i < pRsp->nBlocks) {
1886
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1887
               blkRsp->uid);
1888

1889
      pMeta->uid = blkRsp->uid;
1890
      pStmt->bInfo.tbUid = blkRsp->uid;
1891
    } else {
1892
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1893
      if (NULL == pStmt->pCatalog) {
1894
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1895
        if (code) {
1896
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1897
          finalCode = code;
1898
          continue;
1899
        }
1900
      }
1901

1902
      code = stmtCreateRequest(pStmt);
1903
      if (code) {
1904
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1905
        finalCode = code;
1906
        continue;
1907
      }
1908

1909
      STableMeta*      pTableMeta = NULL;
1910
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1911
                               .requestId = pStmt->exec.pRequest->requestId,
1912
                               .requestObjRefId = pStmt->exec.pRequest->self,
1913
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1914
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1915

1916
      pStmt->stat.ctgGetTbMetaNum++;
1917

1918
      taos_free_result(pStmt->exec.pRequest);
1919
      pStmt->exec.pRequest = NULL;
1920

1921
      if (code || NULL == pTableMeta) {
1922
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1923
        finalCode = code;
1924
        taosMemoryFree(pTableMeta);
1925
        continue;
1926
      }
1927

1928
      pMeta->uid = pTableMeta->uid;
1929
      pStmt->bInfo.tbUid = pTableMeta->uid;
1930
      taosMemoryFree(pTableMeta);
1931
    }
1932

1933
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1934
  }
1935

1936
  return finalCode;
1937
}
1938
*/
1939
/*
1940
int stmtStaticModeExec(TAOS_STMT* stmt) {
1941
  STscStmt2*   pStmt = (STscStmt2*)stmt;
1942
  int32_t     code = 0;
1943
  SSubmitRsp* pRsp = NULL;
1944
  if (pStmt->sql.staticMode) {
1945
    return TSDB_CODE_TSC_STMT_API_ERROR;
1946
  }
1947

1948
  STMT_DLOG_E("start to exec");
1949

1950
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1951

1952
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1953
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1954

1955
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1956

1957
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1958
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1959
    if (code) {
1960
      pStmt->exec.pRequest->code = code;
1961
    } else {
1962
      tFreeSSubmitRsp(pRsp);
1963
      STMT_ERR_RET(stmtResetStmt(pStmt));
1964
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1965
    }
1966
  }
1967

1968
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1969

1970
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1971
  pStmt->affectedRows += pStmt->exec.affectedRows;
1972

1973
_return:
1974

1975
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1976

1977
  tFreeSSubmitRsp(pRsp);
1978

1979
  ++pStmt->sql.runTimes;
1980

1981
  STMT_RET(code);
1982
}
1983
*/
1984

1985
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
12✔
1986
  const STscObj* pTscObj = pRequest->pTscObj;
12✔
1987

1988
  *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
12!
1989
  if (*pCxt == NULL) {
12!
1990
    return terrno;
×
1991
  }
1992

1993
  **pCxt = (SParseContext){.requestId = pRequest->requestId,
12✔
1994
                           .requestRid = pRequest->self,
12✔
1995
                           .acctId = pTscObj->acctId,
12✔
1996
                           .db = pRequest->pDb,
12✔
1997
                           .topicQuery = false,
1998
                           .pSql = pRequest->sqlstr,
12✔
1999
                           .sqlLen = pRequest->sqlLen,
12✔
2000
                           .pMsg = pRequest->msgBuf,
12✔
2001
                           .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
2002
                           .pTransporter = pTscObj->pAppInfo->pTransporter,
12✔
2003
                           .pStmtCb = NULL,
2004
                           .pUser = pTscObj->user,
12✔
2005
                           .pEffectiveUser = pRequest->effectiveUser,
12✔
2006
                           .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
12✔
2007
                           .enableSysInfo = pTscObj->sysInfo,
12✔
2008
                           .async = true,
2009
                           .svrVer = pTscObj->sVer,
12✔
2010
                           .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
12✔
2011
                           .allocatorId = pRequest->allocatorRefId,
12✔
2012
                           .parseSqlFp = clientParseSql,
2013
                           .parseSqlParam = pWrapper};
2014
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
12✔
2015
  (*pCxt)->biMode = biMode;
12✔
2016
  return TSDB_CODE_SUCCESS;
12✔
2017
}
2018

2019
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
12✔
2020
  STscStmt2*        pStmt = userdata;
12✔
2021
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
12✔
2022

2023
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
12✔
2024
  pStmt->affectedRows += pStmt->exec.affectedRows;
12✔
2025

2026
  fp(pStmt->options.userdata, res, code);
12✔
2027

2028
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady) && !pStmt->queue.stopQueue) {
12!
2029
    taosUsleep(1);
×
2030
  }
2031
  (void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
12✔
2032
  ++pStmt->sql.runTimes;
12✔
2033

2034
  if (tsem_post(&pStmt->asyncExecSem) != 0) {
12!
2035
    tscError("failed to post asyncExecSem");
×
2036
  }
2037
}
12✔
2038

2039
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
4,272✔
2040
  STscStmt2* pStmt = (STscStmt2*)stmt;
4,272✔
2041
  int32_t    code = 0;
4,272✔
2042
  int64_t    startUs = taosGetTimestampUs();
4,276✔
2043

2044
  STMT_DLOG_E("start to exec");
4,276!
2045

2046
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
4,272!
2047
    return pStmt->errCode;
×
2048
  }
2049

2050
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
4,272!
2051
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
4,277!
2052
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
2053
  }
2054
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
4,273!
2055

2056
  if (pStmt->sql.stbInterlaceMode) {
4,276✔
2057
    STMT_ERR_RET(stmtAddBatch2(pStmt));
4,223!
2058
  }
2059

2060
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
4,279✔
2061

2062
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
4,276✔
2063
    if (pStmt->sql.stbInterlaceMode) {
4,271✔
2064
      int64_t startTs = taosGetTimestampUs();
4,224✔
2065
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
8,727✔
2066
        taosUsleep(1);
4,506✔
2067
      }
2068
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
4,220✔
2069

2070
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
4,220!
2071
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
4,225✔
2072
      pStmt->sql.siInfo.pVgroupHash = NULL;
4,225✔
2073
      pStmt->sql.siInfo.pVgroupList = NULL;
4,225✔
2074
    } else {
2075
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
47✔
2076
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);
47!
2077

2078
      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
47!
2079

2080
      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
47!
2081
    }
2082
  }
2083

2084
  SRequestObj*      pRequest = pStmt->exec.pRequest;
4,277✔
2085
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
4,277✔
2086

2087
  if (!fp) {
4,277✔
2088
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
4,265✔
2089

2090
    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
4,259!
2091
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
×
2092
      if (code) {
×
2093
        pStmt->exec.pRequest->code = code;
×
2094
      } else {
2095
        STMT_ERR_RET(stmtResetStmt(pStmt));
×
2096
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
×
2097
      }
2098
    }
2099

2100
    STMT_ERR_JRET(pStmt->exec.pRequest->code);
4,259!
2101

2102
    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
4,259✔
2103
    if (affected_rows) {
4,263✔
2104
      *affected_rows = pStmt->exec.affectedRows;
4,258✔
2105
    }
2106
    pStmt->affectedRows += pStmt->exec.affectedRows;
4,263✔
2107

2108
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady) && !pStmt->queue.stopQueue) {
4,263!
2109
      taosUsleep(1);
×
2110
    }
2111

2112
    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
4,263✔
2113

2114
    ++pStmt->sql.runTimes;
4,256✔
2115
  } else {
2116
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
12!
2117
    if (pWrapper == NULL) {
12!
2118
      code = terrno;
×
2119
    } else {
2120
      pWrapper->pRequest = pRequest;
12✔
2121
      pRequest->pWrapper = pWrapper;
12✔
2122
    }
2123
    if (TSDB_CODE_SUCCESS == code) {
12!
2124
      code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
12✔
2125
    }
2126
    pRequest->syncQuery = false;
12✔
2127
    pRequest->body.queryFp = asyncQueryCb;
12✔
2128
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
12✔
2129

2130
    pStmt->execSemWaited = false;
12✔
2131
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
12✔
2132
  }
2133

2134
_return:
4,268✔
2135
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;
4,269✔
2136

2137
  STMT_RET(code);
4,269!
2138
}
2139

2140
int stmtClose2(TAOS_STMT2* stmt) {
143✔
2141
  STscStmt2* pStmt = (STscStmt2*)stmt;
143✔
2142

2143
  STMT_DLOG_E("start to free stmt");
143!
2144

2145
  if (pStmt->bindThreadInUse) {
143✔
2146
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
79✔
2147
    pStmt->queue.stopQueue = true;
79✔
2148
    (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
79✔
2149
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
79✔
2150
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
79✔
2151

2152
    (void)taosThreadJoin(pStmt->bindThread, NULL);
79✔
2153
    pStmt->bindThreadInUse = false;
79✔
2154

2155
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
79✔
2156
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
79✔
2157
  }
2158

2159
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
143!
2160
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
143!
2161
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
2162
  }
2163
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
143!
2164

2165
  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
143✔
2166
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
143✔
2167

2168
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
143!
2169
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
5!
2170
      tscError("failed to wait asyncExecSem");
×
2171
    }
2172
  }
2173

2174
  STMT_DLOG("stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
143!
2175
            ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
2176
            ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
2177
            ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
2178
            ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
2179
            pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
2180
            pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
2181
            pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
2182
            pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
2183
            pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
2184

2185
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
143!
2186

2187
  if (pStmt->options.asyncExecFn) {
143✔
2188
    if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
5!
2189
      tscError("failed to destroy asyncExecSem");
×
2190
    }
2191
  }
2192
  taosMemoryFree(stmt);
143!
2193

2194
  return TSDB_CODE_SUCCESS;
143✔
2195
}
2196

2197
const char* stmtErrstr2(TAOS_STMT2* stmt) {
3✔
2198
  STscStmt2* pStmt = (STscStmt2*)stmt;
3✔
2199

2200
  if (stmt == NULL || NULL == pStmt->exec.pRequest) {
3!
2201
    return (char*)tstrerror(terrno);
3✔
2202
  }
2203

2204
  pStmt->exec.pRequest->code = terrno;
×
2205

2206
  SRequestObj* pRequest = pStmt->exec.pRequest;
×
2207
  if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
×
2208
    return pRequest->msgBuf;
×
2209
  }
2210
  return (const char*)tstrerror(pRequest->code);
×
2211
}
2212
/*
2213
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
2214

2215
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
2216
*/
2217

2218
int stmtParseColFields2(TAOS_STMT2* stmt) {
57✔
2219
  int32_t    code = 0;
57✔
2220
  STscStmt2* pStmt = (STscStmt2*)stmt;
57✔
2221
  int32_t    preCode = pStmt->errCode;
57✔
2222

2223
  STMT_DLOG_E("start to get col fields");
57!
2224

2225
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
57!
2226
    return pStmt->errCode;
×
2227
  }
2228

2229
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
57!
2230
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2231
  }
2232

2233
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
57!
2234

2235
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
57!
2236
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
3!
2237
    pStmt->bInfo.needParse = false;
×
2238
  }
2239
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
57✔
2240
    pStmt->bInfo.needParse = false;
7✔
2241
  }
2242
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
57!
2243
    taos_free_result(pStmt->exec.pRequest);
×
2244
    pStmt->exec.pRequest = NULL;
×
2245
    STMT_ERRI_JRET(stmtCreateRequest(pStmt));
×
2246
  }
2247

2248
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
57!
2249

2250
  if (pStmt->bInfo.needParse) {
57✔
2251
    STMT_ERRI_JRET(stmtParseSql(pStmt));
50✔
2252
  }
2253

2254
_return:
42✔
2255
  // compatible with previous versions
2256
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE) == 0x0) {
57!
2257
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
1✔
2258
  }
2259

2260
  if (code != TSDB_CODE_SUCCESS) {
57✔
2261
    taos_free_result(pStmt->exec.pRequest);
15✔
2262
    pStmt->exec.pRequest = NULL;
15✔
2263
  }
2264

2265
  pStmt->errCode = preCode;
57✔
2266

2267
  return code;
57✔
2268
}
2269

2270
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
57✔
2271
  int32_t code = stmtParseColFields2(stmt);
57✔
2272
  if (code != TSDB_CODE_SUCCESS) {
57✔
2273
    return code;
15✔
2274
  }
2275

2276
  return stmtFetchStbColFields2(stmt, nums, fields);
42✔
2277
}
2278

2279
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
11✔
2280
  int32_t    code = 0;
11✔
2281
  STscStmt2* pStmt = (STscStmt2*)stmt;
11✔
2282
  int32_t    preCode = pStmt->errCode;
11✔
2283

2284
  STMT_DLOG_E("start to get param num");
11!
2285

2286
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
11!
UNCOV
2287
    return pStmt->errCode;
×
2288
  }
2289

2290
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
11!
2291

2292
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
11!
UNCOV
2293
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
UNCOV
2294
    pStmt->bInfo.needParse = false;
×
2295
  }
2296

2297
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
11!
2298
    taos_free_result(pStmt->exec.pRequest);
×
2299
    pStmt->exec.pRequest = NULL;
×
2300
  }
2301

2302
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
11!
2303

2304
  if (pStmt->bInfo.needParse) {
11!
2305
    STMT_ERRI_JRET(stmtParseSql(pStmt));
11✔
2306
  }
2307

2308
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
4!
2309
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
4✔
2310
  } else {
UNCOV
2311
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
×
2312
  }
2313

UNCOV
2314
_return:
×
2315
  if (code != TSDB_CODE_SUCCESS) {
11✔
2316
    taos_free_result(pStmt->exec.pRequest);
7✔
2317
    pStmt->exec.pRequest = NULL;
7✔
2318
  }
2319
  pStmt->errCode = preCode;
11✔
2320

2321
  return code;
11✔
2322
}
2323

2324
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
5✔
2325
  STscStmt2* pStmt = (STscStmt2*)stmt;
5✔
2326

2327
  STMT_DLOG_E("start to use result");
5!
2328

2329
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
5!
UNCOV
2330
    tscError("useResult only for query statement");
×
UNCOV
2331
    return NULL;
×
2332
  }
2333

2334
  return pStmt->exec.pRequest;
5✔
2335
}
2336

UNCOV
2337
int32_t stmtAsyncBindThreadFunc(void* args) {
×
2338
  qInfo("async stmt bind thread started");
×
2339

UNCOV
2340
  ThreadArgs* targs = (ThreadArgs*)args;
×
UNCOV
2341
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
2342

UNCOV
2343
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
UNCOV
2344
  targs->fp(targs->param, NULL, code);
×
2345
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
2346
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
UNCOV
2347
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
2348
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
2349
  taosMemoryFree(args);
×
2350

2351
  qInfo("async stmt bind thread stopped");
×
2352

2353
  return code;
×
2354
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc