• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3616

19 Feb 2025 11:22AM UTC coverage: 63.315% (+0.4%) from 62.953%
#3616

push

travis-ci

web-flow
Merge pull request #29823 from taosdata/feat/TS-5928

fix:[TS-5928]add consumer parameters

148228 of 300186 branches covered (49.38%)

Branch coverage included in aggregate %.

232358 of 300909 relevant lines covered (77.22%)

17990742.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.09
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
12
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
87✔
13
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
87✔
14
    pTblBuf->buffOffset += pTblBuf->buffUnit;
87✔
15
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
×
16
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
×
17
    if (NULL == pTblBuf->pCurBuff) {
×
18
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
19
    }
20
    *pBuf = pTblBuf->pCurBuff;
×
21
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
22
  } else {
23
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
×
24
    if (NULL == buff) {
×
25
      return terrno;
×
26
    }
27

28
    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
×
29
      return terrno;
×
30
    }
31

32
    pTblBuf->buffIdx++;
×
33
    pTblBuf->pCurBuff = buff;
×
34
    *pBuf = buff;
×
35
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
36
  }
37

38
  return TSDB_CODE_SUCCESS;
87✔
39
}
40

41
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
83✔
42
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
83✔
43
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
109✔
44
    (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
47✔
45
    if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
47✔
46
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
21✔
47
      return false;
21✔
48
    }
49
  }
50
  SStmtQNode* orig = pStmt->queue.head;
62✔
51
  SStmtQNode* node = pStmt->queue.head->next;
62✔
52
  pStmt->queue.head = pStmt->queue.head->next;
62✔
53
  *param = node;
62✔
54

55
  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
62✔
56
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
62✔
57

58
  return true;
62✔
59
}
60

61
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
64✔
62
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
64✔
63

64
  pStmt->queue.tail->next = param;
64✔
65
  pStmt->queue.tail = param;
64✔
66
  pStmt->stat.bindDataNum++;
64✔
67
  (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
64✔
68
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
64✔
69

70
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
64✔
71
}
64✔
72

73
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
543✔
74
  int32_t code = 0;
543✔
75

76
  if (pStmt->exec.pRequest == NULL) {
543✔
77
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
120✔
78
                        pStmt->reqid);
79
    if (pStmt->reqid != 0) {
120!
80
      pStmt->reqid++;
×
81
    }
82
    if (pStmt->db != NULL) {
120✔
83
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb); 
58!
84
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
58!
85
    }
86
    if (TSDB_CODE_SUCCESS == code) {
120!
87
      pStmt->exec.pRequest->syncQuery = true;
120✔
88
      pStmt->exec.pRequest->isStmtBind = true;
120✔
89
    }
90
  }
91

92
  return code;
543✔
93
}
94

95
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
767✔
96
  int32_t code = 0;
767✔
97

98
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
767!
99
    STMT_LOG_SEQ(newStatus);
767!
100
  }
101

102
  if (pStmt->errCode && newStatus != STMT_PREPARE) {
767!
103
    STMT_DLOG("stmt already failed with err: %s", tstrerror(pStmt->errCode));
×
104
    return pStmt->errCode;
×
105
  }
106

107
  switch (newStatus) {
767!
108
    case STMT_PREPARE:
75✔
109
      pStmt->errCode = 0;
75✔
110
      break;
75✔
111
    case STMT_SETTBNAME:
156✔
112
      if (STMT_STATUS_EQ(INIT)) {
156!
113
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
114
      }
115
      if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
156!
116
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
117
      }
118
      break;
156✔
119
    case STMT_SETTAGS:
98✔
120
      if (STMT_STATUS_EQ(INIT)) {
98!
121
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
122
      }
123
      break;
98✔
124
    case STMT_FETCH_FIELDS:
43✔
125
      if (STMT_STATUS_EQ(INIT)) {
43!
126
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
127
      }
128
      break;
43✔
129
    case STMT_BIND:
167✔
130
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
167!
131
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
132
      }
133
      /*
134
            if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
135
              code = TSDB_CODE_TSC_STMT_API_ERROR;
136
            }
137
      */
138
      break;
167✔
139
    case STMT_BIND_COL:
×
140
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
×
141
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
142
      }
143
      break;
×
144
    case STMT_ADD_BATCH:
141✔
145
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
141!
146
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
147
      }
148
      break;
141✔
149
    case STMT_EXECUTE:
87✔
150
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
87✔
151
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
3!
152
            STMT_STATUS_NE(BIND_COL)) {
×
153
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
154
        }
155
      } else {
156
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
84!
157
          code = TSDB_CODE_TSC_STMT_API_ERROR;
1✔
158
        }
159
      }
160
      break;
87✔
161
    default:
×
162
      code = TSDB_CODE_APP_ERROR;
×
163
      break;
×
164
  }
165

166
  STMT_ERR_RET(code);
767✔
167

168
  pStmt->sql.status = newStatus;
766✔
169

170
  return TSDB_CODE_SUCCESS;
766✔
171
}
172

173
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
43✔
174
  STscStmt2* pStmt = (STscStmt2*)stmt;
43✔
175

176
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
43✔
177

178
  if ('\0' == pStmt->bInfo.tbName[0]) {
43✔
179
    tscError("no table name set");
17!
180
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
17!
181
  }
182

183
  *tbName = pStmt->bInfo.tbName;
26✔
184

185
  return TSDB_CODE_SUCCESS;
26✔
186
}
187

188
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
50✔
189
                                  const char* sTableName, bool autoCreateTbl, bool preCtbname) {
190
  STscStmt2* pStmt = (STscStmt2*)stmt;
50✔
191
  char       tbFName[TSDB_TABLE_FNAME_LEN];
192
  int32_t    code = tNameExtractFullName(tbName, tbFName);
50✔
193
  if (code != 0) {
50!
194
    return code;
×
195
  }
196

197
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
50✔
198
  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));
50✔
199
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
50✔
200

201
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
50✔
202
  pStmt->bInfo.tbSuid = pTableMeta->suid;
50✔
203
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
50✔
204
  pStmt->bInfo.tbType = pTableMeta->tableType;
50✔
205

206
  if (!pStmt->bInfo.tagsCached) {
50✔
207
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
49✔
208
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
47!
209
  }
210

211
  pStmt->bInfo.boundTags = tags;
47✔
212
  pStmt->bInfo.tagsCached = false;
47✔
213
  pStmt->bInfo.preCtbname = preCtbname;
47✔
214
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
47✔
215

216
  return TSDB_CODE_SUCCESS;
47✔
217
}
218

219
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
47✔
220
  STscStmt2* pStmt = (STscStmt2*)stmt;
47✔
221

222
  pStmt->sql.pVgHash = pVgHash;
47✔
223
  pStmt->exec.pBlockHash = pBlockHash;
47✔
224

225
  return TSDB_CODE_SUCCESS;
47✔
226
}
227

228
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
50✔
229
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, bool preCtbname) {
230
  STscStmt2* pStmt = (STscStmt2*)stmt;
50✔
231

232
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, preCtbname));
50!
233
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
47!
234

235
  pStmt->sql.autoCreateTbl = autoCreateTbl;
47✔
236
  if (pStmt->sql.autoCreateTbl) {
47✔
237
    pStmt->sql.stbInterlaceMode = false;
29✔
238
  }
239

240
  return TSDB_CODE_SUCCESS;
47✔
241
}
242

243
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
×
244
  STscStmt2* pStmt = (STscStmt2*)stmt;
×
245

246
  *pVgHash = pStmt->sql.pVgHash;
×
247
  pStmt->sql.pVgHash = NULL;
×
248

249
  *pBlockHash = pStmt->exec.pBlockHash;
×
250
  pStmt->exec.pBlockHash = NULL;
×
251

252
  return TSDB_CODE_SUCCESS;
×
253
}
254

255
static int32_t stmtParseSql(STscStmt2* pStmt) {
73✔
256
  pStmt->exec.pCurrBlock = NULL;
73✔
257

258
  SStmtCallback stmtCb = {
73✔
259
      .pStmt = pStmt,
260
      .getTbNameFn = stmtGetTbName,
261
      .setInfoFn = stmtUpdateInfo,
262
      .getExecInfoFn = stmtGetExecInfo,
263
  };
264

265
  STMT_ERR_RET(stmtCreateRequest(pStmt));
73!
266

267
  pStmt->stat.parseSqlNum++;
73✔
268
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
73✔
269
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
54✔
270

271
  pStmt->bInfo.needParse = false;
54✔
272

273
  if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
54✔
274
    pStmt->sql.type = STMT_TYPE_INSERT;
9✔
275
    pStmt->sql.stbInterlaceMode = false;
9✔
276
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
45✔
277
    pStmt->sql.type = STMT_TYPE_QUERY;
4✔
278
    pStmt->sql.stbInterlaceMode = false;
4✔
279

280
    return TSDB_CODE_SUCCESS;
4✔
281
  }
282

283
  STableDataCxt** pSrc =
284
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
50✔
285
  if (NULL == pSrc || NULL == *pSrc) {
49!
286
    return terrno;
×
287
  }
288

289
  STableDataCxt* pTableCtx = *pSrc;
49✔
290
  if (pStmt->sql.stbInterlaceMode) {
49✔
291
    int16_t lastIdx = -1;
8✔
292

293
    for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
34✔
294
      if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
26!
295
        pStmt->sql.stbInterlaceMode = false;
×
296
        break;
×
297
      }
298

299
      lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
26✔
300
    }
301
  }
302

303
  if (NULL == pStmt->sql.pBindInfo) {
49!
304
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
49!
305
    if (NULL == pStmt->sql.pBindInfo) {
50!
306
      return terrno;
×
307
    }
308
  }
309

310
  return TSDB_CODE_SUCCESS;
50✔
311
}
312

313
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
175✔
314
  pStmt->bInfo.tbUid = 0;
175✔
315
  pStmt->bInfo.tbSuid = 0;
175✔
316
  pStmt->bInfo.tbVgId = -1;
175✔
317
  pStmt->bInfo.tbType = 0;
175✔
318
  pStmt->bInfo.needParse = true;
175✔
319
  pStmt->bInfo.inExecCache = false;
175✔
320

321
  pStmt->bInfo.tbName[0] = 0;
175✔
322
  pStmt->bInfo.tbFName[0] = 0;
175✔
323
  if (!pStmt->bInfo.tagsCached) {
175✔
324
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
141✔
325
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
141!
326
  }
327
  pStmt->bInfo.stbFName[0] = 0;
175✔
328

329
  return TSDB_CODE_SUCCESS;
175✔
330
}
331

332
static void stmtFreeTableBlkList(STableColsData* pTb) {
×
333
  (void)qResetStmtColumns(pTb->aCol, true);
×
334
  taosArrayDestroy(pTb->aCol);
×
335
}
×
336

337
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
21✔
338
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
21✔
339
  if (NULL == pTblBuf->pCurBuff) {
21!
340
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
×
341
    return;
×
342
  }
343
  pTblBuf->buffIdx = 1;
21✔
344
  pTblBuf->buffOffset = sizeof(*pQueue->head);
21✔
345

346
  (void)taosThreadMutexLock(&pQueue->mutex);
21✔
347
  pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
21✔
348
  pQueue->qRemainNum = 0;
21✔
349
  pQueue->head->next = NULL;
21✔
350
  (void)taosThreadMutexUnlock(&pQueue->mutex);
21✔
351
}
352

353
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
163✔
354
  if (pStmt->sql.stbInterlaceMode) {
163✔
355
    if (deepClean) {
32✔
356
      taosHashCleanup(pStmt->exec.pBlockHash);
11✔
357
      pStmt->exec.pBlockHash = NULL;
11✔
358

359
      if (NULL != pStmt->exec.pCurrBlock) {
11✔
360
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
9!
361
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
9✔
362
      }
363
    } else {
364
      pStmt->sql.siInfo.pTableColsIdx = 0;
21✔
365
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
21✔
366
    }
367
    if (NULL != pStmt->exec.pRequest) {
32✔
368
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
31✔
369
    }
370
  } else {
371
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
131✔
372
      // if (!pStmt->options.asyncExecFn) {
373
      taos_free_result(pStmt->exec.pRequest);
128✔
374
      pStmt->exec.pRequest = NULL;
128✔
375
      //}
376
    }
377

378
    size_t keyLen = 0;
131✔
379
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
131✔
380
    while (pIter) {
273✔
381
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
142✔
382
      char*          key = taosHashGetKey(pIter, &keyLen);
142✔
383
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);
142✔
384

385
      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
142✔
386
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
62✔
387
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
127!
388

389
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
62✔
390
        continue;
62✔
391
      }
392

393
      qDestroyStmtDataBlock(pBlocks);
80✔
394
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
80!
395

396
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
80✔
397
    }
398

399
    if (keepTable) {
131✔
400
      return TSDB_CODE_SUCCESS;
65✔
401
    }
402

403
    taosHashCleanup(pStmt->exec.pBlockHash);
66✔
404
    pStmt->exec.pBlockHash = NULL;
66✔
405

406
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
66✔
407
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
66!
408
  }
409

410
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
98!
411

412
  return TSDB_CODE_SUCCESS;
98✔
413
}
414

415
static void stmtFreeTbBuf(void* buf) {
23✔
416
  void* pBuf = *(void**)buf;
23✔
417
  taosMemoryFree(pBuf);
23!
418
}
23✔
419

420
static void stmtFreeTbCols(void* buf) {
9,000✔
421
  SArray* pCols = *(SArray**)buf;
9,000✔
422
  taosArrayDestroy(pCols);
9,000✔
423
}
9,000✔
424

425
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
77✔
426
  STMT_DLOG_E("start to free SQL info");
77!
427

428
  taosMemoryFreeClear(pStmt->db);
77!
429
  taosMemoryFree(pStmt->sql.pBindInfo);
77!
430
  taosMemoryFree(pStmt->sql.queryRes.fields);
77!
431
  taosMemoryFree(pStmt->sql.queryRes.userFields);
77!
432
  taosMemoryFree(pStmt->sql.sqlStr);
77!
433
  qDestroyQuery(pStmt->sql.pQuery);
77✔
434
  taosArrayDestroy(pStmt->sql.nodeList);
77✔
435
  taosHashCleanup(pStmt->sql.pVgHash);
77✔
436
  pStmt->sql.pVgHash = NULL;
77✔
437

438
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
77✔
439
  while (pIter) {
94✔
440
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
17✔
441

442
    qDestroyStmtDataBlock(pCache->pDataCtx);
17✔
443
    qDestroyBoundColInfo(pCache->boundTags);
17✔
444
    taosMemoryFreeClear(pCache->boundTags);
17!
445

446
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
17✔
447
  }
448
  taosHashCleanup(pStmt->sql.pTableCache);
77✔
449
  pStmt->sql.pTableCache = NULL;
77✔
450

451
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
77!
452
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
77!
453

454
  taos_free_result(pStmt->sql.siInfo.pRequest);
77✔
455
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
77✔
456
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
77✔
457
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
77✔
458
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
77!
459
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
77✔
460
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
77✔
461

462
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
77✔
463
  pStmt->sql.siInfo.tableColsReady = true;
77✔
464

465
  STMT_DLOG_E("end to free SQL info");
77!
466

467
  return TSDB_CODE_SUCCESS;
77✔
468
}
469

470
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
54✔
471
  if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
54✔
472
    return TSDB_CODE_SUCCESS;
8✔
473
  }
474

475
  SVgroupInfo      vgInfo = {0};
46✔
476
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
46✔
477
                           .requestId = pStmt->exec.pRequest->requestId,
46✔
478
                           .requestObjRefId = pStmt->exec.pRequest->self,
46✔
479
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
46✔
480

481
  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
46✔
482
  if (TSDB_CODE_SUCCESS != code) {
46!
483
    return code;
×
484
  }
485

486
  code =
487
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
46✔
488
  if (TSDB_CODE_SUCCESS != code) {
46!
489
    return code;
×
490
  }
491

492
  *vgId = vgInfo.vgId;
46✔
493

494
  return TSDB_CODE_SUCCESS;
46✔
495
}
496

497
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
54✔
498
                                    uint64_t suid, int32_t vgId) {
499
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
54!
500
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
54!
501

502
  STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgId);
54!
503

504
  return TSDB_CODE_SUCCESS;
54✔
505
}
506

507
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
122✔
508
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
122!
509
    pStmt->bInfo.needParse = false;
×
510
    pStmt->bInfo.inExecCache = false;
×
511
    return TSDB_CODE_SUCCESS;
×
512
  }
513

514
  pStmt->bInfo.needParse = true;
122✔
515
  pStmt->bInfo.inExecCache = false;
122✔
516

517
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
122✔
518
  if (pCxtInExec) {
121✔
519
    pStmt->bInfo.needParse = false;
42✔
520
    pStmt->bInfo.inExecCache = true;
42✔
521

522
    pStmt->exec.pCurrBlock = *pCxtInExec;
42✔
523

524
    if (pStmt->sql.autoCreateTbl) {
42✔
525
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
38!
526
      return TSDB_CODE_SUCCESS;
38✔
527
    }
528
  }
529

530
  if (NULL == pStmt->pCatalog) {
83✔
531
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
24!
532
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
25✔
533
  }
534

535
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
84!
536
    if (pStmt->bInfo.inExecCache) {
26!
537
      pStmt->bInfo.needParse = false;
×
538
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
539
      return TSDB_CODE_SUCCESS;
×
540
    }
541

542
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
26!
543
    return TSDB_CODE_SUCCESS;
26✔
544
  }
545

546
  if (pStmt->sql.autoCreateTbl) {
58✔
547
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
45✔
548
    if (pCache) {
45!
549
      pStmt->bInfo.needParse = false;
45✔
550
      pStmt->bInfo.tbUid = 0;
45✔
551

552
      STableDataCxt* pNewBlock = NULL;
45✔
553
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
45!
554

555
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
45!
556
                      POINTER_BYTES)) {
557
        STMT_ERR_RET(terrno);
×
558
      }
559

560
      pStmt->exec.pCurrBlock = pNewBlock;
45✔
561

562
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
45!
563

564
      return TSDB_CODE_SUCCESS;
45✔
565
    }
566

567
    STMT_RET(stmtCleanBindInfo(pStmt));
×
568
  }
569

570
  uint64_t uid, suid;
571
  int32_t  vgId;
572
  int8_t   tableType;
573

574
  STableMeta*      pTableMeta = NULL;
13✔
575
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
13✔
576
                           .requestId = pStmt->exec.pRequest->requestId,
13✔
577
                           .requestObjRefId = pStmt->exec.pRequest->self,
13✔
578
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
13✔
579
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
13✔
580

581
  pStmt->stat.ctgGetTbMetaNum++;
13✔
582

583
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
13!
584
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
585
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
586

587
    STMT_ERR_RET(code);
×
588
  }
589

590
  STMT_ERR_RET(code);
13!
591

592
  uid = pTableMeta->uid;
13✔
593
  suid = pTableMeta->suid;
13✔
594
  tableType = pTableMeta->tableType;
13✔
595
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
13✔
596
  vgId = pTableMeta->vgId;
13✔
597

598
  taosMemoryFree(pTableMeta);
13!
599

600
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
13!
601

602
  if (uid == pStmt->bInfo.tbUid) {
13!
603
    pStmt->bInfo.needParse = false;
×
604

605
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
606

607
    return TSDB_CODE_SUCCESS;
×
608
  }
609

610
  if (pStmt->bInfo.inExecCache) {
13✔
611
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
4✔
612
    if (NULL == pCache) {
4!
613
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
614
               pStmt->bInfo.tbFName, uid, cacheUid);
615

616
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
617
    }
618

619
    pStmt->bInfo.needParse = false;
4✔
620

621
    pStmt->bInfo.tbUid = uid;
4✔
622
    pStmt->bInfo.tbSuid = suid;
4✔
623
    pStmt->bInfo.tbType = tableType;
4✔
624
    pStmt->bInfo.boundTags = pCache->boundTags;
4✔
625
    pStmt->bInfo.tagsCached = true;
4✔
626

627
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
4!
628

629
    return TSDB_CODE_SUCCESS;
4✔
630
  }
631

632
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
9✔
633
  if (pCache) {
9!
634
    pStmt->bInfo.needParse = false;
9✔
635

636
    pStmt->bInfo.tbUid = uid;
9✔
637
    pStmt->bInfo.tbSuid = suid;
9✔
638
    pStmt->bInfo.tbType = tableType;
9✔
639
    pStmt->bInfo.boundTags = pCache->boundTags;
9✔
640
    pStmt->bInfo.tagsCached = true;
9✔
641

642
    STableDataCxt* pNewBlock = NULL;
9✔
643
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
9!
644

645
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
9!
646
                    POINTER_BYTES)) {
647
      STMT_ERR_RET(terrno);
×
648
    }
649

650
    pStmt->exec.pCurrBlock = pNewBlock;
9✔
651

652
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
9!
653

654
    return TSDB_CODE_SUCCESS;
9✔
655
  }
656

657
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
658

659
  return TSDB_CODE_SUCCESS;
×
660
}
661

662
static int32_t stmtResetStmt(STscStmt2* pStmt) {
2✔
663
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
2!
664

665
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2✔
666
  if (NULL == pStmt->sql.pTableCache) {
2!
667
    STMT_ERR_RET(terrno);
×
668
  }
669

670
  pStmt->sql.status = STMT_INIT;
2✔
671

672
  return TSDB_CODE_SUCCESS;
2✔
673
}
674

675
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
62✔
676
  SStmtQNode* pParam = (SStmtQNode*)param;
62✔
677

678
  if (pParam->restoreTbCols) {
62✔
679
    for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
62✔
680
      SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
41✔
681
      *p = taosArrayInit(20, POINTER_BYTES);
41✔
682
      if (*p == NULL) {
41!
683
        STMT_ERR_RET(terrno);
×
684
      }
685
    }
686

687
    atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
21✔
688
  } else {
689
    STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
41!
690
                                        &pStmt->sql.siInfo));
691

692
    // taosMemoryFree(pParam->pTbData);
693

694
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
41✔
695
  }
696
  return TSDB_CODE_SUCCESS;
62✔
697
}
698

699
static void* stmtBindThreadFunc(void* param) {
23✔
700
  setThreadName("stmtBind");
23✔
701

702
  qInfo("stmt bind thread started");
23!
703

704
  STscStmt2* pStmt = (STscStmt2*)param;
23✔
705

706
  while (true) {
83✔
707
    if (atomic_load_8((int8_t*)&pStmt->queue.stopQueue)) {
106✔
708
      break;
23✔
709
    }
710

711
    SStmtQNode* asyncParam = NULL;
83✔
712
    if (!stmtDequeue(pStmt, &asyncParam)) {
83✔
713
      continue;
21✔
714
    }
715

716
    if (stmtAsyncOutput(pStmt, asyncParam) != 0) {
62!
717
      qError("stmt async output failed");
×
718
    }
719
  }
720

721
  qInfo("stmt bind thread stopped");
23!
722

723
  return NULL;
23✔
724
}
725

726
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
23✔
727
  TdThreadAttr thAttr;
728
  if (taosThreadAttrInit(&thAttr) != 0) {
23!
729
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
730
  }
731
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
23!
732
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
733
  }
734

735
  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
23!
736
    terrno = TAOS_SYSTEM_ERROR(errno);
×
737
    STMT_ERR_RET(terrno);
×
738
  }
739

740
  pStmt->bindThreadInUse = true;
23✔
741

742
  (void)taosThreadAttrDestroy(&thAttr);
23✔
743
  return TSDB_CODE_SUCCESS;
23✔
744
}
745

746
static int32_t stmtInitQueue(STscStmt2* pStmt) {
23✔
747
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
23✔
748
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
23✔
749
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
46!
750
  pStmt->queue.tail = pStmt->queue.head;
23✔
751

752
  return TSDB_CODE_SUCCESS;
23✔
753
}
754

755
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
76✔
756
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
76✔
757
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
76✔
758
  pStmt->asyncBindParam.asyncBindNum = 0;
76✔
759

760
  return TSDB_CODE_SUCCESS;
76✔
761
}
762

763
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
23✔
764
  pTblBuf->buffUnit = sizeof(SStmtQNode);
23✔
765
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
23✔
766
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
23✔
767
  if (NULL == pTblBuf->pBufList) {
23!
768
    return terrno;
×
769
  }
770
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
23!
771
  if (NULL == buff) {
23!
772
    return terrno;
×
773
  }
774

775
  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
46!
776
    return terrno;
×
777
  }
778

779
  pTblBuf->pCurBuff = buff;
23✔
780
  pTblBuf->buffIdx = 1;
23✔
781
  pTblBuf->buffOffset = 0;
23✔
782

783
  return TSDB_CODE_SUCCESS;
23✔
784
}
785

786
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
76✔
787
  STscObj*   pObj = (STscObj*)taos;
76✔
788
  STscStmt2* pStmt = NULL;
76✔
789
  int32_t    code = 0;
76✔
790

791
  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
76!
792
  if (NULL == pStmt) {
76!
793
    return NULL;
×
794
  }
795

796
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
76✔
797
  if (NULL == pStmt->sql.pTableCache) {
76!
798
    taosMemoryFree(pStmt);
×
799
    return NULL;
×
800
  }
801

802
  pStmt->taos = pObj;
76✔
803
  pStmt->bInfo.needParse = true;
76✔
804
  pStmt->sql.status = STMT_INIT;
76✔
805
  pStmt->errCode = TSDB_CODE_SUCCESS;
76✔
806

807
  if (NULL != pOptions) {
76!
808
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
76✔
809
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
76✔
810
      pStmt->stbInterlaceMode = true;
23✔
811
    }
812

813
    pStmt->reqid = pOptions->reqid;
76✔
814
  }
815

816
  if (pStmt->stbInterlaceMode) {
76✔
817
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
23✔
818
    pStmt->sql.siInfo.acctId = taos->acctId;
23✔
819
    pStmt->sql.siInfo.dbname = taos->db;
23✔
820
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
23✔
821
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
23✔
822
    if (NULL == pStmt->sql.siInfo.pTableHash) {
23!
823
      (void)stmtClose2(pStmt);
×
824
      return NULL;
×
825
    }
826
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
23✔
827
    if (NULL == pStmt->sql.siInfo.pTableCols) {
23!
828
      terrno = terrno;
×
829
      (void)stmtClose2(pStmt);
×
830
      return NULL;
×
831
    }
832

833
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
23✔
834
    if (TSDB_CODE_SUCCESS == code) {
23!
835
      code = stmtInitQueue(pStmt);
23✔
836
    }
837
    if (TSDB_CODE_SUCCESS == code) {
23!
838
      code = stmtStartBindThread(pStmt);
23✔
839
    }
840
    if (TSDB_CODE_SUCCESS != code) {
23!
841
      terrno = code;
×
842
      (void)stmtClose2(pStmt);
×
843
      return NULL;
×
844
    }
845
  }
846

847
  pStmt->sql.siInfo.tableColsReady = true;
76✔
848
  if (pStmt->options.asyncExecFn) {
76✔
849
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
6!
850
      terrno = TAOS_SYSTEM_ERROR(errno);
×
851
      (void)stmtClose2(pStmt);
×
852
      return NULL;
×
853
    }
854
  }
855
  code = stmtIniAsyncBind(pStmt);
76✔
856
  if (TSDB_CODE_SUCCESS != code) {
76!
857
    terrno = code;
×
858
    (void)stmtClose2(pStmt);
×
859
    return NULL;
×
860
  }
861

862
  pStmt->execSemWaited = false;
76✔
863

864
  STMT_LOG_SEQ(STMT_INIT);
76!
865

866
  tscDebug("stmt:%p initialized", pStmt);
76!
867

868
  return pStmt;
76✔
869
}
870

871
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
41✔
872
  STscStmt2* pStmt = (STscStmt2*)stmt;
41✔
873

874
  STMT_DLOG("start to set dbName: %s", dbName);
41!
875

876
  pStmt->db = taosStrdup(dbName);
41!
877
  (void)strdequote(pStmt->db);
41✔
878
  STMT_ERR_RET(stmtCreateRequest(pStmt));
41!
879

880
  // The SQL statement specifies a database name, overriding the previously specified database
881
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
41!
882
  pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
41!
883
  if (pStmt->exec.pRequest->pDb == NULL) {
41!
884
    return terrno;
×
885
  }
886
  return TSDB_CODE_SUCCESS;
41✔
887
}
888

889
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
76✔
890
  STscStmt2* pStmt = (STscStmt2*)stmt;
76✔
891

892
  STMT_DLOG_E("start to prepare");
76!
893

894
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
76✔
895
    return pStmt->errCode;
1✔
896
  }
897

898
  if (pStmt->sql.status >= STMT_PREPARE) {
75✔
899
    STMT_ERR_RET(stmtResetStmt(pStmt));
2!
900
  }
901

902
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
75!
903

904
  if (length <= 0) {
75✔
905
    length = strlen(sql);
66✔
906
  }
907

908
  pStmt->sql.sqlStr = taosStrndup(sql, length);
75!
909
  if (!pStmt->sql.sqlStr) {
75!
910
    return terrno;
×
911
  }
912
  pStmt->sql.sqlLen = length;
75✔
913
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
75✔
914

915
  char* dbName = NULL;
75✔
916
  if (qParseDbName(sql, length, &dbName)) {
75✔
917
    STMT_ERR_RET(stmtSetDbName2(stmt, dbName));
41!
918
    taosMemoryFreeClear(dbName);
41!
919
  }
920

921
  return TSDB_CODE_SUCCESS;
75✔
922
}
923

924
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
9✔
925
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
9✔
926
  if (!pSrc) {
9!
927
    return terrno;
×
928
  }
929
  STableDataCxt* pDst = NULL;
9✔
930

931
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
9!
932
  pStmt->sql.siInfo.pDataCtx = pDst;
8✔
933

934
  SArray* pTblCols = NULL;
8✔
935
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
8,412✔
936
    pTblCols = taosArrayInit(20, POINTER_BYTES);
8,404✔
937
    if (NULL == pTblCols) {
8,835!
938
      return terrno;
×
939
    }
940

941
    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
17,239!
942
      return terrno;
×
943
    }
944
  }
945

946
  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
8✔
947

948
  return TSDB_CODE_SUCCESS;
8✔
949
}
950

951
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
367✔
952
  STscStmt2* pStmt = (STscStmt2*)stmt;
367✔
953

954
  STMT_DLOG_E("start is insert");
367!
955

956
  if (pStmt->sql.type) {
367✔
957
    *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
293✔
958
  } else {
959
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
74✔
960
  }
961

962
  return TSDB_CODE_SUCCESS;
367✔
963
}
964

965
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
156✔
966
  STscStmt2* pStmt = (STscStmt2*)stmt;
156✔
967

968
  int64_t startUs = taosGetTimestampUs();
156✔
969

970
  STMT_DLOG("start to set tbName: %s", tbName);
156!
971

972
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
156!
973
    return pStmt->errCode;
×
974
  }
975

976
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
156!
977

978
  int32_t insert = 0;
156✔
979
  STMT_ERR_RET(stmtIsInsert2(stmt, &insert));
156!
980
  if (0 == insert) {
156!
981
    tscError("set tb name not available for none insert statement");
×
982
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
983
  }
984

985
  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
156✔
986
    STMT_ERR_RET(stmtCreateRequest(pStmt));
122!
987

988
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
122!
989
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
990
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
121!
991

992
    STMT_ERR_RET(stmtGetFromCache(pStmt));
122!
993

994
    if (pStmt->bInfo.needParse) {
122✔
995
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
26✔
996
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
26✔
997

998
      STMT_ERR_RET(stmtParseSql(pStmt));
26!
999
    }
1000
  } else {
1001
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
34✔
1002
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
34✔
1003
    pStmt->exec.pRequest->requestId++;
34✔
1004
    pStmt->bInfo.needParse = false;
34✔
1005
  }
1006

1007
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
156✔
1008
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
9!
1009
  }
1010

1011
  int64_t startUs2 = taosGetTimestampUs();
156✔
1012
  pStmt->stat.setTbNameUs += startUs2 - startUs;
156✔
1013

1014
  return TSDB_CODE_SUCCESS;
156✔
1015
}
1016

1017
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) {
98✔
1018
  STscStmt2* pStmt = (STscStmt2*)stmt;
98✔
1019

1020
  STMT_DLOG_E("start to set tbTags");
98!
1021

1022
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
98!
1023
    return pStmt->errCode;
×
1024
  }
1025

1026
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
98!
1027

1028
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
98!
1029
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1030
    pStmt->bInfo.needParse = false;
×
1031
  }
1032
  STMT_ERR_RET(stmtCreateRequest(pStmt));
98!
1033

1034
  if (pStmt->bInfo.needParse) {
97!
1035
    STMT_ERR_RET(stmtParseSql(pStmt));
×
1036
  }
1037
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
97!
1038
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
1039
  }
1040

1041
  SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
97✔
1042
  // if (tags_info->numOfBound <= 0 || tags_info->numOfCols <= 0) {
1043
  //   tscWarn("no tags or cols bound in sql, will not bound tags");
1044
  //   return TSDB_CODE_SUCCESS;
1045
  // }
1046

1047
  STableDataCxt** pDataBlock =
1048
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
97✔
1049
  if (NULL == pDataBlock) {
98!
1050
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1051
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1052
  }
1053

1054
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
98!
1055
    return TSDB_CODE_SUCCESS;
×
1056
  }
1057

1058
  tscDebug("start to bind stmt tag values");
98!
1059
  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
98!
1060
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
1061
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt));
1062

1063
  return TSDB_CODE_SUCCESS;
98✔
1064
}
1065

1066
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
1067
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
×
1068
    return pStmt->errCode;
×
1069
  }
1070

1071
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
×
1072
    tscError("invalid operation to get query column fileds");
×
1073
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1074
  }
1075

1076
  STableDataCxt** pDataBlock = NULL;
×
1077

1078
  if (pStmt->sql.stbInterlaceMode) {
×
1079
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
×
1080
  } else {
1081
    pDataBlock =
1082
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
×
1083
    if (NULL == pDataBlock) {
×
1084
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1085
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1086
    }
1087
  }
1088

1089
  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
×
1090

1091
  return TSDB_CODE_SUCCESS;
×
1092
}
1093

1094
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
22✔
1095
  int32_t    code = 0;
22✔
1096
  int32_t    preCode = pStmt->errCode;
22✔
1097

1098
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
22!
1099
    return pStmt->errCode;
×
1100
  }
1101

1102
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
22!
1103
    tscError("invalid operation to get query column fileds");
×
1104
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1105
  }
1106

1107
  STableDataCxt** pDataBlock = NULL;
22✔
1108

1109
  if (pStmt->sql.stbInterlaceMode) {
22!
1110
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
×
1111
  } else {
1112
    pDataBlock =
1113
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
22✔
1114
    if (NULL == pDataBlock) {
22!
1115
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1116
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
1117
    }
1118
  }
1119

1120
  STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
22!
1121
  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
22✔
1122
    pStmt->bInfo.needParse = true;
15✔
1123
    qDestroyStmtDataBlock(*pDataBlock);
15✔
1124
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
15!
1125
      tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
×
1126
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
1127
    }
1128
  }
1129

1130
_return:
22✔
1131

1132
  pStmt->errCode = preCode;
22✔
1133

1134
  return code;
22✔
1135
}
1136
/*
1137
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1138
  while (true) {
1139
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1140
      pStmt->exec.smInfo.pColIdx = 0;
1141
    }
1142

1143
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1144
      taosUsleep(1);
1145
      continue;
1146
    }
1147

1148
    *idx = pStmt->exec.smInfo.pColIdx;
1149
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1150
  }
1151
}
1152
*/
1153
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
43✔
1154
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
43✔
1155
    pStmt->sql.siInfo.pVgroupHash =
22✔
1156
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
22✔
1157
  }
1158
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
43✔
1159
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
22✔
1160
  }
1161

1162
  if (NULL == pStmt->sql.siInfo.pRequest) {
43✔
1163
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
9!
1164
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));
1165

1166
    if (pStmt->reqid != 0) {
9!
1167
      pStmt->reqid++;
×
1168
    }
1169
    pStmt->exec.pRequest->syncQuery = true;
9✔
1170

1171
    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
9✔
1172
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
9✔
1173
  }
1174

1175
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
43✔
1176
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
13✔
1177
    pStmt->sql.siInfo.tbFromHash = true;
7✔
1178
  }
1179

1180
  if (0 == pStmt->sql.siInfo.firstName[0]) {
43✔
1181
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
9✔
1182
  }
1183

1184
  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
43✔
1185
  param->next = NULL;
43✔
1186

1187
  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
43✔
1188

1189
  stmtEnqueue(pStmt, param);
43✔
1190

1191
  return TSDB_CODE_SUCCESS;
43✔
1192
}
1193

1194
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
1195
  while (true) {
1196
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
43!
1197
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
43✔
1198
      break;
43✔
1199
    } else {
1200
      SArray* pTblCols = NULL;
×
1201
      for (int32_t i = 0; i < 100; i++) {
×
1202
        pTblCols = taosArrayInit(20, POINTER_BYTES);
×
1203
        if (NULL == pTblCols) {
×
1204
          return terrno;
×
1205
        }
1206

1207
        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
×
1208
          return terrno;
×
1209
        }
1210
      }
1211
    }
1212
  }
1213

1214
  return TSDB_CODE_SUCCESS;
43✔
1215
}
1216

1217
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
120✔
1218
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
120✔
1219
    return TSDB_CODE_SUCCESS;
7✔
1220
  }
1221

1222
  uint64_t uid = pStmt->bInfo.tbUid;
113✔
1223
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
113!
1224

1225
  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
113✔
1226
    return TSDB_CODE_SUCCESS;
96✔
1227
  }
1228

1229
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
17✔
1230
  if (!pSrc) {
17!
1231
    return terrno;
×
1232
  }
1233
  STableDataCxt* pDst = NULL;
17✔
1234

1235
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
17!
1236

1237
  SStmtTableCache cache = {
15✔
1238
      .pDataCtx = pDst,
1239
      .boundTags = pStmt->bInfo.boundTags,
15✔
1240
  };
1241

1242
  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
15!
1243
    return terrno;
×
1244
  }
1245

1246
  if (pStmt->sql.autoCreateTbl) {
17✔
1247
    pStmt->bInfo.tagsCached = true;
15✔
1248
  } else {
1249
    pStmt->bInfo.boundTags = NULL;
2✔
1250
  }
1251

1252
  return TSDB_CODE_SUCCESS;
17✔
1253
}
1254

1255
static int stmtAddBatch2(TAOS_STMT2* stmt) {
141✔
1256
  STscStmt2* pStmt = (STscStmt2*)stmt;
141✔
1257

1258
  int64_t startUs = taosGetTimestampUs();
141✔
1259

1260
  STMT_DLOG_E("start to add batch");
141!
1261

1262
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
141!
1263
    return pStmt->errCode;
×
1264
  }
1265

1266
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
141!
1267

1268
  if (pStmt->sql.stbInterlaceMode) {
141✔
1269
    int64_t startUs2 = taosGetTimestampUs();
21✔
1270
    pStmt->stat.addBatchUs += startUs2 - startUs;
21✔
1271

1272
    pStmt->sql.siInfo.tableColsReady = false;
21✔
1273

1274
    SStmtQNode* param = NULL;
21✔
1275
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
42!
1276
    param->restoreTbCols = true;
21✔
1277
    param->next = NULL;
21✔
1278

1279
    stmtEnqueue(pStmt, param);
21✔
1280

1281
    return TSDB_CODE_SUCCESS;
21✔
1282
  }
1283

1284
  STMT_ERR_RET(stmtCacheBlock(pStmt));
120!
1285

1286
  return TSDB_CODE_SUCCESS;
120✔
1287
}
1288
/*
1289
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1290
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1291
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1292
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1293

1294
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1295
  pRes->fields = taosMemoryMalloc(size);
1296
  pRes->userFields = taosMemoryMalloc(size);
1297
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1298
    STMT_ERR_RET(terrno);
1299
  }
1300
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1301
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1302

1303
  return TSDB_CODE_SUCCESS;
1304
}
1305

1306
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1307
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1308
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1309

1310
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1311
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1312

1313
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1314
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1315
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1316
      STMT_ERR_RET(terrno);
1317
    }
1318
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1319
  }
1320

1321
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1322
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1323
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1324
      STMT_ERR_RET(terrno);
1325
    }
1326
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1327
  }
1328

1329
  return TSDB_CODE_SUCCESS;
1330
}
1331
*/
1332
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
167✔
1333
  STscStmt2* pStmt = (STscStmt2*)stmt;
167✔
1334
  int32_t    code = 0;
167✔
1335

1336
  int64_t startUs = taosGetTimestampUs();
167✔
1337

1338
  STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
167!
1339

1340
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
167!
1341
    return pStmt->errCode;
×
1342
  }
1343

1344
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
167!
1345

1346
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
167!
1347
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1348
    pStmt->bInfo.needParse = false;
×
1349
  }
1350

1351
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
167!
1352
    taos_free_result(pStmt->exec.pRequest);
1✔
1353
    pStmt->exec.pRequest = NULL;
1✔
1354
  }
1355

1356
  STMT_ERR_RET(stmtCreateRequest(pStmt));
167!
1357

1358
  if (pStmt->bInfo.needParse) {
167✔
1359
    STMT_ERR_RET(stmtParseSql(pStmt));
4!
1360
  }
1361

1362
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
167✔
1363
    STMT_ERR_RET(qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt));
3!
1364

1365
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
3✔
1366
                         .acctId = pStmt->taos->acctId,
3✔
1367
                         .db = pStmt->exec.pRequest->pDb,
3✔
1368
                         .topicQuery = false,
1369
                         .pSql = pStmt->sql.sqlStr,
3✔
1370
                         .sqlLen = pStmt->sql.sqlLen,
3✔
1371
                         .pMsg = pStmt->exec.pRequest->msgBuf,
3✔
1372
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1373
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
3✔
1374
                         .pStmtCb = NULL,
1375
                         .pUser = pStmt->taos->user};
3✔
1376
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
3✔
1377
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));
3!
1378

1379
    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));
3!
1380

1381
    if (pStmt->sql.pQuery->haveResultSet) {
3!
1382
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
3!
1383
                                    pStmt->sql.pQuery->numOfResCols));
1384
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
3!
1385
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
3✔
1386
    }
1387

1388
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
3✔
1389
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
3✔
1390
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
3✔
1391

1392
    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
1393
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
1394
    // }
1395

1396
    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));
1397

1398
    return TSDB_CODE_SUCCESS;
3✔
1399
  }
1400

1401
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
164!
1402
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
1403
  }
1404

1405
  STableDataCxt** pDataBlock = NULL;
164✔
1406

1407
  if (pStmt->exec.pCurrBlock) {
164✔
1408
    pDataBlock = &pStmt->exec.pCurrBlock;
136✔
1409
  } else {
1410
    pDataBlock =
1411
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
28✔
1412
    if (NULL == pDataBlock) {
27!
1413
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
×
1414
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
1415
    }
1416
    pStmt->exec.pCurrBlock = *pDataBlock;
27✔
1417
    if (pStmt->sql.stbInterlaceMode) {
27✔
1418
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
9✔
1419
      pStmt->exec.pCurrBlock->pData->aCol = NULL;
9✔
1420
    }
1421
  }
1422

1423
  int64_t startUs2 = taosGetTimestampUs();
163✔
1424
  pStmt->stat.bindDataUs1 += startUs2 - startUs;
163✔
1425

1426
  SStmtQNode* param = NULL;
163✔
1427
  if (pStmt->sql.stbInterlaceMode) {
163✔
1428
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
86!
1429
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
86!
1430
    taosArrayClear(param->tblData.aCol);
43✔
1431

1432
    // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
1433

1434
    param->restoreTbCols = false;
43✔
1435
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
43✔
1436
  }
1437

1438
  int64_t startUs3 = taosGetTimestampUs();
163✔
1439
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;
163✔
1440

1441
  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
163✔
1442

1443
  if (colIdx < 0) {
163✔
1444
    if (pStmt->sql.stbInterlaceMode) {
157✔
1445
      (*pDataBlock)->pData->flags = 0;
43✔
1446
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
43✔
1447
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
43✔
1448
    } else {
1449
      code =
1450
          qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
114✔
1451
    }
1452

1453
    if (code) {
158✔
1454
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
1!
1455
      STMT_ERR_RET(code);
1!
1456
    }
1457
  } else {
1458
    if (pStmt->sql.stbInterlaceMode) {
6!
1459
      tscError("bind single column not allowed in stb insert mode");
×
1460
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1461
    }
1462

1463
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
6!
1464
      tscError("bind column index not in sequence");
×
1465
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1466
    }
1467

1468
    pStmt->bInfo.sBindLastIdx = colIdx;
6✔
1469

1470
    if (0 == colIdx) {
6✔
1471
      pStmt->bInfo.sBindRowNum = bind->num;
3✔
1472
    }
1473

1474
    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
6✔
1475
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum, pStmt->taos->optionInfo.charsetCxt);
6✔
1476
    if (code) {
6!
1477
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
×
1478
      STMT_ERR_RET(code);
×
1479
    }
1480
  }
1481

1482
  int64_t startUs4 = taosGetTimestampUs();
163✔
1483
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;
163✔
1484

1485
  if (pStmt->sql.stbInterlaceMode) {
163✔
1486
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
43!
1487
  } else {
1488
    STMT_ERR_RET(stmtAddBatch2(pStmt));
120!
1489
  }
1490

1491
  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
163✔
1492

1493
  return TSDB_CODE_SUCCESS;
163✔
1494
}
1495
/*
1496
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1497
  tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
1498

1499
  int32_t code = 0;
1500
  int32_t finalCode = 0;
1501
  size_t  keyLen = 0;
1502
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1503
  while (pIter) {
1504
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1505
    char*          key = taosHashGetKey(pIter, &keyLen);
1506

1507
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1508
    if (pMeta->uid) {
1509
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1510
      continue;
1511
    }
1512

1513
    SSubmitBlkRsp* blkRsp = NULL;
1514
    int32_t        i = 0;
1515
    for (; i < pRsp->nBlocks; ++i) {
1516
      blkRsp = pRsp->pBlocks + i;
1517
      if (strlen(blkRsp->tblFName) != keyLen) {
1518
        continue;
1519
      }
1520

1521
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1522
        continue;
1523
      }
1524

1525
      break;
1526
    }
1527

1528
    if (i < pRsp->nBlocks) {
1529
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1530
               blkRsp->uid);
1531

1532
      pMeta->uid = blkRsp->uid;
1533
      pStmt->bInfo.tbUid = blkRsp->uid;
1534
    } else {
1535
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1536
      if (NULL == pStmt->pCatalog) {
1537
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1538
        if (code) {
1539
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1540
          finalCode = code;
1541
          continue;
1542
        }
1543
      }
1544

1545
      code = stmtCreateRequest(pStmt);
1546
      if (code) {
1547
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1548
        finalCode = code;
1549
        continue;
1550
      }
1551

1552
      STableMeta*      pTableMeta = NULL;
1553
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1554
                               .requestId = pStmt->exec.pRequest->requestId,
1555
                               .requestObjRefId = pStmt->exec.pRequest->self,
1556
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1557
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1558

1559
      pStmt->stat.ctgGetTbMetaNum++;
1560

1561
      taos_free_result(pStmt->exec.pRequest);
1562
      pStmt->exec.pRequest = NULL;
1563

1564
      if (code || NULL == pTableMeta) {
1565
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1566
        finalCode = code;
1567
        taosMemoryFree(pTableMeta);
1568
        continue;
1569
      }
1570

1571
      pMeta->uid = pTableMeta->uid;
1572
      pStmt->bInfo.tbUid = pTableMeta->uid;
1573
      taosMemoryFree(pTableMeta);
1574
    }
1575

1576
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1577
  }
1578

1579
  return finalCode;
1580
}
1581
*/
1582
/*
1583
int stmtStaticModeExec(TAOS_STMT* stmt) {
1584
  STscStmt2*   pStmt = (STscStmt2*)stmt;
1585
  int32_t     code = 0;
1586
  SSubmitRsp* pRsp = NULL;
1587
  if (pStmt->sql.staticMode) {
1588
    return TSDB_CODE_TSC_STMT_API_ERROR;
1589
  }
1590

1591
  STMT_DLOG_E("start to exec");
1592

1593
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1594

1595
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1596
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1597

1598
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1599

1600
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1601
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1602
    if (code) {
1603
      pStmt->exec.pRequest->code = code;
1604
    } else {
1605
      tFreeSSubmitRsp(pRsp);
1606
      STMT_ERR_RET(stmtResetStmt(pStmt));
1607
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1608
    }
1609
  }
1610

1611
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1612

1613
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1614
  pStmt->affectedRows += pStmt->exec.affectedRows;
1615

1616
_return:
1617

1618
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1619

1620
  tFreeSSubmitRsp(pRsp);
1621

1622
  ++pStmt->sql.runTimes;
1623

1624
  STMT_RET(code);
1625
}
1626
*/
1627

1628
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
15✔
1629
  const STscObj* pTscObj = pRequest->pTscObj;
15✔
1630

1631
  *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
15!
1632
  if (*pCxt == NULL) {
15!
1633
    return terrno;
×
1634
  }
1635

1636
  **pCxt = (SParseContext){.requestId = pRequest->requestId,
15✔
1637
                           .requestRid = pRequest->self,
15✔
1638
                           .acctId = pTscObj->acctId,
15✔
1639
                           .db = pRequest->pDb,
15✔
1640
                           .topicQuery = false,
1641
                           .pSql = pRequest->sqlstr,
15✔
1642
                           .sqlLen = pRequest->sqlLen,
15✔
1643
                           .pMsg = pRequest->msgBuf,
15✔
1644
                           .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1645
                           .pTransporter = pTscObj->pAppInfo->pTransporter,
15✔
1646
                           .pStmtCb = NULL,
1647
                           .pUser = pTscObj->user,
15✔
1648
                           .pEffectiveUser = pRequest->effectiveUser,
15✔
1649
                           .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
15✔
1650
                           .enableSysInfo = pTscObj->sysInfo,
15✔
1651
                           .async = true,
1652
                           .svrVer = pTscObj->sVer,
15✔
1653
                           .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
15✔
1654
                           .allocatorId = pRequest->allocatorRefId,
15✔
1655
                           .parseSqlFp = clientParseSql,
1656
                           .parseSqlParam = pWrapper};
1657
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
15✔
1658
  (*pCxt)->biMode = biMode;
15✔
1659
  return TSDB_CODE_SUCCESS;
15✔
1660
}
1661

1662
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
15✔
1663
  STscStmt2*        pStmt = userdata;
15✔
1664
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
15✔
1665

1666
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
15✔
1667
  pStmt->affectedRows += pStmt->exec.affectedRows;
15✔
1668

1669
  fp(pStmt->options.userdata, res, code);
15✔
1670

1671
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
15!
1672
    taosUsleep(1);
×
1673
  }
1674
  (void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
15✔
1675
  ++pStmt->sql.runTimes;
15✔
1676

1677
  if (tsem_post(&pStmt->asyncExecSem) != 0) {
15!
1678
    tscError("failed to post asyncExecSem");
×
1679
  }
1680
}
15✔
1681

1682
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
83✔
1683
  STscStmt2* pStmt = (STscStmt2*)stmt;
83✔
1684
  int32_t    code = 0;
83✔
1685
  int64_t    startUs = taosGetTimestampUs();
87✔
1686

1687
  STMT_DLOG_E("start to exec");
87!
1688

1689
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
87!
1690
    return pStmt->errCode;
×
1691
  }
1692

1693
  taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
87✔
1694
  if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
87✔
1695
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
5✔
1696
  }
1697
  taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
87✔
1698

1699
  if (pStmt->sql.stbInterlaceMode) {
87✔
1700
    STMT_ERR_RET(stmtAddBatch2(pStmt));
21!
1701
  }
1702

1703
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
87✔
1704

1705
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
86✔
1706
    if (pStmt->sql.stbInterlaceMode) {
83✔
1707
      int64_t startTs = taosGetTimestampUs();
21✔
1708
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
45✔
1709
        taosUsleep(1);
25✔
1710
      }
1711
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
21✔
1712

1713
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
21!
1714
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
21✔
1715
      pStmt->sql.siInfo.pVgroupHash = NULL;
21✔
1716
      pStmt->sql.siInfo.pVgroupList = NULL;
21✔
1717
    } else {
1718
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
62✔
1719
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);
62!
1720

1721
      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
62!
1722

1723
      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
60!
1724
    }
1725
  }
1726

1727
  SRequestObj*      pRequest = pStmt->exec.pRequest;
86✔
1728
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
86✔
1729

1730
  if (!fp) {
86✔
1731
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
71✔
1732

1733
    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
71!
1734
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
×
1735
      if (code) {
×
1736
        pStmt->exec.pRequest->code = code;
×
1737
      } else {
1738
        STMT_ERR_RET(stmtResetStmt(pStmt));
×
1739
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
×
1740
      }
1741
    }
1742

1743
    STMT_ERR_JRET(pStmt->exec.pRequest->code);
71!
1744

1745
    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
71✔
1746
    if (affected_rows) {
71✔
1747
      *affected_rows = pStmt->exec.affectedRows;
66✔
1748
    }
1749
    pStmt->affectedRows += pStmt->exec.affectedRows;
71✔
1750

1751
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
71!
1752
      taosUsleep(1);
×
1753
    }
1754

1755
    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
71!
1756

1757
    ++pStmt->sql.runTimes;
71✔
1758
  } else {
1759
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
15!
1760
    if (pWrapper == NULL) {
15!
1761
      code = terrno;
×
1762
    } else {
1763
      pWrapper->pRequest = pRequest;
15✔
1764
      pRequest->pWrapper = pWrapper;
15✔
1765
    }
1766
    if (TSDB_CODE_SUCCESS == code) {
15!
1767
      code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
15✔
1768
    }
1769
    pRequest->syncQuery = false;
15✔
1770
    pRequest->body.queryFp = asyncQueryCb;
15✔
1771
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
15✔
1772

1773
    pStmt->execSemWaited = false;
15✔
1774
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
15✔
1775
  }
1776

1777
_return:
86✔
1778
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;
86✔
1779

1780
  STMT_RET(code);
86!
1781
}
1782

1783
int stmtClose2(TAOS_STMT2* stmt) {
75✔
1784
  STscStmt2* pStmt = (STscStmt2*)stmt;
75✔
1785

1786
  STMT_DLOG_E("start to free stmt");
75!
1787

1788
  pStmt->queue.stopQueue = true;
75✔
1789

1790
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
75✔
1791
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
75✔
1792
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
75✔
1793

1794
  if (pStmt->bindThreadInUse) {
75✔
1795
    (void)taosThreadJoin(pStmt->bindThread, NULL);
23✔
1796
    pStmt->bindThreadInUse = false;
23✔
1797
  }
1798

1799
  taosThreadMutexLock(&pStmt->asyncBindParam.mutex);
75✔
1800
  if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
75✔
1801
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
1✔
1802
  }
1803
  taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex);
75✔
1804
  (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
75✔
1805
  (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
75✔
1806

1807
  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
75✔
1808
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
75✔
1809

1810
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
75✔
1811
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
5!
1812
      tscError("failed to wait asyncExecSem");
×
1813
    }
1814
  }
1815

1816
  STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
75!
1817
            ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
1818
            ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
1819
            ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
1820
            ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
1821
            pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
1822
            pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
1823
            pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
1824
            pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
1825
            pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
1826

1827
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
75!
1828

1829
  if (pStmt->options.asyncExecFn) {
75✔
1830
    if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
6!
1831
      tscError("failed to destroy asyncExecSem");
×
1832
    }
1833
  }
1834
  taosMemoryFree(stmt);
75!
1835

1836
  return TSDB_CODE_SUCCESS;
75✔
1837
}
1838

1839
const char* stmtErrstr2(TAOS_STMT2* stmt) {
3✔
1840
  STscStmt2* pStmt = (STscStmt2*)stmt;
3✔
1841

1842
  if (stmt == NULL || NULL == pStmt->exec.pRequest) {
3!
1843
    return (char*)tstrerror(terrno);
3✔
1844
  }
1845

1846
  pStmt->exec.pRequest->code = terrno;
×
1847

1848
  return taos_errstr(pStmt->exec.pRequest);
×
1849
}
1850
/*
1851
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
1852

1853
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
1854
*/
1855

1856
int stmtParseColFields2(TAOS_STMT2* stmt) {
34✔
1857
  int32_t    code = 0;
34✔
1858
  STscStmt2* pStmt = (STscStmt2*)stmt;
34✔
1859
  int32_t    preCode = pStmt->errCode;
34✔
1860

1861
  STMT_DLOG_E("start to get col fields");
34!
1862

1863
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
34!
1864
    return pStmt->errCode;
×
1865
  }
1866

1867
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
34!
1868
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
1869
  }
1870

1871
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
34!
1872

1873
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
34!
1874
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1875
    pStmt->bInfo.needParse = false;
×
1876
  }
1877

1878
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
34!
1879
    taos_free_result(pStmt->exec.pRequest);
×
1880
    pStmt->exec.pRequest = NULL;
×
1881
    STMT_ERRI_JRET(stmtCreateRequest(pStmt));
×
1882
  }
1883

1884
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
34!
1885

1886
  if (pStmt->bInfo.needParse) {
34!
1887
    STMT_ERRI_JRET(stmtParseSql(pStmt));
34✔
1888
  }
1889

1890
_return:
22✔
1891

1892
  pStmt->errCode = preCode;
34✔
1893

1894
  return code;
34✔
1895
}
1896

1897
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
34✔
1898
  int32_t code = stmtParseColFields2(stmt);
34✔
1899
  if (code != TSDB_CODE_SUCCESS) {
34✔
1900
    return code;
12✔
1901
  }
1902

1903
  return stmtFetchStbColFields2(stmt, nums, fields);
22✔
1904
}
1905

1906
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
9✔
1907
  int32_t    code = 0;
9✔
1908
  STscStmt2* pStmt = (STscStmt2*)stmt;
9✔
1909
  int32_t    preCode = pStmt->errCode;
9✔
1910

1911
  STMT_DLOG_E("start to get param num");
9!
1912

1913
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
9!
1914
    return pStmt->errCode;
×
1915
  }
1916

1917
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
9!
1918

1919
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
9!
1920
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
1921
    pStmt->bInfo.needParse = false;
×
1922
  }
1923

1924
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
9!
1925
    taos_free_result(pStmt->exec.pRequest);
×
1926
    pStmt->exec.pRequest = NULL;
×
1927
  }
1928

1929
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
9!
1930

1931
  if (pStmt->bInfo.needParse) {
9!
1932
    STMT_ERRI_JRET(stmtParseSql(pStmt));
9✔
1933
  }
1934

1935
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
2!
1936
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
2✔
1937
  } else {
1938
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
×
1939
  }
1940

1941
_return:
×
1942

1943
  pStmt->errCode = preCode;
9✔
1944

1945
  return code;
9✔
1946
}
1947

1948
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
3✔
1949
  STscStmt2* pStmt = (STscStmt2*)stmt;
3✔
1950

1951
  STMT_DLOG_E("start to use result");
3!
1952

1953
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
3!
1954
    tscError("useResult only for query statement");
×
1955
    return NULL;
×
1956
  }
1957

1958
  return pStmt->exec.pRequest;
3✔
1959
}
1960

1961
int32_t stmtAsyncBindThreadFunc(void* args) {
9✔
1962
  qInfo("async stmt bind thread started");
9!
1963

1964
  ThreadArgs* targs = (ThreadArgs*)args;
9✔
1965
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
9✔
1966

1967
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
9✔
1968
  targs->fp(targs->param, NULL, code);
9✔
1969
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
9✔
1970
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
9✔
1971
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
9✔
1972
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
9✔
1973
  taosMemoryFree(args);
9!
1974

1975
  qInfo("async stmt bind thread stopped");
9!
1976

1977
  return code;
9✔
1978
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc