• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3910

23 Apr 2025 02:47AM UTC coverage: 62.362% (-0.7%) from 63.063%
#3910

push

travis-ci

web-flow
docs(datain): add missing health status types (#30828)

155061 of 317305 branches covered (48.87%)

Branch coverage included in aggregate %.

240172 of 316469 relevant lines covered (75.89%)

6269478.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.8
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
// Hands out the next `buffUnit`-sized slot from the table buffer pool.
//
// Three cases, tried in order:
//   1. the current chunk still has room (buffOffset < buffSize): carve the slot
//      out of it and advance the offset;
//   2. a previously allocated chunk is available in pBufList at buffIdx: switch
//      to it and hand out its first slot;
//   3. otherwise allocate a new chunk of buffSize bytes, append it to pBufList
//      and hand out its first slot.
//
// On success *pBuf points at the slot and TSDB_CODE_SUCCESS is returned.
// On failure an error code is returned and *pBuf is left untouched.
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    *pBuf = pTblBuf->pCurBuff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // The chunk never made it into pBufList, so nothing else will release it.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->buffIdx++;
  pTblBuf->pCurBuff = buff;
  *pBuf = buff;
  pTblBuf->buffOffset = pTblBuf->buffUnit;

  return TSDB_CODE_SUCCESS;
}
40

41
// Blocks until the queue reports at least one remaining node, then pops it.
//
// Waiting strategy: the first 10 empty polls sleep via taosUsleep(1); after
// that the caller waits on queue.waitCond under queue.mutex, re-checking the
// counter under the lock before waiting.
//
// Returns false (leaving *param untouched) when queue.stopQueue is set once
// the wait ends; otherwise stores the popped node in *param, advances
// queue.head to it, decrements qRemainNum and returns true.
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
  int i = 0;
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    if (i < 10) {
      taosUsleep(1);
      i++;
    } else {
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
      }
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
    }
  }

  if (pStmt->queue.stopQueue) {
    return false;
  }

  // The head node itself is not the payload; the payload is head->next, which
  // becomes the new head after the pop.
  SStmtQNode* node = pStmt->queue.head->next;
  pStmt->queue.head = node;
  *param = node;

  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);

  return true;
}
67

68
// Links `param` behind the current tail of the statement queue and wakes the
// consumer: the node is linked and stat.bindDataNum bumped first, then
// qRemainNum is incremented and waitCond signalled while holding queue.mutex.
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
  SStmtQueue* pQueue = &pStmt->queue;

  pQueue->tail->next = param;
  pQueue->tail = param;

  pStmt->stat.bindDataNum++;

  (void)taosThreadMutexLock(&pQueue->mutex);
  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
  (void)taosThreadCondSignal(&pQueue->waitCond);
  (void)taosThreadMutexUnlock(&pQueue->mutex);
}
10,177✔
79

80
// Lazily creates pStmt->exec.pRequest for the statement's SQL text.
//
// Does nothing when a request already exists. Otherwise builds the request,
// bumps a non-zero reqid so the next request gets a fresh id, marks the request
// as a synchronous stmt-bind query and, when the statement carries a database
// name, installs a private copy of it in the request.
//
// Returns TSDB_CODE_SUCCESS, or the failure code from buildRequest / the
// allocation failure code. On failure pStmt->exec.pRequest is left NULL.
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
  if (pStmt->exec.pRequest != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              &pStmt->exec.pRequest, pStmt->reqid);
  if (pStmt->reqid != 0) {
    pStmt->reqid++;
  }

  // Check the result before touching the request: dereferencing it first
  // would crash whenever buildRequest fails and leaves the pointer NULL.
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  if (NULL == pStmt->exec.pRequest) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  pStmt->exec.pRequest->type = RES_TYPE__QUERY;
  pStmt->exec.pRequest->syncQuery = true;
  pStmt->exec.pRequest->isStmtBind = true;

  if (pStmt->db != NULL) {
    taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
    pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
    if (NULL == pStmt->exec.pRequest->pDb) {
      // Do not keep a half-initialised request around: a later call would
      // see a non-NULL request and skip initialisation entirely.
      code = terrno;
      taos_free_result(pStmt->exec.pRequest);
      pStmt->exec.pRequest = NULL;
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
102

103
// Validates and applies a transition of the statement state machine.
//
// A statement that already carries an error code refuses every transition
// except STMT_PREPARE, which clears the error. For every other target status
// the current status (pStmt->sql.status) is checked against the states from
// which the transition is permitted; a forbidden transition yields
// TSDB_CODE_TSC_STMT_API_ERROR and an unknown target yields
// TSDB_CODE_APP_ERROR. On success the new status is stored.
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;
  bool    rejected = false;

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT_DLOG("stmt already failed with err:%s", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;
      break;

    case STMT_SETTBNAME:
      // Outside interlace mode a table name may not follow a bind.
      rejected = STMT_STATUS_EQ(INIT) ||
                 (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)));
      break;

    case STMT_SETTAGS:
    case STMT_FETCH_FIELDS:
      rejected = STMT_STATUS_EQ(INIT);
      break;

    case STMT_BIND:
      rejected = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_BIND_COL:
      rejected = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;

    case STMT_ADD_BATCH:
      rejected = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_EXECUTE:
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
        // Queries may execute straight after a bind, without an add-batch step.
        rejected = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
                   STMT_STATUS_NE(BIND_COL);
      } else {
        rejected = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  if (rejected) {
    code = TSDB_CODE_TSC_STMT_API_ERROR;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}
180

181
// Parser callback: reports the table name currently bound to the statement.
// Calling it marks the statement as STMT_TYPE_MULTI_INSERT. When no name has
// been set yet a warning is logged and TSDB_CODE_TSC_STMT_TBNAME_ERROR is
// returned; otherwise *tbName points at the statement's own name buffer.
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  const bool nameMissing = ('\0' == pStmt->bInfo.tbName[0]);

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if (nameMissing) {
    tscWarn("no table name set, OK if it is a stmt get fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;

  return TSDB_CODE_SUCCESS;
}
195

196
// Records the table the statement is currently bound to.
//
// Copies the name (both the SName and its full-name string), the ids/type taken
// from pTableMeta, the bound-tag descriptor and the super-table name into
// pStmt->bInfo. The uid is forced to 0 for auto-created tables. A previously
// held bound-tag descriptor is destroyed first unless it is marked as cached;
// after the call the new `tags` is recorded as not cached.
//
// Returns the error from tNameExtractFullName, or TSDB_CODE_SUCCESS.
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  char       fullName[TSDB_TABLE_FNAME_LEN];

  int32_t code = tNameExtractFullName(tbName, fullName);
  if (0 != code) {
    return code;
  }

  // Table identity: name first, then the ids taken from the meta.
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
  tstrncpy(pStmt->bInfo.tbFName, fullName, sizeof(pStmt->bInfo.tbFName));
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;

  if (autoCreateTbl) {
    pStmt->bInfo.tbUid = 0;
  } else {
    pStmt->bInfo.tbUid = pTableMeta->uid;
  }
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
  pStmt->bInfo.tbType = pTableMeta->tableType;

  // Release the previous tag descriptor only when it is not marked as cached.
  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;
  pStmt->bInfo.tbNameFlag = tbNameFlag;
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  return TSDB_CODE_SUCCESS;
}
226

227
// Stores the vgroup hash and the data-block hash handed over by the parser
// into the statement (sql.pVgHash / exec.pBlockHash). Always succeeds.
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}
235

236
// Parser callback: refreshes both the bind info and the exec info of the
// statement, then remembers whether the target table is auto-created.
// Stops at, and returns, the first failing step.
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, uint8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  int32_t bindCode = stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, tbNameFlag);
  STMT_ERR_RET(bindCode);

  int32_t execCode = stmtUpdateExecInfo(stmt, pVgHash, pBlockHash);
  STMT_ERR_RET(execCode);

  pStmt->sql.autoCreateTbl = autoCreateTbl;

  return TSDB_CODE_SUCCESS;
}
247

248
// Parser callback: hands the statement's vgroup hash and data-block hash to
// the caller and clears the statement's own pointers to them. Always succeeds.
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  SHashObj* vgHash = pStmt->sql.pVgHash;
  pStmt->sql.pVgHash = NULL;
  *pVgHash = vgHash;

  SHashObj* blockHash = pStmt->exec.pBlockHash;
  pStmt->exec.pBlockHash = NULL;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}
259

260
// Parses the statement's SQL and derives the statement type from the result.
//
// Steps: reset the current block, make sure a request exists, run parseSql
// with the stmt callbacks, then classify the statement:
//   - a query root with no type set yet  -> STMT_TYPE_INSERT
//   - otherwise a prepare root           -> STMT_TYPE_QUERY (returns at once)
// Interlace mode is switched off in both of those cases. For non-query
// statements the data context of the bound table is looked up in the exec
// block hash and, if not yet present, sql.pBindInfo is allocated with one
// entry per bound column.
static int32_t stmtParseSql(STscStmt2* pStmt) {
  pStmt->exec.pCurrBlock = NULL;

  SStmtCallback stmtCb = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  pStmt->stat.parseSqlNum++;
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;

  pStmt->bInfo.needParse = false;

  const bool plainInsert = (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type);
  if (plainInsert) {
    pStmt->sql.type = STMT_TYPE_INSERT;
    pStmt->sql.stbInterlaceMode = false;
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
    pStmt->sql.type = STMT_TYPE_QUERY;
    pStmt->sql.stbInterlaceMode = false;

    return TSDB_CODE_SUCCESS;
  }

  const char*     tbFName = pStmt->bInfo.tbFName;
  STableDataCxt** ppCtx = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, tbFName, strlen(tbFName));
  if (NULL == ppCtx || NULL == *ppCtx) {
    return terrno;
  }

  STableDataCxt* pTableCtx = *ppCtx;

  if (NULL != pStmt->sql.pBindInfo) {
    return TSDB_CODE_SUCCESS;
  }

  pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
  if (NULL == pStmt->sql.pBindInfo) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
317

318
// Resets the per-table bind state so that the next bind starts from scratch:
// names and ids are cleared, needParse is raised, and the bound-tag descriptor
// is destroyed unless it is marked as cached. The super-table name and suid
// are kept when the statement auto-creates tables. Always succeeds.
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
  // Flags
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  // Table identity
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbVgId = -1;
  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.tbName[0] = 0;
  pStmt->bInfo.tbFName[0] = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  if (pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.stbFName[0] = 0;
  pStmt->bInfo.tbSuid = 0;

  return TSDB_CODE_SUCCESS;
}
338

339
// Releases the column array of one table's bound data: the columns are reset
// through qResetStmtColumns (its result is ignored) and the array is destroyed.
static void stmtFreeTableBlkList(STableColsData* pTb) {
  SArray* pCols = pTb->aCol;

  (void)qResetStmtColumns(pCols, true);
  taosArrayDestroy(pCols);
}
×
343

344
// Rewinds the table buffer pool to its first chunk and re-seats the queue on
// it: the first slot of that chunk becomes the queue's head/tail node, the
// pool offset is placed right after that slot, and the remaining-node counter
// is zeroed. Logs an error and leaves the rest untouched when the pool has no
// first chunk.
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
  if (NULL == pTblBuf->pCurBuff) {
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
    return;
  }

  // Pool bookkeeping: chunk 0 is current, its first slot is already taken.
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = sizeof(*pQueue->head);

  // Queue bookkeeping: empty queue whose sentinel lives in that first slot.
  pQueue->tail = pTblBuf->pCurBuff;
  pQueue->head = pQueue->tail;
  pQueue->qRemainNum = 0;
  pQueue->head->next = NULL;
}
357

358
// Releases execution-time state after an exec or when the statement is reset.
//
// Interlace mode:
//   - deepClean: drop the block hash and destroy the current block;
//   - otherwise: rewind the table-cols index and the queue/table buffer.
//   In both cases the row count of an existing request is zeroed.
//
// Normal mode:
//   - the request is freed unless this is a query statement and deepClean is
//     false;
//   - every block in the exec hash is destroyed and removed, except that with
//     keepTable the current block is reset in place (after swapping its data
//     with exec.pCurrTbData);
//   - with keepTable the function returns here, skipping the bind-info reset;
//   - otherwise the hash and exec.pCurrTbData are released too.
//
// Finishes by resetting the bind info.
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
  if (pStmt->sql.stbInterlaceMode) {
    if (!deepClean) {
      pStmt->sql.siInfo.pTableColsIdx = 0;
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
    } else {
      taosHashCleanup(pStmt->exec.pBlockHash);
      pStmt->exec.pBlockHash = NULL;

      if (NULL != pStmt->exec.pCurrBlock) {
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
        pStmt->exec.pCurrBlock = NULL;
      }
    }

    if (NULL != pStmt->exec.pRequest) {
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
    }
  } else {
    if (deepClean || STMT_TYPE_QUERY != pStmt->sql.type) {
      taos_free_result(pStmt->exec.pRequest);
      pStmt->exec.pRequest = NULL;
    }

    size_t keyLen = 0;
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
    while (pIter) {
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
      char*          key = taosHashGetKey(pIter, &keyLen);
      (void)qGetTableMetaInDataBlock(pBlocks);

      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
        // Keep the current table's block alive, but empty.
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
      } else {
        qDestroyStmtDataBlock(pBlocks);
        STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
      }

      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
    }

    if (keepTable) {
      return TSDB_CODE_SUCCESS;
    }

    taosHashCleanup(pStmt->exec.pBlockHash);
    pStmt->exec.pBlockHash = NULL;

    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}
420

421
// Array element destructor: `buf` points at a stored pointer; frees the
// memory that stored pointer refers to.
static void stmtFreeTbBuf(void* buf) {
  void** ppChunk = (void**)buf;
  taosMemoryFree(*ppChunk);
}
96✔
425

426
static void stmtFreeTbCols(void* buf) {
86,000✔
427
  SArray* pCols = *(SArray**)buf;
86,000✔
428
  taosArrayDestroy(pCols);
86,000✔
429
}
86,000✔
430

431
// Tears down everything hanging off pStmt->sql (plus pStmt->db) and zeroes the
// sql struct.
//
// Order: plain buffers and the parsed query, the vgroup hash, the fixed-value
// create-table request, every entry of the table cache (data context and bound
// tags) followed by the cache itself, a deep clean of the exec info, the bind
// info, and finally the interlace-mode resources. After the memset
// siInfo.tableColsReady is set back to true.
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
  STMT_DLOG_E("start to free SQL info");

  taosMemoryFreeClear(pStmt->db);
  taosMemoryFree(pStmt->sql.pBindInfo);
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);
  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;
  if (pStmt->sql.fixValueTags) {
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
  }

  // Each cache entry holds a data context and a tag descriptor to release.
  for (void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pEntry = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pEntry->pDataCtx);
    qDestroyBoundColInfo(pEntry->boundTags);
    taosMemoryFreeClear(pEntry->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  // Interlace-mode resources.
  taos_free_result(pStmt->sql.siInfo.pRequest);
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);

  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
  pStmt->sql.siInfo.tableColsReady = true;

  STMT_DLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}
478

479
// Makes sure the vgroup of the currently bound table is present in
// pStmt->sql.pVgHash.
//
// If *vgId is non-negative and already in the hash nothing is done. Otherwise
// the vgroup is resolved through the catalog from bInfo.sname, stored in the
// hash keyed by its id, and *vgId is updated to the resolved id.
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
  const bool alreadyKnown =
      (*vgId >= 0) && (NULL != taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId)));
  if (alreadyKnown) {
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
  if (TSDB_CODE_SUCCESS == code) {
    code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                       sizeof(vgInfo));
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *vgId = vgInfo.vgId;

  return TSDB_CODE_SUCCESS;
}
505

506
// Builds a new data context for (uid, suid, vgId) from the template
// pDataBlock and returns it in *newBlock. The table's vgroup is registered in
// the statement's vgroup hash first, which may replace vgId with the resolved
// id.
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                                    uint64_t suid, int32_t vgId) {
  int32_t vgCode = stmtTryAddTableVgroupInfo(pStmt, &vgId);
  STMT_ERR_RET(vgCode);

  int32_t rebuildCode = qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl);
  STMT_ERR_RET(rebuildCode);

  STMT_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);

  return TSDB_CODE_SUCCESS;
}
515

516
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
191✔
517
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
191!
518
    pStmt->bInfo.needParse = false;
×
519
    pStmt->bInfo.inExecCache = false;
×
520
    return TSDB_CODE_SUCCESS;
×
521
  }
522

523
  pStmt->bInfo.needParse = true;
191✔
524
  pStmt->bInfo.inExecCache = false;
191✔
525

526
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
191✔
527
  if (pCxtInExec) {
189✔
528
    pStmt->bInfo.needParse = false;
24✔
529
    pStmt->bInfo.inExecCache = true;
24✔
530

531
    pStmt->exec.pCurrBlock = *pCxtInExec;
24✔
532

533
    if (pStmt->sql.autoCreateTbl) {
24✔
534
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
18!
535
      return TSDB_CODE_SUCCESS;
18✔
536
    }
537
  }
538

539
  if (NULL == pStmt->pCatalog) {
171✔
540
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
86!
541
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
89✔
542
  }
543

544
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
174!
545
    if (pStmt->bInfo.inExecCache) {
105!
546
      pStmt->bInfo.needParse = false;
×
547
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
548
      return TSDB_CODE_SUCCESS;
×
549
    }
550

551
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
105!
552
    return TSDB_CODE_SUCCESS;
104✔
553
  }
554

555
  if (pStmt->sql.autoCreateTbl) {
72✔
556
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
48✔
557
    if (pCache) {
48!
558
      pStmt->bInfo.needParse = false;
48✔
559
      pStmt->bInfo.tbUid = 0;
48✔
560

561
      STableDataCxt* pNewBlock = NULL;
48✔
562
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
48!
563

564
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
48!
565
                      POINTER_BYTES)) {
566
        STMT_ERR_RET(terrno);
×
567
      }
568

569
      pStmt->exec.pCurrBlock = pNewBlock;
48✔
570

571
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
48!
572

573
      return TSDB_CODE_SUCCESS;
48✔
574
    }
575

576
    STMT_RET(stmtCleanBindInfo(pStmt));
×
577
  }
578

579
  uint64_t uid, suid;
580
  int32_t  vgId;
581
  int8_t   tableType;
582

583
  STableMeta*      pTableMeta = NULL;
24✔
584
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
24✔
585
                           .requestId = pStmt->exec.pRequest->requestId,
24✔
586
                           .requestObjRefId = pStmt->exec.pRequest->self,
24✔
587
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
24✔
588
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
24✔
589

590
  pStmt->stat.ctgGetTbMetaNum++;
24✔
591

592
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
24!
593
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
594
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
595

596
    STMT_ERR_RET(code);
×
597
  }
598

599
  STMT_ERR_RET(code);
24!
600

601
  uid = pTableMeta->uid;
24✔
602
  suid = pTableMeta->suid;
24✔
603
  tableType = pTableMeta->tableType;
24✔
604
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
24✔
605
  vgId = pTableMeta->vgId;
24✔
606

607
  taosMemoryFree(pTableMeta);
24!
608

609
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
24!
610

611
  if (uid == pStmt->bInfo.tbUid) {
24!
612
    pStmt->bInfo.needParse = false;
×
613

614
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
615

616
    return TSDB_CODE_SUCCESS;
×
617
  }
618

619
  if (pStmt->bInfo.inExecCache) {
24✔
620
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
6✔
621
    if (NULL == pCache) {
6!
622
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
623
               pStmt->bInfo.tbFName, uid, cacheUid);
624

625
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
626
    }
627

628
    pStmt->bInfo.needParse = false;
6✔
629

630
    pStmt->bInfo.tbUid = uid;
6✔
631
    pStmt->bInfo.tbSuid = suid;
6✔
632
    pStmt->bInfo.tbType = tableType;
6✔
633
    pStmt->bInfo.boundTags = pCache->boundTags;
6✔
634
    pStmt->bInfo.tagsCached = true;
6✔
635

636
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
6!
637

638
    return TSDB_CODE_SUCCESS;
6✔
639
  }
640

641
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
18✔
642
  if (pCache) {
18!
643
    pStmt->bInfo.needParse = false;
18✔
644

645
    pStmt->bInfo.tbUid = uid;
18✔
646
    pStmt->bInfo.tbSuid = suid;
18✔
647
    pStmt->bInfo.tbType = tableType;
18✔
648
    pStmt->bInfo.boundTags = pCache->boundTags;
18✔
649
    pStmt->bInfo.tagsCached = true;
18✔
650

651
    STableDataCxt* pNewBlock = NULL;
18✔
652
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
18!
653

654
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
18!
655
                    POINTER_BYTES)) {
656
      STMT_ERR_RET(terrno);
×
657
    }
658

659
    pStmt->exec.pCurrBlock = pNewBlock;
18✔
660

661
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
18!
662

663
    return TSDB_CODE_SUCCESS;
18✔
664
  }
665

666
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
667

668
  return TSDB_CODE_SUCCESS;
×
669
}
670

671
// Returns the statement to its freshly initialised state: all SQL info is
// released, an empty table cache is created again, and the status is set back
// to STMT_INIT.
static int32_t stmtResetStmt(STscStmt2* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pCache;
  if (NULL == pCache) {
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
683

684
// Processes one queue node on the bind thread.
//
// A "restore" node re-creates the per-table column arrays for the first
// siInfo.pTableColsIdx entries of siInfo.pTableCols and then flags
// tableColsReady. Any other node carries bound table data, which is appended
// to the statement output; tbRemainNum is decremented whether or not the
// append succeeded.
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
  SStmtQNode* pNode = (SStmtQNode*)param;

  if (!pNode->restoreTbCols) {
    int32_t code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pNode->tblData,
                                          pStmt->exec.pCurrBlock, &pStmt->sql.siInfo, pNode->pCreateTbReq);
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
    STMT_ERR_RET(code);
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < pStmt->sql.siInfo.pTableColsIdx; ++idx) {
    SArray** ppCols = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, idx);
    *ppCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == *ppCols) {
      STMT_ERR_RET(terrno);
    }
  }

  atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);

  return TSDB_CODE_SUCCESS;
}
706

707
static void* stmtBindThreadFunc(void* param) {
96✔
708
  setThreadName("stmtBind");
96✔
709

710
  qInfo("stmt bind thread started");
96!
711

712
  STscStmt2* pStmt = (STscStmt2*)param;
96✔
713

714
  while (true) {
10,271✔
715
    if (pStmt->queue.stopQueue) {
10,367✔
716
      break;
96✔
717
    }
718

719
    SStmtQNode* asyncParam = NULL;
10,271✔
720
    if (!stmtDequeue(pStmt, &asyncParam)) {
10,271✔
721
      continue;
92✔
722
    }
723

724
    if (stmtAsyncOutput(pStmt, asyncParam) != 0) {
10,174!
725
      qError("stmt async output failed");
×
726
    }
727
  }
728

729
  qInfo("stmt bind thread stopped");
96!
730

731
  return NULL;
96✔
732
}
733

734
// Starts the joinable bind worker thread for this statement and records that
// it is running (pStmt->bindThreadInUse).
//
// Returns TSDB_CODE_TSC_INTERNAL_ERROR when the thread attributes cannot be
// set up, the system error when the thread cannot be created, and
// TSDB_CODE_SUCCESS otherwise. The attribute object is destroyed on every
// path once it has been initialised.
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
    // Capture the error before the cleanup call gets a chance to disturb it.
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
    int32_t code = terrno;
    (void)taosThreadAttrDestroy(&thAttr);
    terrno = code;
    STMT_ERR_RET(code);
  }

  pStmt->bindThreadInUse = true;

  (void)taosThreadAttrDestroy(&thAttr);
  return TSDB_CODE_SUCCESS;
}
753

754
// Initialises the statement queue: its condition variable and mutex are set
// up, then one node is taken from the table buffer pool to serve as both head
// and tail of the (empty) queue.
static int32_t stmtInitQueue(STscStmt2* pStmt) {
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);

  int32_t code = stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head);
  STMT_ERR_RET(code);

  pStmt->queue.tail = pStmt->queue.head;

  return TSDB_CODE_SUCCESS;
}
762

763
// Prepares the async-bind bookkeeping: zeroes the in-flight counter and
// initialises the condition variable and mutex. Always reports success; the
// results of the two init calls are ignored.
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);

  pStmt->asyncBindParam.asyncBindNum = 0;
  return TSDB_CODE_SUCCESS;
}
770

771
// Sets up the table buffer pool: slots are sizeof(SStmtQNode) bytes, each
// chunk holds 1000 slots, and the first chunk is allocated up front and made
// current with nothing handed out yet (buffOffset 0).
//
// Returns terrno on allocation failure. pBufList, once created, stays attached
// to pTblBuf for the caller to release; the first chunk is freed here if it
// could not be added to the list.
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }

  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // Not tracked by pBufList, so it must be released here.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
793

794
// Creates a new statement object on connection `taos`.
//
// The statement starts in STMT_INIT with an empty table cache and, when the
// connection has a current database, a private copy of its name. If options
// are given they are copied into the statement; singleStbInsert together with
// singleTableBindOnce selects interlace mode, in which the table hash, table
// column array, table buffer pool, queue and bind thread are set up as well.
// An async exec callback in the options additionally gets its semaphore
// initialised.
//
// Returns the statement, or NULL on failure (terrno describes the error where
// one was recorded).
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
  STscObj*   pObj = (STscObj*)taos;
  STscStmt2* pStmt = NULL;
  int32_t    code = 0;

  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
  if (NULL == pStmt) {
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = pObj;
  if (taos->db != NULL && taos->db[0] != '\0') {
    pStmt->db = taosStrdup(taos->db);
    if (NULL == pStmt->db) {
      // Only the table cache exists at this point; undo it by hand.
      taosHashCleanup(pStmt->sql.pTableCache);
      taosMemoryFree(pStmt);
      return NULL;
    }
  }
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->errCode = TSDB_CODE_SUCCESS;

  if (NULL != pOptions) {
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
      pStmt->stbInterlaceMode = true;
    }

    pStmt->reqid = pOptions->reqid;
  }

  if (pStmt->stbInterlaceMode) {
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = taos->acctId;
    pStmt->sql.siInfo.dbname = taos->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableHash) {
      (void)stmtClose2(pStmt);
      return NULL;
    }
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pTableCols) {
      (void)stmtClose2(pStmt);
      return NULL;
    }

    // Each step only runs if the previous one succeeded.
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtInitQueue(pStmt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtStartBindThread(pStmt);
    }
    if (TSDB_CODE_SUCCESS != code) {
      terrno = code;
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  pStmt->sql.siInfo.tableColsReady = true;
  if (pStmt->options.asyncExecFn) {
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }
  code = stmtIniAsyncBind(pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    (void)stmtClose2(pStmt);
    return NULL;
  }

  pStmt->execSemWaited = false;

  STMT_LOG_SEQ(STMT_INIT);

  tscDebug("stmt:%p initialized", pStmt);

  return pStmt;
}
881

882
/*
 * Store the database name that was found in the SQL text on the statement
 * handle and on its execution request.
 *
 * stmt   - statement handle; it is used as an STscStmt2
 * dbName - database name taken from the SQL text; a private copy is made and
 *          passed through strdequote()
 *
 * Returns TSDB_CODE_SUCCESS, the failure code produced by stmtCreateRequest(),
 * or terrno when one of the string duplications fails.
 */
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_DLOG("dbname is specified in sql:%s", dbName);

  // NOTE(review): a value already held in pStmt->db is overwritten here without being released - confirm ownership.
  pStmt->db = taosStrdup(dbName);
  if (pStmt->db == NULL) {
    // Duplication failed: stop before strdequote() is handed a NULL pointer.
    return terrno;
  }
  (void)strdequote(pStmt->db);
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  // The SQL statement specifies a database name, overriding the previously specified database
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
  if (pStmt->exec.pRequest->pDb == NULL) {
    return terrno;
  }

  // In interlace mode siInfo.dbname points at the statement's copy; no second copy is made.
  if (pStmt->sql.stbInterlaceMode) {
    pStmt->sql.siInfo.dbname = pStmt->db;
  }
  return TSDB_CODE_SUCCESS;
}
902
/*
 * Stop the bind thread (when one is in use), reset the queue bookkeeping, and
 * recreate the interlace-mode state: table hash, table-column list, table
 * buffer, queue and bind thread.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the hash or the array cannot be
 * created, or the first failing code from stmtInitTableBuf(), stmtInitQueue()
 * or stmtStartBindThread().
 */
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
  if (pStmt->bindThreadInUse) {
    // Raise the stop flag, signal the queue condition under its mutex, then wait for the thread to exit.
    pStmt->queue.stopQueue = true;

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);

    // The thread has been joined: clear the queue bookkeeping and destroy its synchronization objects.
    pStmt->bindThreadInUse = false;
    pStmt->queue.head = NULL;
    pStmt->queue.tail = NULL;
    pStmt->queue.qRemainNum = 0;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (!pStmt->sql.siInfo.pTableHash) {
    return terrno;
  }

  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
  if (!pStmt->sql.siInfo.pTableCols) {
    return terrno;
  }

  int32_t rc = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  // The stop flag is cleared right after the queue init attempt, whatever its result.
  rc = stmtInitQueue(pStmt);
  pStmt->queue.stopQueue = false;
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  return stmtStartBindThread(pStmt);
}
948

949
/*
 * Release everything a previous prepare/bind/exec cycle attached to the
 * statement so the handle can be prepared again.
 *
 * The database name, interlace flag, options and request id are read at entry
 * and written back at the end; the statement status is set to STMT_INIT.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the table cache cannot be recreated,
 * or the failure code of stmtCleanExecInfo() / stmtResetStbInterlaceCache().
 */
static int32_t stmtResetStmtForPrepare(STscStmt2* pStmt) {
  char*             savedDb = pStmt->db;
  bool              savedInterlace = pStmt->stbInterlaceMode;
  TAOS_STMT2_OPTION savedOptions = pStmt->options;
  uint32_t          savedReqid = pStmt->reqid;

  /* ---- parsed-SQL state ---- */
  taosMemoryFreeClear(pStmt->sql.pBindInfo);
  taosMemoryFreeClear(pStmt->sql.queryRes.fields);
  taosMemoryFreeClear(pStmt->sql.queryRes.userFields);

  pStmt->sql.type = 0;
  pStmt->sql.runTimes = 0;
  taosMemoryFreeClear(pStmt->sql.sqlStr);

  qDestroyQuery(pStmt->sql.pQuery);
  pStmt->sql.pQuery = NULL;

  taosArrayDestroy(pStmt->sql.nodeList);
  pStmt->sql.nodeList = NULL;

  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  if (pStmt->sql.fixValueTags) {
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
    pStmt->sql.fixValueTbReq = NULL;
  }
  pStmt->sql.fixValueTags = false;

  /* ---- table cache: destroy every entry, then replace the hash with an empty one ---- */
  for (void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pEntry = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pEntry->pDataCtx);
    qDestroyBoundColInfo(pEntry->boundTags);
    taosMemoryFreeClear(pEntry->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (!pStmt->sql.pTableCache) {
    return terrno;
  }

  /* ---- execution state ---- */
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));

  if (NULL != pStmt->exec.pRequest) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  /* ---- interlace-mode state ---- */
  if (NULL != pStmt->sql.siInfo.pTableCols) {
    taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
    pStmt->sql.siInfo.pTableCols = NULL;
  }

  if (NULL != pStmt->sql.siInfo.tbBuf.pBufList) {
    taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
    pStmt->sql.siInfo.tbBuf.pBufList = NULL;
  }

  if (NULL != pStmt->sql.siInfo.pTableHash) {
    tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
    pStmt->sql.siInfo.pTableHash = NULL;
  }

  if (NULL != pStmt->sql.siInfo.pVgroupHash) {
    taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
    pStmt->sql.siInfo.pVgroupHash = NULL;
  }

  if (NULL != pStmt->sql.siInfo.pVgroupList) {
    taosArrayDestroy(pStmt->sql.siInfo.pVgroupList);
    pStmt->sql.siInfo.pVgroupList = NULL;
  }

  if (NULL != pStmt->sql.siInfo.pDataCtx) {
    qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
    pStmt->sql.siInfo.pDataCtx = NULL;
  }

  taosMemoryFreeClear(pStmt->sql.siInfo.pTSchema);

  if (NULL != pStmt->sql.siInfo.pRequest) {
    taos_free_result(pStmt->sql.siInfo.pRequest);
    pStmt->sql.siInfo.pRequest = NULL;
  }

  // Interlace statements get their hash, column list, buffer, queue and bind thread recreated.
  if (savedInterlace) {
    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));
  }

  /* ---- write the values captured at entry back onto the handle ---- */
  pStmt->db = savedDb;
  pStmt->stbInterlaceMode = savedInterlace;
  pStmt->options = savedOptions;
  pStmt->reqid = savedReqid;

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
1060

1061
/*
 * Attach a SQL text to the statement handle.
 *
 * stmt   - statement handle; it is used as an STscStmt2
 * sql    - SQL text; it is copied, the caller keeps ownership of the argument
 * length - number of bytes of sql to use; 0 means "use strlen(sql)"
 *
 * A handle whose status is already STMT_PREPARE or later is reset through
 * stmtResetStmtForPrepare() first. When qParseDbName() reports a database name
 * in the SQL, it is applied through stmtSetDbName2().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument, the
 * error code already stored on the handle, terrno when the SQL copy fails, or
 * the failure code of one of the helpers.
 */
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  if (stmt == NULL || sql == NULL) {
    STMT2_ELOG_E("stmt or sql is NULL");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
    STMT_ERR_RET(stmtResetStmtForPrepare(pStmt));
  }

  STMT2_DLOG("start to prepare with sql:%s", sql);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    // NOTE(review): strerror() is applied to the statement's error code - confirm it is the intended lookup.
    STMT2_ELOG("stmt errCode is not success, ErrCode: 0x%x, ErrMessage: %s\n. ", pStmt->errCode,
               strerror(pStmt->errCode));
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // length is unsigned, so the only "not provided" value is 0.
  if (length == 0) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    return terrno;
  }
  pStmt->sql.sqlLen = length;
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;

  char* dbName = NULL;
  if (qParseDbName(sql, length, &dbName)) {
    // Release the parsed name before the result is checked, so it is not leaked when stmtSetDbName2() fails.
    code = stmtSetDbName2(stmt, dbName);
    taosMemoryFreeClear(dbName);
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
1104

1105
/*
 * Set up the interlace-mode table info: clone the data context stored in the
 * exec block hash under bInfo.tbFName into siInfo.pDataCtx, append
 * STMT_TABLE_COLS_NUM empty column arrays to siInfo.pTableCols, and copy
 * bInfo.boundTags into siInfo.boundTags.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the hash lookup or an array operation
 * fails, or the failure code of qCloneStmtDataBlock().
 */
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      // The new array never reached the list, so nothing else will free it; keep terrno across the cleanup.
      int32_t code = terrno;
      taosArrayDestroy(pTblCols);
      return code;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  return TSDB_CODE_SUCCESS;
}
1131

1132
/*
 * Report whether the statement is an insert.
 *
 * stmt   - statement handle; it is used as an STscStmt2
 * insert - out: non-zero for an insert statement, 0 otherwise
 *
 * When sql.type is non-zero the answer comes from the type (STMT_TYPE_INSERT or
 * STMT_TYPE_MULTI_INSERT); otherwise qIsInsertValuesSql() is asked about the
 * stored SQL text. Always returns TSDB_CODE_SUCCESS.
 */
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start is insert");

  if (!pStmt->sql.type) {
    // Type not set: decide from the SQL text itself.
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
  return TSDB_CODE_SUCCESS;
}
1145

1146
/*
 * Select the table that the following binds apply to.
 *
 * stmt   - statement handle; it is used as an STscStmt2
 * tbName - table name
 *
 * Only valid for insert statements. In interlace mode with siInfo.pDataCtx
 * already set, the name is copied into bInfo.tbName, the request id is
 * incremented and needParse is cleared. Otherwise the name is resolved through
 * qCreateSName()/tNameExtractFullName(), stmtGetFromCache() is called, and the
 * SQL is parsed when bInfo.needParse is set.
 * The elapsed time is added to stat.setTbNameUs.
 *
 * Returns TSDB_CODE_SUCCESS, the error code already stored on the handle,
 * TSDB_CODE_TSC_STMT_API_ERROR for a non-insert statement, or the failure code
 * of one of the helpers.
 */
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  int64_t beginUs = taosGetTimestampUs();

  STMT_DLOG("start to set tbName:%s", tbName);

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  STMT_ERR_RET(stmtIsInsert2(stmt, &isInsert));
  if (!isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    // Interlace info already set up: only record the name and bump the request id.
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  } else {
    STMT_ERR_RET(stmtCreateRequest(pStmt));

    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

      STMT_ERR_RET(stmtParseSql(pStmt));
    }
  }

  // First table of an interlace statement: build the shared interlace info now.
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  pStmt->stat.setTbNameUs += taosGetTimestampUs() - beginUs;

  return TSDB_CODE_SUCCESS;
}
1197

1198
/*
 * Bind tag values for the current table.
 *
 * stmt         - statement handle; it is used as an STscStmt2
 * tags         - tag values, forwarded to qBindStmtTagsValue2()
 * pCreateTbReq - in interlace mode a zeroed SVCreateTbReq is allocated and
 *                stored in *pCreateTbReq, with its uid set to the vgroup id
 *                obtained from stmtTryAddTableVgroupInfo(); in both modes
 *                *pCreateTbReq is forwarded to qBindStmtTagsValue2()
 *
 * Returns TSDB_CODE_SUCCESS (also when the table is in the exec cache and the
 * statement does not auto-create tables, in which case nothing is bound), the
 * error code already stored on the handle, TSDB_CODE_TSC_STMT_CACHE_ERROR when
 * the table has no data block, terrno when the request allocation fails, or the
 * failure code of one of the helpers.
 */
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  // A statement that has already run and has a known, non multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
  }

  // Use the current block when there is one, otherwise look the table up in the exec block hash.
  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to bind stmt tag values");

  void* boundTags = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    boundTags = pStmt->sql.siInfo.boundTags;
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    // Check the allocation result itself; the out-parameter was already dereferenced on the line above.
    if (NULL == *pCreateTbReq) {
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
  } else {
    boundTags = pStmt->bInfo.boundTags;
  }

  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));

  return TSDB_CODE_SUCCESS;
}
1275

1276
/*
 * Interlace-mode helper that produces a create-table request for the current
 * table from tag values fixed in the SQL.
 *
 * stmt         - statement handle; it is used as an STscStmt2
 * pCreateTbReq - out: receives a clone of the cached create-table request when
 *                one applies
 *
 * Nothing is produced (and TSDB_CODE_SUCCESS is returned) when the statement is
 * not in interlace mode, when a fresh parse shows no auto-create, or when the
 * data block lacks the SUBMIT_REQ_AUTO_CREATE_TABLE flag.
 * The first time a block carrying a create-table request is seen, that request
 * is cloned into sql.fixValueTbReq; later calls clone from there and replace
 * the name with bInfo.tbName.
 *
 * Returns TSDB_CODE_SUCCESS, the error code already stored on the handle,
 * TSDB_CODE_TSC_STMT_CACHE_ERROR when the table has no data block, terrno when
 * the name copy fails, or the failure code of one of the helpers.
 */
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (!pStmt->sql.stbInterlaceMode) {
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  // A statement that has already run and has a known, non multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
    if (!pStmt->sql.autoCreateTbl) {
      return TSDB_CODE_SUCCESS;
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

  // Use the current block when there is one, otherwise look the table up in the exec block hash.
  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }

  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.fixValueTags) {
    // A template request is already cached: clone it and give the clone this table's name.
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    if ((*pCreateTbReq)->name) {
      taosMemoryFree((*pCreateTbReq)->name);
    }
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
    if (NULL == (*pCreateTbReq)->name) {
      // Do not hand back a request without a name; the clone stays in *pCreateTbReq for the caller.
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
    return TSDB_CODE_SUCCESS;
  }

  if ((*pDataBlock)->pData->pCreateTbReq) {
    // First sighting: cache the block's request as the template, then return a clone of it.
    pStmt->sql.fixValueTags = true;
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
  }

  return TSDB_CODE_SUCCESS;
}
1349

1350
/*
 * Build the column field list for an insert statement.
 *
 * pStmt    - statement
 * fieldNum - out, filled by qBuildStmtColFields()
 * fields   - out, filled by qBuildStmtColFields()
 *
 * The data context is siInfo.pDataCtx in interlace mode; otherwise it is looked
 * up in the exec block hash under bInfo.tbFName.
 *
 * Returns TSDB_CODE_SUCCESS, the error code already stored on the handle,
 * TSDB_CODE_TSC_STMT_API_ERROR for a query statement, TSDB_CODE_APP_ERROR when
 * the table has no data block, or the failure code of qBuildStmtColFields().
 */
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  // Start from the interlace context and switch to the hash entry for non-interlace statements.
  STableDataCxt** ppCxt = &pStmt->sql.siInfo.pDataCtx;

  if (!pStmt->sql.stbInterlaceMode) {
    ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == ppCxt) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppCxt, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1377

1378
/*
 * Build the full field list (via qBuildStmtStbColFields) for an insert
 * statement.
 *
 * pStmt    - statement
 * fieldNum - out, filled by qBuildStmtStbColFields()
 * fields   - out, filled by qBuildStmtStbColFields()
 *
 * The data context is siInfo.pDataCtx when interlace mode already has one;
 * otherwise it is the exec block hash entry for bInfo.tbFName. In the latter
 * case, for a super table, the entry is destroyed and removed from the hash
 * afterwards and the related bind info is cleared.
 * pStmt->errCode is set back to its entry value before returning.
 *
 * Returns 0 on success, the error code already stored on the handle,
 * TSDB_CODE_TSC_STMT_API_ERROR for a query statement, TSDB_CODE_APP_ERROR when
 * no data block is available or the hash removal fails, or the failure code of
 * qBuildStmtStbColFields().
 */
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
  int32_t         code = 0;
  int32_t         errCodeAtEntry = pStmt->errCode;
  STableDataCxt** ppCxt = NULL;
  bool            fromBlockHash = false;

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
    fromBlockHash = true;
    ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  } else {
    ppCxt = &pStmt->sql.siInfo.pDataCtx;
  }

  if (NULL == ppCxt || NULL == *ppCxt) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERRI_JRET(
      qBuildStmtStbColFields(*ppCxt, pStmt->bInfo.boundTags, pStmt->bInfo.tbNameFlag, fieldNum, fields));

  if (fromBlockHash && pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
    // Super-table entry taken from the block hash: destroy it and drop it from the hash.
    qDestroyStmtDataBlock(*ppCxt);
    *ppCxt = NULL;
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
      tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
    }
    pStmt->sql.autoCreateTbl = false;
    pStmt->bInfo.tagsCached = false;
    pStmt->bInfo.sname = (SName){0};
    stmtCleanBindInfo(pStmt);
  }

_return:

  pStmt->errCode = errCodeAtEntry;

  return code;
}
1429
/*
1430
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1431
  while (true) {
1432
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1433
      pStmt->exec.smInfo.pColIdx = 0;
1434
    }
1435

1436
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1437
      taosUsleep(1);
1438
      continue;
1439
    }
1440

1441
    *idx = pStmt->exec.smInfo.pColIdx;
1442
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1443
  }
1444
}
1445
*/
1446
/*
 * Finish preparing a per-table queue node in interlace mode and enqueue it.
 *
 * pStmt - statement
 * param - queue node; its tblData.getFromHash and next fields are set here
 *
 * Creates siInfo.pVgroupHash, siInfo.pVgroupList and siInfo.pRequest on first
 * use. Once the name recorded in siInfo.firstName shows up again,
 * siInfo.tbFromHash is set and stays set.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a container cannot be created, or the
 * failure code of buildRequest().
 */
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      // Fail here rather than continue with a NULL vgroup hash.
      return terrno;
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      // Fail here rather than continue with a NULL vgroup list.
      return terrno;
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    // A caller-supplied request id is advanced after it has been used.
    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  // Seeing the first table name again switches tbFromHash on.
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  // Remember the first table name bound on this statement.
  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
1486

1487
/*
 * Hand out the next column array from siInfo.pTableCols and advance
 * siInfo.pTableColsIdx.
 *
 * pStmt      - statement
 * pTableCols - out: the array at the current index
 *
 * When the index has reached the end of the list, 100 more empty arrays are
 * appended first.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when creating or appending an array
 * fails.
 */
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
  while (pStmt->sql.siInfo.pTableColsIdx >= taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
    for (int32_t i = 0; i < 100; i++) {
      SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
      if (NULL == pTblCols) {
        return terrno;
      }

      if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
        // The new array never reached the list, so nothing else will free it; keep terrno across the cleanup.
        int32_t code = terrno;
        taosArrayDestroy(pTblCols);
        return code;
      }
    }
  }

  *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);

  return TSDB_CODE_SUCCESS;
}
1509

1510
/*
 * For multi-insert statements, store a clone of the current table's data
 * context in sql.pTableCache.
 *
 * The cache key is bInfo.tbSuid for child tables and bInfo.tbUid otherwise.
 * Nothing is done when the statement type is not STMT_TYPE_MULTI_INSERT or the
 * key is already cached.
 * After a successful insert, bInfo.tagsCached is set for auto-create
 * statements; otherwise bInfo.boundTags is cleared, since the cache entry now
 * carries that pointer.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the block-hash lookup or the cache
 * insert fails, or the failure code of qCloneStmtDataBlock().
 */
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    // The clone was not stored, so release it here; keep terrno across the cleanup.
    int32_t code = terrno;
    qDestroyStmtDataBlock(pDst);
    return code;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
1547

1548
/*
 * Close the current batch.
 *
 * stmt - statement handle; it is used as an STscStmt2
 *
 * Non-interlace statements go through stmtCacheBlock(). Interlace statements
 * add the elapsed time to stat.addBatchUs, clear siInfo.tableColsReady and
 * enqueue a node with restoreTbCols set.
 *
 * Returns TSDB_CODE_SUCCESS, the error code already stored on the handle, or
 * the failure code of one of the helpers.
 */
static int stmtAddBatch2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  int64_t beginUs = taosGetTimestampUs();

  STMT_DLOG_E("start to add batch");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  pStmt->stat.addBatchUs += taosGetTimestampUs() - beginUs;

  pStmt->sql.siInfo.tableColsReady = false;

  // Queue a node flagged with restoreTbCols.
  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->restoreTbCols = true;
  pNode->next = NULL;

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  }

  stmtEnqueue(pStmt, pNode);

  return TSDB_CODE_SUCCESS;
}
1585
/*
1586
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1587
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1588
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1589
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1590

1591
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1592
  pRes->fields = taosMemoryMalloc(size);
1593
  pRes->userFields = taosMemoryMalloc(size);
1594
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1595
    STMT_ERR_RET(terrno);
1596
  }
1597
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1598
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1599

1600
  return TSDB_CODE_SUCCESS;
1601
}
1602

1603
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1604
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1605
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1606

1607
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1608
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1609

1610
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1611
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1612
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1613
      STMT_ERR_RET(terrno);
1614
    }
1615
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1616
  }
1617

1618
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1619
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1620
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1621
      STMT_ERR_RET(terrno);
1622
    }
1623
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1624
  }
1625

1626
  return TSDB_CODE_SUCCESS;
1627
}
1628
*/
1629
/*
 * Bind one batch of parameter data to a prepared statement.
 *
 * stmt          statement handle (an STscStmt2 behind the opaque pointer)
 * bind          bound values for this call
 * colIdx        >= 0 : bind a single column; indexes must arrive in sequence
 *                      (previous index + 1) or restart at 0
 *               == -1: bind all columns in column format
 *               <  -1: bind in row format (non-interlace mode only)
 * pCreateTbReq  create-table request attached to the queued node in
 *               super-table interlace mode; unused otherwise
 *
 * For query statements the placeholders are bound, the query is re-parsed and
 * the result schema is installed on the request. For insert statements the
 * data is bound into the current table block and then either queued to the
 * bind thread (interlace mode) or added as a batch.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. Returns pStmt->errCode
 * immediately when the statement already carries an error.
 */
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  int64_t startUs = taosGetTimestampUs();

  STMT_DLOG("start to bind stmt data, colIdx:%d", colIdx);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  // a statement that already ran once and is not a multi-insert does not need re-parsing
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // a re-executed query drops the previous request (and its result) first
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    code = stmtParseSql(pStmt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    // hand the db/table lists collected by the parser over to the request
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
    // }

    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));

    return TSDB_CODE_SUCCESS;

  cleanup_root:
    // This label is also reached when stmtParseSql() fails, in which case no
    // query object may exist yet, so pQuery must be checked before its root is touched.
    if (pStmt->sql.pQuery && pStmt->sql.pQuery->pRoot) {
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
      pStmt->sql.pQuery->pRoot = NULL;
    }
    STMT_ERR_RET(code);
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
    if (pStmt->sql.stbInterlaceMode) {
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      (*pDataBlock)->pData->aCol = NULL;
    }
    if (colIdx < -1) {
      pStmt->sql.bindRowFormat = true;
      taosArrayDestroy((*pDataBlock)->pData->aCol);
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
      if (NULL == (*pDataBlock)->pData->aCol) {
        // allocation failed: report it instead of continuing with a NULL column array
        STMT_ERR_RET(terrno);
      }
    }
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += startUs2 - startUs;

  SStmtQNode* param = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
    taosArrayClear(param->tblData.aCol);

    // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);

    param->restoreTbCols = false;
    param->tblData.isOrdered = true;
    param->tblData.isDuplicateTs = false;
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);

    param->pCreateTbReq = pCreateTbReq;
  }

  int64_t startUs3 = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;

  // interlace mode binds into the queued node's columns, otherwise into the block's own
  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;

  if (colIdx < 0) {
    if (pStmt->sql.stbInterlaceMode) {
      // (*pDataBlock)->pData->flags = 0;
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                    pStmt->taos->optionInfo.charsetCxt);
      param->tblData.isOrdered = (*pDataBlock)->ordered;
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
    } else {
      if (colIdx == -1) {
        if (pStmt->sql.bindRowFormat) {
          tscError("can't mix bind row format and bind column format");
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
        }
        code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
      } else {
        code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                  pStmt->taos->optionInfo.charsetCxt);
      }
    }

    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (pStmt->sql.stbInterlaceMode) {
      tscError("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (pStmt->sql.bindRowFormat) {
      tscError("can't mix bind row format and bind column format");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    // the first column of a round fixes the row count used for the following columns
    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum, pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t startUs4 = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
  } else {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;

  return TSDB_CODE_SUCCESS;
}
1837
/*
1838
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1839
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1840

1841
  int32_t code = 0;
1842
  int32_t finalCode = 0;
1843
  size_t  keyLen = 0;
1844
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1845
  while (pIter) {
1846
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1847
    char*          key = taosHashGetKey(pIter, &keyLen);
1848

1849
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1850
    if (pMeta->uid) {
1851
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1852
      continue;
1853
    }
1854

1855
    SSubmitBlkRsp* blkRsp = NULL;
1856
    int32_t        i = 0;
1857
    for (; i < pRsp->nBlocks; ++i) {
1858
      blkRsp = pRsp->pBlocks + i;
1859
      if (strlen(blkRsp->tblFName) != keyLen) {
1860
        continue;
1861
      }
1862

1863
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1864
        continue;
1865
      }
1866

1867
      break;
1868
    }
1869

1870
    if (i < pRsp->nBlocks) {
1871
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1872
               blkRsp->uid);
1873

1874
      pMeta->uid = blkRsp->uid;
1875
      pStmt->bInfo.tbUid = blkRsp->uid;
1876
    } else {
1877
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1878
      if (NULL == pStmt->pCatalog) {
1879
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1880
        if (code) {
1881
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1882
          finalCode = code;
1883
          continue;
1884
        }
1885
      }
1886

1887
      code = stmtCreateRequest(pStmt);
1888
      if (code) {
1889
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1890
        finalCode = code;
1891
        continue;
1892
      }
1893

1894
      STableMeta*      pTableMeta = NULL;
1895
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1896
                               .requestId = pStmt->exec.pRequest->requestId,
1897
                               .requestObjRefId = pStmt->exec.pRequest->self,
1898
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1899
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1900

1901
      pStmt->stat.ctgGetTbMetaNum++;
1902

1903
      taos_free_result(pStmt->exec.pRequest);
1904
      pStmt->exec.pRequest = NULL;
1905

1906
      if (code || NULL == pTableMeta) {
1907
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1908
        finalCode = code;
1909
        taosMemoryFree(pTableMeta);
1910
        continue;
1911
      }
1912

1913
      pMeta->uid = pTableMeta->uid;
1914
      pStmt->bInfo.tbUid = pTableMeta->uid;
1915
      taosMemoryFree(pTableMeta);
1916
    }
1917

1918
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1919
  }
1920

1921
  return finalCode;
1922
}
1923
*/
1924
/*
1925
int stmtStaticModeExec(TAOS_STMT* stmt) {
1926
  STscStmt2*   pStmt = (STscStmt2*)stmt;
1927
  int32_t     code = 0;
1928
  SSubmitRsp* pRsp = NULL;
1929
  if (pStmt->sql.staticMode) {
1930
    return TSDB_CODE_TSC_STMT_API_ERROR;
1931
  }
1932

1933
  STMT_DLOG_E("start to exec");
1934

1935
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1936

1937
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1938
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1939

1940
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1941

1942
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1943
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1944
    if (code) {
1945
      pStmt->exec.pRequest->code = code;
1946
    } else {
1947
      tFreeSSubmitRsp(pRsp);
1948
      STMT_ERR_RET(stmtResetStmt(pStmt));
1949
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1950
    }
1951
  }
1952

1953
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1954

1955
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1956
  pStmt->affectedRows += pStmt->exec.affectedRows;
1957

1958
_return:
1959

1960
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1961

1962
  tFreeSSubmitRsp(pRsp);
1963

1964
  ++pStmt->sql.runTimes;
1965

1966
  STMT_RET(code);
1967
}
1968
*/
1969

1970
/*
 * Allocate a parse context for an asynchronous statement execution and fill it
 * from the request and its connection object.
 *
 * pRequest  request the context is built for
 * pCxt      out: receives the newly allocated context (NULL on allocation failure)
 * pWrapper  stored in the context as the parseSqlParam callback argument
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
  const STscObj* pTscObj = pRequest->pTscObj;

  SParseContext* pNewCxt = taosMemoryCalloc(1, sizeof(SParseContext));
  *pCxt = pNewCxt;
  if (NULL == pNewCxt) {
    return terrno;
  }

  SParseContext cxt = {.requestId = pRequest->requestId,
                       .requestRid = pRequest->self,
                       .acctId = pTscObj->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = false,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
                       .pStmtCb = NULL,
                       .pUser = pTscObj->user,
                       .pEffectiveUser = pRequest->effectiveUser,
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
                       .enableSysInfo = pTscObj->sysInfo,
                       .async = true,
                       .svrVer = pTscObj->sVer,
                       .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
                       .allocatorId = pRequest->allocatorRefId,
                       .parseSqlFp = clientParseSql,
                       .parseSqlParam = pWrapper};
  *pNewCxt = cxt;

  // biMode is read atomically from the connection object
  pNewCxt->biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);

  return TSDB_CODE_SUCCESS;
}
2003

2004
/*
 * Completion callback installed on the request by stmtExec2() in async mode.
 *
 * Records the affected-row counts, invokes the user's asyncExecFn, waits until
 * the table-column cache is flagged ready, cleans the exec info, bumps the run
 * counter and finally posts asyncExecSem.
 *
 * userdata  the STscStmt2 that launched the query
 * res       result handle forwarded to the user callback
 * code      execution result code forwarded to the user callback
 */
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
  STscStmt2*        pStmt = userdata;
  __taos_async_fn_t userCb = pStmt->options.asyncExecFn;

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

  userCb(pStmt->options.userdata, res, code);

  // spin until the table-column cache has been marked ready
  for (;;) {
    if (0 != atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      break;
    }
    taosUsleep(1);
  }

  // keep the cached blocks only when the execution succeeded
  (void)stmtCleanExecInfo(pStmt, (0 == code), false);
  pStmt->sql.runTimes += 1;

  if (0 != tsem_post(&pStmt->asyncExecSem)) {
    tscError("failed to post asyncExecSem");
  }
}
12✔
2023

2024
/*
 * Execute a prepared statement.
 *
 * stmt           statement handle (an STscStmt2 behind the opaque pointer)
 * affected_rows  optional out: rows affected by this execution; written only
 *                on the synchronous path
 *
 * Waits for outstanding async bind calls to drain, finalises the insert
 * payload (non-query statements) and then either runs the request
 * synchronously or, when options.asyncExecFn is set, launches it
 * asynchronously with asyncQueryCb as the completion callback.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On the synchronous path
 * TSDB_CODE_NEED_RETRY is returned after a successful meta refresh so the
 * caller can retry.
 */
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int64_t    startUs = taosGetTimestampUs();

  STMT_DLOG_E("start to exec");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // block until every in-flight async bind has finished
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      // wait until tbRemainNum drops to zero before building the final output
      int64_t startTs = taosGetTimestampUs();
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;

      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
  }

  SRequestObj*      pRequest = pStmt->exec.pRequest;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;

  if (!fp) {
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
      if (code) {
        pStmt->exec.pRequest->code = code;
      } else {
        STMT_ERR_RET(stmtResetStmt(pStmt));
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
      }
    }

    STMT_ERR_JRET(pStmt->exec.pRequest->code);

    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
    if (affected_rows) {
      *affected_rows = pStmt->exec.affectedRows;
    }
    pStmt->affectedRows += pStmt->exec.affectedRows;

    // spin until the table-column cache has been marked ready
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

    ++pStmt->sql.runTimes;
  } else {
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
    if (pWrapper == NULL) {
      // never launch with a NULL wrapper: report the failure and leave
      code = terrno;
      goto _return;
    }
    pWrapper->pRequest = pRequest;

    code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
    if (code != TSDB_CODE_SUCCESS) {
      // the wrapper is not yet attached to the request, so it is freed here;
      // execSemWaited is left untouched because no callback will post the semaphore
      taosMemoryFree(pWrapper);
      goto _return;
    }

    pRequest->pWrapper = pWrapper;
    pRequest->syncQuery = false;
    pRequest->body.queryFp = asyncQueryCb;
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;

    // asyncQueryCb posts asyncExecSem once the query completes
    pStmt->execSemWaited = false;
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
  }

_return:
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;

  STMT_RET(code);
}
2124

2125
/*
 * Tear down a statement handle and free it.
 *
 * Stops and joins the bind thread when one is in use, waits for outstanding
 * async bind calls, waits for a pending async execution (asyncExecSem) if it
 * has not been waited on yet, logs the collected statistics, releases the SQL
 * info and finally frees the handle itself.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the first failing step; in that
 * case the function returns early and the handle is not freed.
 */
int stmtClose2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to free stmt");

  if (pStmt->bindThreadInUse) {
    // flag the queue as stopped and wake the bind thread so it can exit
    pStmt->queue.stopQueue = true;

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);
    pStmt->bindThreadInUse = false;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  // let every in-flight async bind finish before its primitives are destroyed
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);

  // async mode: wait on asyncExecSem unless it has already been waited on
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited && tsem_wait(&pStmt->asyncExecSem) != 0) {
    tscError("failed to wait asyncExecSem");
  }

  STMT_DLOG("stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
            ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
            ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
            ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
            ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
            pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
            pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
            pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
            pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
            pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);

  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  if (pStmt->options.asyncExecFn && tsem_destroy(&pStmt->asyncExecSem) != 0) {
    tscError("failed to destroy asyncExecSem");
  }
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}
2182

2183
/*
 * Return the error string for a statement.
 *
 * When the handle and its current request both exist, terrno is stored into
 * the request and the request's error string is returned; otherwise the
 * string for terrno is returned directly.
 */
const char* stmtErrstr2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (NULL != stmt && NULL != pStmt->exec.pRequest) {
    pStmt->exec.pRequest->code = terrno;
    return taos_errstr(pStmt->exec.pRequest);
  }

  return (char*)tstrerror(terrno);
}
2194
/*
2195
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
2196

2197
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
2198
*/
2199

2200
/*
 * Switch the statement to the fetch-fields state and parse its SQL if that is
 * still required, so column fields can be fetched afterwards.
 *
 * Query statements are rejected with TSDB_CODE_TSC_STMT_API_ERROR. The
 * statement's errCode is restored to its value on entry before returning.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int stmtParseColFields2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    preCode = pStmt->errCode;

  STMT_DLOG_E("start to get col fields");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // an already executed statement that is not a multi-insert keeps its parse result
  bool alreadyRun = pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && alreadyRun) {
    pStmt->bInfo.needParse = false;
  }

  // interlace mode with an existing data context needs no re-parse either
  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
    STMT_ERRI_JRET(stmtCreateRequest(pStmt));
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

_return:
  // compatible with previous versions
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && 0x0 == (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE)) {
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
  }

  pStmt->errCode = preCode;

  return code;
}
2246

2247
/*
 * Parse the statement via stmtParseColFields2() and, if that succeeds, fetch
 * the super-table column fields into nums/fields.
 *
 * Returns the parse error if parsing fails, otherwise the result of
 * stmtFetchStbColFields2().
 */
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
  int32_t parseCode = stmtParseColFields2(stmt);
  if (TSDB_CODE_SUCCESS == parseCode) {
    return stmtFetchStbColFields2(stmt, nums, fields);
  }

  return parseCode;
}
2255

2256
/*
 * Report the number of bindable parameters of a statement.
 *
 * stmt  statement handle
 * nums  out: parameter count. For query statements this is the size of the
 *       query's placeholder list; otherwise it is filled in by
 *       stmtFetchColFields2().
 *
 * The statement's errCode is restored to its value on entry before returning.
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    preCode = pStmt->errCode;

  STMT_DLOG_E("start to get param num");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // an already executed statement that is not a multi-insert keeps its parse result
  bool alreadyRun = pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && alreadyRun) {
    pStmt->bInfo.needParse = false;
  }

  // a re-executed query drops the previous request first
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
2297

2298
/*
 * Return the result handle of a query statement.
 *
 * Returns the statement's current request for query statements, or NULL
 * (after logging an error) for any other statement type.
 */
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}
2310

2311
int32_t stmtAsyncBindThreadFunc(void* args) {
×
2312
  qInfo("async stmt bind thread started");
×
2313

2314
  ThreadArgs* targs = (ThreadArgs*)args;
×
2315
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
2316

2317
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
2318
  targs->fp(targs->param, NULL, code);
×
2319
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
2320
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
2321
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
2322
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
2323
  taosMemoryFree(args);
×
2324

2325
  qInfo("async stmt bind thread stopped");
×
2326

2327
  return code;
×
2328
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc