• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.5
/source/client/src/clientStmt.c
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4
#include "tdef.h"
5

6
#include "clientStmt.h"
7

8
// Printable names of the statement states (presumably indexed by STMT_STATUS value; confirm against the enum).
char* gStmtStatusStr[] = {
    "unknown", "init", "prepare", "settbname", "settags", "fetchFields", "bind", "bindCol", "addBatch", "exec",
};
10

11
/**
 * Hand out the next buffUnit-sized slot from the table buffer pool.
 *
 * Slots are carved out of the current chunk first. When the chunk is used up, the next chunk already
 * stored in pBufList is reused; only when the list is exhausted as well is a new chunk of buffSize
 * bytes allocated and appended to the list.
 *
 * @param pTblBuf pool bookkeeping, updated in place
 * @param pBuf    receives the address of the slot
 * @return TSDB_CODE_SUCCESS, or an error code when a chunk cannot be fetched, allocated or recorded
 */
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  // fast path: room left in the current chunk
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  // reuse a chunk that was allocated earlier and is still in the list
  if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(terrno);
    }
    *pBuf = pTblBuf->pCurBuff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  // grow the pool by one chunk
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // the list did not take ownership, so release the chunk instead of leaking it
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->buffIdx++;
  pTblBuf->pCurBuff = buff;
  *pBuf = buff;
  pTblBuf->buffOffset = pTblBuf->buffUnit;

  return TSDB_CODE_SUCCESS;
}
40

41
/**
 * Wait until the bind queue reports a pending node and detach it.
 *
 * While qRemainNum is zero the caller first retries ten times with taosUsleep(1), then blocks on the
 * queue condition variable under the queue mutex.
 *
 * @param pStmt statement owning the queue
 * @param param receives the dequeued node; written only when true is returned
 * @return false when the queue is flagged as stopped, true when *param holds a node
 */
bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) {
  int i = 0;
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    if (i < 10) {
      taosUsleep(1);
      i++;
    } else {
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
      // re-check under the mutex so a signal sent between the load and the wait is not missed
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
      }
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
    }
  }

  if (pStmt->queue.stopQueue) {
    return false;
  }

  // the node after head is handed out and becomes the new head
  SStmtQNode* node = pStmt->queue.head->next;
  pStmt->queue.head = node;
  *param = node;

  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);

  return true;
}
67

68
/**
 * Append a node to the tail of the bind queue and wake a waiting consumer.
 *
 * The node is linked before the counter is bumped; the counter update and the signal happen under the
 * queue mutex.
 *
 * @param pStmt statement owning the queue
 * @param param node to append
 */
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
  SStmtQueue* pQueue = &pStmt->queue;

  pQueue->tail->next = param;
  pQueue->tail = param;

  pStmt->stat.bindDataNum++;

  (void)taosThreadMutexLock(&pQueue->mutex);
  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
  (void)taosThreadCondSignal(&(pQueue->waitCond));
  (void)taosThreadMutexUnlock(&pQueue->mutex);
}
79

80
/**
 * Make sure pStmt->exec.pRequest exists, building it from the statement SQL when it is missing.
 *
 * A non-zero reqid is incremented after each build attempt, whether or not the build succeeded.
 *
 * @return 0 when a request already exists, otherwise the result of buildRequest()
 */
static int32_t stmtCreateRequest(STscStmt* pStmt) {
  int32_t code = 0;

  if (NULL != pStmt->exec.pRequest) {
    return code;
  }

  code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
                      pStmt->reqid);
  if (0 != pStmt->reqid) {
    pStmt->reqid++;
  }

  if (TSDB_CODE_SUCCESS == code) {
    pStmt->exec.pRequest->syncQuery = true;
    pStmt->exec.pRequest->isStmtBind = true;
  }

  return code;
}
97

98
/**
 * Validate and apply a statement state transition.
 *
 * A statement that already carries an error code refuses every transition except STMT_PREPARE, which
 * clears the error. Each other target state is checked against the current state; an illegal order of
 * API calls yields TSDB_CODE_TSC_STMT_API_ERROR and an unknown target yields TSDB_CODE_APP_ERROR.
 *
 * @param pStmt     statement whose sql.status is updated on success
 * @param newStatus requested state
 * @return TSDB_CODE_SUCCESS, the stored errCode, or one of the error codes above
 */
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT_DLOG("stmt already failed with err:%s", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;
      break;

    case STMT_SETTBNAME:
      // outside interlace mode a table name may not follow a bind directly
      if (STMT_STATUS_EQ(INIT) ||
          (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)))) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_SETTAGS:
      if (STMT_STATUS_NE(SETTBNAME) && STMT_STATUS_NE(FETCH_FIELDS)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_FETCH_FIELDS:
      if (STMT_STATUS_EQ(INIT)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_BIND:
      // row binds and column binds must not be mixed
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_BIND_COL:
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_ADD_BATCH:
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_EXECUTE:
      if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
        // only query statements may additionally execute straight after a bind
        if (STMT_TYPE_QUERY != pStmt->sql.type || (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL))) {
          code = TSDB_CODE_TSC_STMT_API_ERROR;
        }
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}
175

176
/**
 * Parser callback: expose the table name previously stored in the statement.
 *
 * Marks the statement as STMT_TYPE_MULTI_INSERT before checking the name.
 *
 * @param stmt   statement handle (an STscStmt)
 * @param tbName receives a pointer to the statement-owned name buffer; not written on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_STMT_TBNAME_ERROR when the name is empty
 */
int32_t stmtGetTbName(TAOS_STMT* stmt, char** tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  bool nameMissing = ('\0' == pStmt->bInfo.tbName[0]);
  if (nameMissing) {
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;

  return TSDB_CODE_SUCCESS;
}
190
/*
191
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
192
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
193
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
194
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
195

196
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
197
  pRes->fields = taosMemoryMalloc(size);
198
  if (pRes->fields == NULL) {
199
    STMT_ERR_RET(terrno);
200
  }
201

202
  pRes->userFields = taosMemoryMalloc(size);
203
  if (pRes->userFields == NULL) {
204
    taosMemoryFreeClear(pRes->fields);
205
    STMT_ERR_RET(terrno);
206
  }
207

208
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
209
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
210

211
  return TSDB_CODE_SUCCESS;
212
}
213

214
int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
215
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
216
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD_E);
217

218
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
219
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
220

221
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
222
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
223
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
224
      STMT_ERR_RET(terrno);
225
    }
226
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
227
  }
228

229
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
230
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
231
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
232
      STMT_ERR_RET(terrno);
233
    }
234
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
235
  }
236

237
  return TSDB_CODE_SUCCESS;
238
}
239
*/
240
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName,
10,410✔
241
                           bool autoCreateTbl) {
242
  STscStmt* pStmt = (STscStmt*)stmt;
10,410✔
243
  char      tbFName[TSDB_TABLE_FNAME_LEN];
244
  int32_t   code = tNameExtractFullName(tbName, tbFName);
10,410✔
245
  if (code != 0) {
10,416!
246
    return code;
×
247
  }
248

249
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
10,416✔
250
  tstrncpy(pStmt->bInfo.tbFName, tbFName, TSDB_TABLE_FNAME_LEN);
10,416✔
251
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
10,416✔
252

253
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
10,416✔
254
  pStmt->bInfo.tbSuid = pTableMeta->suid;
10,416✔
255
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
10,416✔
256
  pStmt->bInfo.tbType = pTableMeta->tableType;
10,416✔
257
  pStmt->bInfo.boundTags = tags;
10,416✔
258
  pStmt->bInfo.tagsCached = false;
10,416✔
259
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
10,416✔
260

261
  return TSDB_CODE_SUCCESS;
10,416✔
262
}
263

264
/**
 * Store the vgroup hash and the data-block hash in the statement.
 *
 * Previous values of sql.pVgHash and exec.pBlockHash are overwritten without being released.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}
272

273
/**
 * Parser callback: refresh both the bind info and the exec info of the statement.
 *
 * Interlace mode is switched off when the table is auto-created. `preCtbname` is accepted but not
 * used here.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from stmtUpdateBindInfo() / stmtUpdateExecInfo()
 */
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
                       SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, bool preCtbname) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl));
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));

  pStmt->sql.autoCreateTbl = autoCreateTbl;
  if (autoCreateTbl) {
    pStmt->sql.stbInterlaceMode = false;
  }

  return TSDB_CODE_SUCCESS;
}
287

288
/**
 * Parser callback: move the vgroup hash and the data-block hash out of the statement.
 *
 * Both statement fields are set to NULL after being copied to the out parameters.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  SHashObj* vgHash = pStmt->sql.pVgHash;
  SHashObj* blockHash = pStmt->exec.pBlockHash;

  pStmt->sql.pVgHash = NULL;
  pStmt->exec.pBlockHash = NULL;

  *pVgHash = vgHash;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}
299

300
/**
 * Cache a clone of the current table's data block in sql.pTableCache.
 *
 * Only applies to STMT_TYPE_MULTI_INSERT statements. The cache key is the super-table uid for child
 * tables and the table uid otherwise; nothing is done when that key is already cached. After a
 * successful insert the bound tags belong to the cache entry: they are flagged as cached for
 * auto-create statements and detached from bInfo otherwise.
 *
 * @return TSDB_CODE_SUCCESS, or an error code when the source block is missing, cloning fails or the
 *         cache insert fails
 */
int32_t stmtCacheBlock(STscStmt* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  int32_t putCode = taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache));
  if (putCode) {
    // the cache did not take the clone, so release it here instead of leaking it
    int32_t code = (0 != terrno) ? terrno : putCode;
    qDestroyStmtDataBlock(pDst);
    return code;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
337

338
/**
 * Parse the statement SQL and derive the statement type and bind bookkeeping from the result.
 *
 * Statements with a prepare root are classified as queries and return early. For the others the data
 * block of the current table is looked up, interlace mode is dropped when the bound column indexes are
 * not in non-decreasing order, and sql.pBindInfo is allocated on first use.
 *
 * @return TSDB_CODE_SUCCESS, or an error from request creation, parsing, block lookup or allocation
 */
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback stmtCb = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  pStmt->exec.pCurrBlock = NULL;

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  pStmt->stat.parseSqlNum++;
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;

  pStmt->bInfo.needParse = false;

  if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
    pStmt->sql.type = STMT_TYPE_INSERT;
    pStmt->sql.stbInterlaceMode = false;
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
    pStmt->sql.type = STMT_TYPE_QUERY;
    pStmt->sql.stbInterlaceMode = false;

    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc =
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (NULL == pSrc || NULL == *pSrc) {
    return terrno;
  }

  STableDataCxt* pTableCtx = *pSrc;
  if (pStmt->sql.stbInterlaceMode) {
    // interlace mode is kept only while the bound column indexes never decrease
    int16_t prevIdx = -1;

    for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
      if (pTableCtx->boundColsInfo.pColIndex[i] >= prevIdx) {
        prevIdx = pTableCtx->boundColsInfo.pColIndex[i];
        continue;
      }

      pStmt->sql.stbInterlaceMode = false;
      break;
    }
  }

  if (NULL == pStmt->sql.pBindInfo) {
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
    if (NULL == pStmt->sql.pBindInfo) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
395

396
/**
 * Reset pStmt->bInfo to the "no table bound, parse needed" state.
 *
 * The bound tag info is destroyed and freed only when it is not flagged as cached.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
  // ids
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbVgId = -1;
  pStmt->bInfo.tbType = 0;

  // flags
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  // names
  pStmt->bInfo.tbName[0] = 0;
  pStmt->bInfo.tbFName[0] = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  pStmt->bInfo.stbFName[0] = 0;

  return TSDB_CODE_SUCCESS;
}
414

415
// Reset the column data of one table (result of the reset is ignored) and destroy the column array.
void stmtFreeTableBlkList(STableColsData* pTb) {
  SArray* pCols = pTb->aCol;

  (void)qResetStmtColumns(pCols, true);
  taosArrayDestroy(pCols);
}
419

420
/**
 * Rewind the table buffer pool to its first chunk and make the queue empty again.
 *
 * The first slot of the first chunk becomes both head and tail of the queue, and the pool offset is
 * placed right after that slot. Nothing is changed when the first chunk cannot be fetched.
 */
void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
  void* pFirst = taosArrayGetP(pTblBuf->pBufList, 0);

  pTblBuf->pCurBuff = pFirst;
  if (NULL == pFirst) {
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
    return;
  }

  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = sizeof(*pQueue->head);

  pQueue->tail = pTblBuf->pCurBuff;
  pQueue->head = pQueue->tail;
  pQueue->qRemainNum = 0;
  pQueue->head->next = NULL;
}
433

434
/**
 * Release execution-time state of the statement.
 *
 * Interlace mode: a deep clean drops the block hash and the current block; otherwise the table column
 * index and the queue buffer pool are rewound. The bind info is cleaned afterwards.
 *
 * Normal mode: the request is freed (for query statements only on a deep clean). Every block in the
 * block hash is destroyed and removed, except that with keepTable the current block is reset and kept,
 * swapping its data with exec.pCurrTbData. With keepTable the function returns right after the loop;
 * otherwise the hash, the current table data and the bind info are released as well.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from resetting/removing a block or cleaning bind info
 */
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
  if (pStmt->sql.stbInterlaceMode) {
    if (!deepClean) {
      pStmt->sql.siInfo.pTableColsIdx = 0;
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
    } else {
      taosHashCleanup(pStmt->exec.pBlockHash);
      pStmt->exec.pBlockHash = NULL;

      if (NULL != pStmt->exec.pCurrBlock) {
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
      }
    }

    STMT_ERR_RET(stmtCleanBindInfo(pStmt));

    return TSDB_CODE_SUCCESS;
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  size_t keyLen = 0;
  void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
  while (pIter) {
    STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
    char*          key = taosHashGetKey(pIter, &keyLen);
    (void)qGetTableMetaInDataBlock(pBlocks);  // result is not used

    if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
      // keep the current table: recycle its block instead of destroying it
      TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
      STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
    } else {
      qDestroyStmtDataBlock(pBlocks);
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
    }

    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
  }

  if (keepTable) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pStmt->exec.pBlockHash);
  pStmt->exec.pBlockHash = NULL;

  tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
  taosMemoryFreeClear(pStmt->exec.pCurrTbData);

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}
490

491
// Array-element destructor: `buf` points at a stored buffer pointer; free the buffer it refers to.
void stmtFreeTbBuf(void* buf) {
  void** ppBuf = (void**)buf;
  taosMemoryFree(*ppBuf);
}
495

496
void stmtFreeTbCols(void* buf) {
57,000✔
497
  SArray* pCols = *(SArray**)buf;
57,000✔
498
  taosArrayDestroy(pCols);
57,000✔
499
}
57,000✔
500

501
/**
 * Release everything hanging off pStmt->sql, plus the exec and bind info, then zero pStmt->sql.
 *
 * Each table-cache entry has its data block and bound tags destroyed before the cache itself is
 * cleaned up. After the final memset, siInfo.tableColsReady is set back to true.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from stmtCleanExecInfo() / stmtCleanBindInfo()
 */
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  STMT_DLOG_E("start to free SQL info");

  taosMemoryFree(pStmt->sql.pBindInfo);
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);
  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  // the cache owns the cloned blocks and the tag info stored in its entries
  for (void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pCache->pDataCtx);
    qDestroyBoundColInfo(pCache->boundTags);
    taosMemoryFreeClear(pCache->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  // interlace-mode state
  taos_free_result(pStmt->sql.siInfo.pRequest);
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);

  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
  pStmt->sql.siInfo.tableColsReady = true;

  STMT_DLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}
544

545
/**
 * Make sure the vgroup of the current table is present in sql.pVgHash.
 *
 * When *vgId is non-negative and already in the hash nothing is done. Otherwise the vgroup is resolved
 * through the catalog for bInfo.sname, stored in the hash, and *vgId is updated to the resolved id.
 *
 * @param vgId in: known vgroup id or a negative value; out: resolved vgroup id on success
 * @return TSDB_CODE_SUCCESS, or the error from the catalog lookup or the hash insert
 */
int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) {
  bool alreadyKnown = (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId)));
  if (alreadyKnown) {
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
  if (TSDB_CODE_SUCCESS == code) {
    code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                       sizeof(vgInfo));
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *vgId = vgInfo.vgId;

  return TSDB_CODE_SUCCESS;
}
571

572
/**
 * Build a new data block for a table from a cached block.
 *
 * The vgroup is registered (and resolved, when vgId is negative) before qRebuildStmtDataBlock() is
 * called with the resulting id.
 *
 * @param newBlock receives the rebuilt block
 * @return TSDB_CODE_SUCCESS, or the first error from the vgroup lookup or the rebuild
 */
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                             uint64_t suid, int32_t vgId) {
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));

  bool autoCreate = pStmt->sql.autoCreateTbl;
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, autoCreate));

  STMT_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);

  return TSDB_CODE_SUCCESS;
}
581

582
int32_t stmtGetFromCache(STscStmt* pStmt) {
30,457✔
583
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
30,457!
584
    pStmt->bInfo.needParse = false;
×
585
    pStmt->bInfo.inExecCache = false;
×
586
    return TSDB_CODE_SUCCESS;
×
587
  }
588

589
  pStmt->bInfo.needParse = true;
30,457✔
590
  pStmt->bInfo.inExecCache = false;
30,457✔
591

592
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
30,457✔
593
  if (pCxtInExec) {
30,454✔
594
    pStmt->bInfo.needParse = false;
20,025✔
595
    pStmt->bInfo.inExecCache = true;
20,025✔
596

597
    pStmt->exec.pCurrBlock = *pCxtInExec;
20,025✔
598

599
    if (pStmt->sql.autoCreateTbl) {
20,025!
600
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
20,025!
601
      return TSDB_CODE_SUCCESS;
20,025✔
602
    }
603
  }
604

605
  if (NULL == pStmt->pCatalog) {
10,429✔
606
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
388✔
607
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
397✔
608
  }
609

610
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
10,438!
611
    if (pStmt->bInfo.inExecCache) {
10,394!
UNCOV
612
      pStmt->bInfo.needParse = false;
×
UNCOV
613
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
UNCOV
614
      return TSDB_CODE_SUCCESS;
×
615
    }
616

617
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
10,394✔
618
    return TSDB_CODE_SUCCESS;
10,394✔
619
  }
620

621
  if (pStmt->sql.autoCreateTbl) {
44✔
622
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
29✔
623
    if (pCache) {
29!
624
      pStmt->bInfo.needParse = false;
29✔
625
      pStmt->bInfo.tbUid = 0;
29✔
626

627
      STableDataCxt* pNewBlock = NULL;
29✔
628
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
29!
629

630
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
29!
631
                      POINTER_BYTES)) {
632
        STMT_ERR_RET(terrno);
×
633
      }
634

635
      pStmt->exec.pCurrBlock = pNewBlock;
29✔
636

637
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
29!
638

639
      return TSDB_CODE_SUCCESS;
29✔
640
    }
641

642
    STMT_RET(stmtCleanBindInfo(pStmt));
×
643
  }
644

645
  uint64_t uid, suid;
646
  int32_t  vgId;
647
  int8_t   tableType;
648

649
  STableMeta*      pTableMeta = NULL;
15✔
650
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
15✔
651
                           .requestId = pStmt->exec.pRequest->requestId,
15✔
652
                           .requestObjRefId = pStmt->exec.pRequest->self,
15✔
653
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
15✔
654
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
18✔
655

656
  pStmt->stat.ctgGetTbMetaNum++;
18✔
657

658
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
18!
659
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
660
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
661

662
    STMT_ERR_RET(code);
×
663
  }
664

665
  STMT_ERR_RET(code);
18!
666

667
  uid = pTableMeta->uid;
18✔
668
  suid = pTableMeta->suid;
18✔
669
  tableType = pTableMeta->tableType;
18✔
670
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
18✔
671
  vgId = pTableMeta->vgId;
18✔
672

673
  taosMemoryFree(pTableMeta);
18!
674

675
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
18!
676

677
  if (uid == pStmt->bInfo.tbUid) {
18!
UNCOV
678
    pStmt->bInfo.needParse = false;
×
679

UNCOV
680
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
681

UNCOV
682
    return TSDB_CODE_SUCCESS;
×
683
  }
684

685
  if (pStmt->bInfo.inExecCache) {
18!
UNCOV
686
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
×
UNCOV
687
    if (NULL == pCache) {
×
688
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
689
               pStmt->bInfo.tbFName, uid, cacheUid);
690

691
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
692
    }
693

UNCOV
694
    pStmt->bInfo.needParse = false;
×
695

UNCOV
696
    pStmt->bInfo.tbUid = uid;
×
UNCOV
697
    pStmt->bInfo.tbSuid = suid;
×
UNCOV
698
    pStmt->bInfo.tbType = tableType;
×
UNCOV
699
    pStmt->bInfo.boundTags = pCache->boundTags;
×
UNCOV
700
    pStmt->bInfo.tagsCached = true;
×
701

UNCOV
702
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
×
703

UNCOV
704
    return TSDB_CODE_SUCCESS;
×
705
  }
706

707
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
18✔
708
  if (pCache) {
18!
UNCOV
709
    pStmt->bInfo.needParse = false;
×
710

UNCOV
711
    pStmt->bInfo.tbUid = uid;
×
UNCOV
712
    pStmt->bInfo.tbSuid = suid;
×
UNCOV
713
    pStmt->bInfo.tbType = tableType;
×
UNCOV
714
    pStmt->bInfo.boundTags = pCache->boundTags;
×
UNCOV
715
    pStmt->bInfo.tagsCached = true;
×
716

UNCOV
717
    STableDataCxt* pNewBlock = NULL;
×
UNCOV
718
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
×
719

UNCOV
720
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
×
721
                    POINTER_BYTES)) {
722
      STMT_ERR_RET(terrno);
×
723
    }
724

UNCOV
725
    pStmt->exec.pCurrBlock = pNewBlock;
×
726

UNCOV
727
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
×
728

UNCOV
729
    return TSDB_CODE_SUCCESS;
×
730
  }
731

732
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
18!
733

734
  return TSDB_CODE_SUCCESS;
18✔
735
}
736

737
/**
 * Bring the statement back to STMT_INIT: all SQL info is released and an empty table cache is created.
 *
 * @return TSDB_CODE_SUCCESS, or the error from stmtCleanSQLInfo() or from creating the cache
 */
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pCache;
  if (NULL == pCache) {
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
749

750
/**
 * Process one node taken from the bind queue.
 *
 * A node flagged restoreTbCols re-creates the first siInfo.pTableColsIdx column arrays and then marks
 * the table columns as ready. Any other node has its table data appended to the statement output and
 * decrements siInfo.tbRemainNum.
 *
 * @param param the dequeued SStmtQNode
 * @return TSDB_CODE_SUCCESS, or the error from array creation or from qAppendStmtTableOutput()
 */
int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
  SStmtQNode* pParam = (SStmtQNode*)param;

  if (!pParam->restoreTbCols) {
    STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
                                        &pStmt->sql.siInfo, NULL));

    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
    SArray** ppCols = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
    *ppCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == *ppCols) {
      STMT_ERR_RET(terrno);
    }
  }

  atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);

  return TSDB_CODE_SUCCESS;
}
773

774
void* stmtBindThreadFunc(void* param) {
76✔
775
  setThreadName("stmtBind");
76✔
776

777
  qInfo("stmt bind thread started");
76!
778

779
  STscStmt* pStmt = (STscStmt*)param;
76✔
780

781
  while (true) {
36,705✔
782
    if (pStmt->queue.stopQueue) {
36,781✔
783
      break;
75✔
784
    }
785

786
    SStmtQNode* asyncParam = NULL;
36,706✔
787
    if (!stmtDequeue(pStmt, &asyncParam)) {
36,706✔
788
      continue;
75✔
789
    }
790

791
    int ret = stmtAsyncOutput(pStmt, asyncParam);
36,629✔
792
    if (ret != 0) {
36,630!
793
      qError("stmtAsyncOutput failed, reason:%s", tstrerror(ret));
×
794
    }
795
  }
796

797
  qInfo("stmt bind thread stopped");
75!
798

799
  return NULL;
75✔
800
}
801

802
/**
 * Start the joinable bind worker thread for the statement.
 *
 * The thread attribute object is destroyed on every path once it has been initialised. On success
 * pStmt->bindThreadInUse is set.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when the attribute cannot be set up, or the
 *         system error of the failed thread creation
 */
int32_t stmtStartBindThread(STscStmt* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
    // capture the error before any further call can disturb errno
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    (void)taosThreadAttrDestroy(&thAttr);
    terrno = code;
    STMT_ERR_RET(code);
    // only reached if the captured code is 0; the thread was not created either way
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  pStmt->bindThreadInUse = true;

  (void)taosThreadAttrDestroy(&thAttr);
  return TSDB_CODE_SUCCESS;
}
821

822
/*
 * Prepare the bind queue of pStmt: initialise its condition variable and mutex, take
 * one node from the table buffer as the queue head, and point the tail at that node.
 *
 * Returns TSDB_CODE_SUCCESS, or the code reported by stmtAllocQNodeFromBuf().
 */
int32_t stmtInitQueue(STscStmt* pStmt) {
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);

  int32_t allocCode = stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head);
  STMT_ERR_RET(allocCode);

  // Head and tail start out on the same node.
  pStmt->queue.tail = pStmt->queue.head;

  return TSDB_CODE_SUCCESS;
}
830

831
/*
 * Initialise a table buffer: the unit size is one SStmtQNode, each buffer holds 1000
 * units, and the first buffer is allocated, recorded in pBufList and made current.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the list or the buffer cannot be created.
 * A buffer that could not be recorded in pBufList is freed before returning.
 */
int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }

  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // The list does not reference the buffer, so nothing else would release it.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
853

854
/*
 * Allocate a statement handle for connection `taos`.
 *
 * The handle starts in STMT_INIT with needParse set. When pOptions is given it is
 * copied into the handle, and interlace mode is enabled if both singleStbInsert and
 * singleTableBindOnce are set. In interlace mode the table hash, the column-array
 * list, the table buffer and the bind queue are created and the bind thread is
 * started.
 *
 * Returns the new handle, or NULL on failure. When the table buffer, queue or bind
 * thread step fails, terrno is set to that step's code before the handle is closed.
 */
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
  STscStmt* pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
  if (NULL == pStmt) {
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = taos;
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->reqid = reqid;
  pStmt->errCode = TSDB_CODE_SUCCESS;

  if (NULL != pOptions) {
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
      pStmt->stbInterlaceMode = true;
    }
  }

  if (pStmt->stbInterlaceMode) {
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = taos->acctId;
    pStmt->sql.siInfo.dbname = taos->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&taos->pAppInfo->mgmtEp);

    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableHash) {
      (void)stmtClose(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pTableCols) {
      (void)stmtClose(pStmt);
      return NULL;
    }

    // Each step runs only if the previous one succeeded.
    int32_t code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtInitQueue(pStmt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtStartBindThread(pStmt);
    }
    if (TSDB_CODE_SUCCESS != code) {
      terrno = code;
      (void)stmtClose(pStmt);
      return NULL;
    }
  }

  pStmt->sql.siInfo.tableColsReady = true;

  STMT_LOG_SEQ(STMT_INIT);

  tscDebug("stmt:%p initialized", pStmt);

  return pStmt;
}
921

922
/*
 * Prepare `stmt` with the SQL text `sql`.
 *
 * stmt    statement handle.
 * sql     SQL text; when length is 0 it is measured with strlen().
 * length  number of bytes of sql to use, or 0.
 *
 * A statement that was already prepared is reset first. The SQL text is duplicated
 * into the handle, and when qParseDbName() extracts a database name from the SQL
 * that name is applied through stmtSetDbName().
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, or the code of the step that failed.
 */
int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to prepare");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // length is unsigned, so "not supplied" can only mean zero.
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    return terrno;
  }
  pStmt->sql.sqlLen = length;
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;

  char* dbName = NULL;
  if (qParseDbName(sql, length, &dbName)) {
    // Release the parsed name before propagating any error so it is not leaked.
    int32_t code = stmtSetDbName(stmt, dbName);
    taosMemoryFreeClear(dbName);
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
956

957
/*
 * Set up the interlace-mode table info of pStmt.
 *
 * Looks up the data context of the current table (bInfo.tbFName) in exec.pBlockHash,
 * clones it into sql.siInfo.pDataCtx, appends STMT_TABLE_COLS_NUM fresh column arrays
 * to sql.siInfo.pTableCols and copies bInfo.boundTags into sql.siInfo.boundTags.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the lookup or an allocation fails, or the
 * code reported by qCloneStmtDataBlock(). A column array that could not be appended
 * is destroyed before returning.
 */
int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      // The list never took the array, so release it here.
      int32_t code = terrno;
      taosArrayDestroy(pTblCols);
      return code;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  return TSDB_CODE_SUCCESS;
}
983

984
/*
 * Replace the database name stored on the statement's request with a copy of dbName.
 *
 * The request is created first if needed; any database name it already holds is
 * freed, because a name given in the SQL text overrides the earlier one.
 *
 * Returns TSDB_CODE_SUCCESS, the code from stmtCreateRequest(), or terrno when the
 * name cannot be duplicated.
 */
int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG("start to set dbName:%s", dbName);

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(dbName);

  return (NULL == pStmt->exec.pRequest->pDb) ? terrno : TSDB_CODE_SUCCESS;
}
999

1000
/*
 * Select the table that subsequent binds on `stmt` target.
 *
 * Only valid for insert statements. When interlace mode is active and its data
 * context already exists, only the table name is recorded, the request id is bumped
 * and needParse is cleared. Otherwise the full name is built, the cache is consulted
 * and the SQL is parsed if still required. In interlace mode the interlace table
 * info is initialised once the data context is still missing afterwards.
 *
 * The elapsed time is added to stat.setTbNameUs on success.
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, or the code of the step that failed.
 */
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;
  int64_t   tsBegin = taosGetTimestampUs();

  STMT_DLOG("start to set tbName:%s", tbName);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  STMT_ERR_RET(stmtIsInsert(stmt, &isInsert));
  if (0 == isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    // Interlace context already built: just switch the table name.
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  } else {
    STMT_ERR_RET(stmtCreateRequest(pStmt));

    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

      STMT_ERR_RET(stmtParseSql(pStmt));
    }
  }

  // Re-evaluated on purpose: the branch above may have changed the statement state.
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  int64_t tsEnd = taosGetTimestampUs();
  pStmt->stat.setTbNameUs += tsEnd - tsBegin;

  return TSDB_CODE_SUCCESS;
}
1051

1052
/*
 * Bind tag values for the current table of `stmt`.
 *
 * Nothing is bound (and success is returned) when the SQL has no bound tags or when
 * bInfo.inExecCache is set. Otherwise the data context of bInfo.tbFName is looked up
 * in exec.pBlockHash and the tag values are bound to it.
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, TSDB_CODE_APP_ERROR when the table has no data context, or the code of the
 * step that failed.
 */
int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  SBoundColInfo* pTagInfo = (SBoundColInfo*)pStmt->bInfo.boundTags;
  if (pTagInfo->numOfBound <= 0 || pTagInfo->numOfCols <= 0) {
    tscWarn("no tags bound in sql, will not bound tags");
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->bInfo.inExecCache) {
    return TSDB_CODE_SUCCESS;
  }

  const char*     fullName = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, fullName, strlen(fullName));
  if (NULL == ppCxt) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  tscDebug("start to bind stmt tag values");
  STMT_ERR_RET(qBindStmtTagsValue(*ppCxt, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                  pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt));

  return TSDB_CODE_SUCCESS;
}
1087

UNCOV
1088
/*
 * Build the tag field description of the current table.
 *
 * fieldNum / fields are passed through to qBuildStmtTagFields(), which fills them.
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, TSDB_CODE_TSC_STMT_API_ERROR for a query statement, TSDB_CODE_APP_ERROR
 * when the table has no data context in exec.pBlockHash, or the builder's code.
 */
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query tag fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*     fullName = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, fullName, strlen(fullName));
  if (NULL == ppCxt) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtTagFields(*ppCxt, pStmt->bInfo.boundTags, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1109

1110
/*
 * Build the column field description of the current table.
 *
 * In interlace mode the data context is sql.siInfo.pDataCtx; otherwise it is looked
 * up by bInfo.tbFName in exec.pBlockHash. fieldNum / fields are passed through to
 * qBuildStmtColFields(), which fills them.
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, TSDB_CODE_TSC_STMT_API_ERROR for a query statement, TSDB_CODE_APP_ERROR
 * when the lookup finds nothing, or the builder's code.
 */
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** ppCxt = NULL;

  if (!pStmt->sql.stbInterlaceMode) {
    ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == ppCxt) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }
  } else {
    ppCxt = &pStmt->sql.siInfo.pDataCtx;
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppCxt, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1137

1138
/*
1139
SArray* stmtGetFreeCol(STscStmt* pStmt, int32_t* idx) {
1140
  while (true) {
1141
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1142
      pStmt->exec.smInfo.pColIdx = 0;
1143
    }
1144

1145
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1146
      taosUsleep(1);
1147
      continue;
1148
    }
1149

1150
    *idx = pStmt->exec.smInfo.pColIdx;
1151
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1152
  }
1153
}
1154
*/
1155

1156
/*
 * Finish an interlace-mode bind and hand the node to the bind thread.
 *
 * Lazily creates the vgroup hash, the vgroup list and the interlace request, tracks
 * whether the first bound table name has been seen again (tbFromHash), stamps the
 * node with that flag, increments sql.siInfo.tbRemainNum and enqueues the node.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the vgroup hash or list cannot be created,
 * or the code reported by buildRequest(). Nothing is enqueued on failure.
 */
int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      STMT_ERR_RET(terrno);
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      STMT_ERR_RET(terrno);
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    // NOTE(review): this flags exec.pRequest, not the siInfo.pRequest built just above -- confirm intended.
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  // Seeing the first table name a second time switches tbFromHash on.
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
1196

1197
/*
 * Hand out the next unused column array from sql.siInfo.pTableCols.
 *
 * When every array in the list has been handed out, 100 more are appended and the
 * lookup is retried. On success *pTableCols receives the array and pTableColsIdx is
 * advanced.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an array cannot be created or appended.
 * An array that could not be appended is destroyed before returning.
 */
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray** pTableCols) {
  while (true) {
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
      break;
    }

    // Pool exhausted: grow it, then loop to pick the first new array.
    for (int32_t i = 0; i < 100; i++) {
      SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
      if (NULL == pTblCols) {
        return terrno;
      }

      if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
        // The list never took the array, so release it here.
        int32_t code = terrno;
        taosArrayDestroy(pTblCols);
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
1219

1220
/*
 * Bind a batch of parameter values to `stmt`.
 *
 * stmt    statement handle.
 * bind    values to bind.
 * colIdx  negative to bind all columns at once; otherwise the index of the single
 *         column being bound, which must be 0 or follow the previously bound index.
 *
 * For a query statement the parameters are bound, the query is parsed and the result
 * schema is installed on the request. For inserts the values are bound into the
 * current data block; in interlace mode they go into a queue node that is then
 * passed to stmtAppendTablePostHandle(). Single-column binding is rejected in
 * interlace mode.
 *
 * Timings of the individual phases are accumulated in stat.bindDataUs1..4.
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, or the code of the step that failed.
 */
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   code = 0;

  int64_t tsBegin = taosGetTimestampUs();

  STMT_DLOG("start to bind stmt data, colIdx:%d", colIdx);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // A query that already ran gets a fresh request for this bind.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt));

    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user,
                         .setQueryFp = setQueryRequest};

    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    // Exchange the db/table lists between the request and the parsed query.
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** ppBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    ppBlock = &pStmt->exec.pCurrBlock;
  } else {
    ppBlock = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == ppBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *ppBlock;
    if (pStmt->sql.stbInterlaceMode) {
      // In interlace mode the block's own column array is dropped; binds use the node's array.
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      pStmt->exec.pCurrBlock->pData->aCol = NULL;
    }
  }

  int64_t tsBlockReady = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += tsBlockReady - tsBegin;

  SStmtQNode* pNode = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &pNode->tblData.aCol));
    taosArrayClear(pNode->tblData.aCol);

    pNode->restoreTbCols = false;
    tstrncpy(pNode->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  int64_t tsNodeReady = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += tsNodeReady - tsBlockReady;

  SArray* pColArr = pStmt->sql.stbInterlaceMode ? pNode->tblData.aCol : (*ppBlock)->pData->aCol;

  if (colIdx < 0) {
    // Bind every column in one call.
    if (pStmt->sql.stbInterlaceMode) {
      (*ppBlock)->pData->flags = 0;
      code = qBindStmtStbColsValue(*ppBlock, pColArr, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                   pStmt->taos->optionInfo.charsetCxt);
    } else {
      code = qBindStmtColsValue(*ppBlock, pColArr, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen,
                                pStmt->taos->optionInfo.charsetCxt);
    }

    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    // Bind one column; columns must arrive in order, restarting at index 0.
    if (pStmt->sql.stbInterlaceMode) {
      tscError("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue(*ppBlock, pColArr, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
                                   pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t tsBound = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += tsBound - tsNodeReady;

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, pNode));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - tsBound;

  return TSDB_CODE_SUCCESS;
}
1386

1387
/*
 * Close the current batch of bound rows on `stmt`.
 *
 * In interlace mode tableColsReady is cleared and a node flagged restoreTbCols is
 * enqueued for the bind thread; the time spent up to that point is added to
 * stat.addBatchUs. Otherwise the current block is cached through stmtCacheBlock().
 *
 * Returns TSDB_CODE_SUCCESS, the handle's stored errCode if it already holds an
 * error, or the code of the step that failed.
 */
int stmtAddBatch(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  int64_t tsBegin = taosGetTimestampUs();

  STMT_DLOG_E("start to add batch");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  int64_t tsNow = taosGetTimestampUs();
  pStmt->stat.addBatchUs += tsNow - tsBegin;

  pStmt->sql.siInfo.tableColsReady = false;

  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->restoreTbCols = true;
  pNode->next = NULL;

  stmtEnqueue(pStmt, pNode);

  return TSDB_CODE_SUCCESS;
}
1420
/*
1421
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
1422
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1423

1424
  int32_t code = 0;
1425
  int32_t finalCode = 0;
1426
  size_t  keyLen = 0;
1427
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1428
  while (pIter) {
1429
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1430
    char*          key = taosHashGetKey(pIter, &keyLen);
1431

1432
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1433
    if (pMeta->uid) {
1434
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1435
      continue;
1436
    }
1437

1438
    SSubmitBlkRsp* blkRsp = NULL;
1439
    int32_t        i = 0;
1440
    for (; i < pRsp->nBlocks; ++i) {
1441
      blkRsp = pRsp->pBlocks + i;
1442
      if (strlen(blkRsp->tblFName) != keyLen) {
1443
        continue;
1444
      }
1445

1446
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1447
        continue;
1448
      }
1449

1450
      break;
1451
    }
1452

1453
    if (i < pRsp->nBlocks) {
1454
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1455
               blkRsp->uid);
1456

1457
      pMeta->uid = blkRsp->uid;
1458
      pStmt->bInfo.tbUid = blkRsp->uid;
1459
    } else {
1460
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1461
      if (NULL == pStmt->pCatalog) {
1462
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1463
        if (code) {
1464
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1465
          finalCode = code;
1466
          continue;
1467
        }
1468
      }
1469

1470
      code = stmtCreateRequest(pStmt);
1471
      if (code) {
1472
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1473
        finalCode = code;
1474
        continue;
1475
      }
1476

1477
      STableMeta*      pTableMeta = NULL;
1478
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1479
                               .requestId = pStmt->exec.pRequest->requestId,
1480
                               .requestObjRefId = pStmt->exec.pRequest->self,
1481
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1482
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1483

1484
      pStmt->stat.ctgGetTbMetaNum++;
1485

1486
      taos_free_result(pStmt->exec.pRequest);
1487
      pStmt->exec.pRequest = NULL;
1488

1489
      if (code || NULL == pTableMeta) {
1490
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1491
        finalCode = code;
1492
        taosMemoryFree(pTableMeta);
1493
        continue;
1494
      }
1495

1496
      pMeta->uid = pTableMeta->uid;
1497
      pStmt->bInfo.tbUid = pTableMeta->uid;
1498
      taosMemoryFree(pTableMeta);
1499
    }
1500

1501
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1502
  }
1503

1504
  return finalCode;
1505
}
1506
*/
1507

1508
/*
1509
int stmtStaticModeExec(TAOS_STMT* stmt) {
1510
  STscStmt*   pStmt = (STscStmt*)stmt;
1511
  int32_t     code = 0;
1512
  SSubmitRsp* pRsp = NULL;
1513
  if (pStmt->sql.staticMode) {
1514
    return TSDB_CODE_TSC_STMT_API_ERROR;
1515
  }
1516

1517
  STMT_DLOG_E("start to exec");
1518

1519
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1520

1521
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1522
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1523

1524
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1525

1526
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1527
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1528
    if (code) {
1529
      pStmt->exec.pRequest->code = code;
1530
    } else {
1531
      tFreeSSubmitRsp(pRsp);
1532
      STMT_ERR_RET(stmtResetStmt(pStmt));
1533
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1534
    }
1535
  }
1536

1537
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1538

1539
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1540
  pStmt->affectedRows += pStmt->exec.affectedRows;
1541

1542
_return:
1543

1544
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1545

1546
  tFreeSSubmitRsp(pRsp);
1547

1548
  ++pStmt->sql.runTimes;
1549

1550
  STMT_RET(code);
1551
}
1552
*/
1553

1554
/*
 * Execute the prepared statement.
 *
 * For inserts the output is built first: in interlace mode this waits until
 * sql.siInfo.tbRemainNum reaches zero (time recorded in stat.execWaitUs) and then
 * builds the final output from the vgroup list; otherwise the current table data is
 * cloned and the output is built from exec.pBlockHash. The request is then launched.
 *
 * If the request fails with an error for which NEED_CLIENT_HANDLE_ERROR holds, the
 * metadata is refreshed; when that refresh succeeds the statement is reset and
 * TSDB_CODE_NEED_RETRY is returned.
 *
 * Before cleaning the execution info the function waits for sql.siInfo.tableColsReady.
 * Affected rows are accumulated on success; runTimes and stat.execUseUs are updated.
 *
 * Returns the handle's stored errCode if it already holds an error, otherwise the
 * final code via STMT_RET.
 */
int stmtExec(TAOS_STMT* stmt) {
  STscStmt*   pStmt = (STscStmt*)stmt;
  int32_t     code = 0;
  SSubmitRsp* pRsp = NULL;

  int64_t tsBegin = taosGetTimestampUs();

  STMT_DLOG_E("start to exec");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      // Wait until the pending-table counter has dropped to zero.
      int64_t tsWait = taosGetTimestampUs();
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }
      pStmt->stat.execWaitUs += taosGetTimestampUs() - tsWait;

      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
  }

  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
    if (code) {
      pStmt->exec.pRequest->code = code;
    } else {
      // Metadata refreshed: reset the statement and ask the caller to retry.
      tFreeSSubmitRsp(pRsp);
      STMT_ERR_RET(stmtResetStmt(pStmt));
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
    }
  }

  STMT_ERR_JRET(pStmt->exec.pRequest->code);

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

_return:

  // Do not clean up until the column arrays are flagged ready again.
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
    taosUsleep(1);
  }

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

  tFreeSSubmitRsp(pRsp);

  ++pStmt->sql.runTimes;

  int64_t tsEnd = taosGetTimestampUs();
  pStmt->stat.execUseUs += tsEnd - tsBegin;

  STMT_RET(code);
}
1628

1629
/*
 * Tear down a statement handle and release its memory.
 *
 * Sequence (order is significant and mirrors the original):
 *   1. raise the queue stop flag;
 *   2. under the queue mutex, bump qRemainNum and signal waitCond
 *      (presumably to wake the bind thread so it can see the stop flag);
 *   3. join the bind thread when one is marked as in use;
 *   4. destroy the queue condition variable and mutex;
 *   5. log the accumulated statistics;
 *   6. clean the SQL info and free the handle.
 *
 * Returns TSDB_CODE_SUCCESS. When stmtCleanSQLInfo() reports an error the
 * function leaves through STMT_ERR_RET before the handle is freed.
 */
int stmtClose(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to free stmt");

  // The stop flag is written before the lock is taken, exactly as before.
  pStmt->queue.stopQueue = true;

  (void)taosThreadMutexLock(&pStmt->queue.mutex);
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
  (void)taosThreadCondSignal(&pStmt->queue.waitCond);
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

  if (pStmt->bindThreadInUse) {
    (void)taosThreadJoin(pStmt->bindThread, NULL);
    pStmt->bindThreadInUse = false;
  }

  // Synchronisation objects are destroyed only after the join above.
  (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->queue.mutex);

  STMT_DLOG(
      "stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64
      ", getCacheTbInfo=>%" PRId64
      ", parseSqlNum=>%" PRId64
      ", pStmt->stat.bindDataNum=>%" PRId64
      ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
      ", setTbNameUs:%" PRId64
      ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64
      " addBatchUs:%" PRId64
      ", execWaitUs:%" PRId64
      ", execUseUs:%" PRId64,
      pStmt,
      pStmt->sql.stbInterlaceMode,
      pStmt->stat.ctgGetTbMetaNum,
      pStmt->stat.getCacheTbInfo,
      pStmt->stat.parseSqlNum,
      pStmt->stat.bindDataNum,
      pStmt->seqIds[STMT_SETTBNAME],
      pStmt->seqIds[STMT_BIND],
      pStmt->seqIds[STMT_ADD_BATCH],
      pStmt->seqIds[STMT_EXECUTE],
      pStmt->stat.setTbNameUs,
      pStmt->stat.bindDataUs1,
      pStmt->stat.bindDataUs2,
      pStmt->stat.bindDataUs3,
      pStmt->stat.bindDataUs4,
      pStmt->stat.addBatchUs,
      pStmt->stat.execWaitUs,
      pStmt->stat.execUseUs);

  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}
1665

1666
/*
 * Return a human-readable message for the most recent error.
 *
 * With a usable handle (non-NULL and carrying a request object) the current
 * terrno is stored into the request's code and the text comes from
 * taos_errstr(). Otherwise the text is looked up directly from terrno.
 */
const char* stmtErrstr(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (NULL != stmt && NULL != pStmt->exec.pRequest) {
    // Sync the request's code with terrno before asking it for the string.
    pStmt->exec.pRequest->code = terrno;
    return taos_errstr(pStmt->exec.pRequest);
  }

  // No handle or no request yet: fall back to the generic lookup.
  return (char*)tstrerror(terrno);
}
1677

1678
/* Return the affectedRows counter stored on the statement handle. */
int stmtAffectedRows(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->affectedRows;
}
6✔
1679

1680
/* Return the exec.affectedRows counter stored on the statement handle. */
int stmtAffectedRowsOnce(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->exec.affectedRows;
}
28✔
1681

1682
/*
 * Report whether the statement is an insert.
 *
 * stmt   - statement handle.
 * insert - out: non-zero for an insert statement, zero otherwise.
 *
 * When the statement type is already set (non-zero) it is compared against
 * the two insert types; otherwise the decision is delegated to
 * qIsInsertValuesSql() on the stored SQL text. Always returns
 * TSDB_CODE_SUCCESS.
 */
int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start is insert");

  // Type not set yet: decide from the SQL text instead.
  if (0 == pStmt->sql.type) {
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  if (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
    *insert = 1;
  } else {
    *insert = 0;
  }

  return TSDB_CODE_SUCCESS;
}
1695

1696
/*
 * Fetch the tag field descriptions of a statement.
 *
 * stmt   - statement handle.
 * nums   - forwarded to stmtFetchTagFields().
 * fields - forwarded to stmtFetchTagFields().
 *
 * Returns the handle's pending error code immediately if one is set,
 * TSDB_CODE_TSC_STMT_API_ERROR for query statements, or the result of the
 * first failing step. On every path that reaches `_return` the handle's
 * errCode is put back to the value it had on entry.
 */
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  int32_t   code = 0;
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   savedErrCode = pStmt->errCode;

  STMT_DLOG_E("start to get tag fields");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  // Tag fields are not available on query statements.
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran with a known, non-multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes) {
    if (pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
      pStmt->bInfo.needParse = false;
    }
  }

  // Drop the request left over from an earlier run of a query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  STMT_ERRI_JRET(stmtFetchTagFields(stmt, nums, fields));

_return:

  // Restore the error code observed on entry.
  pStmt->errCode = savedErrCode;

  return code;
}
1737

1738
/*
 * Fetch the column field descriptions of a statement.
 *
 * stmt   - statement handle.
 * nums   - forwarded to stmtFetchColFields().
 * fields - forwarded to stmtFetchColFields().
 *
 * Returns the handle's pending error code immediately if one is set,
 * TSDB_CODE_TSC_STMT_API_ERROR for query statements, or the result of the
 * first failing step. On every path that reaches `_return` the handle's
 * errCode is put back to the value it had on entry.
 *
 * Change from the previous version: the stale-request branch used to call
 * stmtCreateRequest() through STMT_ERR_RET. That call returned directly and
 * skipped the `_return` epilogue, and it duplicated the stmtCreateRequest()
 * call that follows the branch. The request is now created in one place,
 * matching the sibling getters in this file.
 */
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  int32_t   code = 0;
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   preCode = pStmt->errCode;

  STMT_DLOG_E("start to get col fields");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // Column fields are not available on query statements.
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran with a known, non-multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the request left over from an earlier run of a query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  // Single creation point; a failure leaves through `_return` so errCode is restored.
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, fields));

_return:

  // Restore the error code observed on entry.
  pStmt->errCode = preCode;

  return code;
}
1780

1781
/*
 * Report the number of parameters of a statement.
 *
 * stmt - statement handle.
 * nums - out: for query statements, the size of the parsed query's
 *        pPlaceholderValues array; otherwise whatever stmtFetchColFields()
 *        stores when called with a NULL field list.
 *
 * Returns the handle's pending error code immediately if one is set, or the
 * result of the first failing step. On every path that reaches `_return` the
 * handle's errCode is put back to the value it had on entry.
 */
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
  int       code = 0;
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   savedErrCode = pStmt->errCode;

  STMT_DLOG_E("start to get param num");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran with a known, non-multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes) {
    if (pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
      pStmt->bInfo.needParse = false;
    }
  }

  // Drop the request left over from an earlier run of a query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    // Non-query statement: only the count is requested, no field list.
    STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

_return:

  // Restore the error code observed on entry.
  pStmt->errCode = savedErrCode;

  return code;
}
1822

1823
/*
 * Look up the type and byte width of one bound parameter.
 *
 * stmt  - statement handle.
 * idx   - zero-based parameter index; must satisfy 0 <= idx < field count.
 * type  - out: type of the selected field.
 * bytes - out: result of calcSchemaBytesFromTypeBytes() for the selected field.
 *
 * Returns the handle's pending error code immediately if one is set,
 * TSDB_CODE_TSC_STMT_API_ERROR for query statements, TSDB_CODE_INVALID_PARA
 * when idx is out of range, or the result of the first failing step. On
 * every path that reaches `_return` the field array is freed and the handle's
 * errCode is put back to the value it had on entry.
 *
 * Changes from the previous version:
 *  - `nums` and `pField` are declared and initialised at the top. They used
 *    to be declared after several error exits that land on `_return`, so the
 *    cleanup there could free an uninitialised pointer.
 *  - A negative idx is rejected instead of indexing before the array start.
 */
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
  int           code = 0;
  STscStmt*     pStmt = (STscStmt*)stmt;
  int32_t       preCode = pStmt->errCode;
  int32_t       nums = 0;
  TAOS_FIELD_E* pField = NULL;  // must be initialised before any jump to _return

  STMT_DLOG_E("start to get param");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // Parameter descriptions are not available on query statements.
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran with a known, non-multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the request left over from an earlier run of a query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  STMT_ERRI_JRET(stmtFetchColFields(stmt, &nums, &pField));

  if (idx < 0 || idx >= nums) {
    tscError("idx %d is out of range, param num:%d", idx, nums);
    STMT_ERRI_JRET(TSDB_CODE_INVALID_PARA);
  }

  *type = pField[idx].type;
  *bytes = calcSchemaBytesFromTypeBytes(pField[idx].type, pField[idx].bytes, true);

_return:

  // pField is NULL unless stmtFetchColFields() filled it in.
  taosMemoryFree(pField);
  pStmt->errCode = preCode;

  return code;
}
1874

1875
/*
 * Hand back the request object of a query statement as its result set.
 *
 * Returns pStmt->exec.pRequest for query statements; logs an error and
 * returns NULL for any other statement type.
 */
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc