• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.21
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
/*
 * Hand out the next buffUnit-sized slot from the table buffer pool.
 *
 * Three cases, tried in order:
 *   1. the current chunk still has room (buffOffset < buffSize): return the
 *      slot at buffOffset and advance the offset by buffUnit;
 *   2. a chunk already exists in pBufList at index buffIdx: switch to it and
 *      return its first slot;
 *   3. otherwise allocate a fresh chunk of buffSize bytes, append it to
 *      pBufList and return its first slot.
 *
 * @param pTblBuf  buffer pool bookkeeping; updated in place
 * @param pBuf     receives the address of the slot
 * @return TSDB_CODE_SUCCESS, or an error code when a chunk cannot be obtained
 */
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    *pBuf = pTblBuf->pCurBuff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
  } else {
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
    if (NULL == buff) {
      return terrno;
    }

    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
      // The chunk never made it into pBufList, so nothing else will free it.
      // Save the error first in case the free disturbs terrno.
      int32_t code = terrno;
      taosMemoryFree(buff);
      return code;
    }

    pTblBuf->buffIdx++;
    pTblBuf->pCurBuff = buff;
    *pBuf = buff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;
  }

  return TSDB_CODE_SUCCESS;
}
40

41
/*
 * Take the next node off the statement queue, waiting while it is empty.
 *
 * While qRemainNum is zero the caller first polls up to 10 times with a
 * 1-microsecond sleep, then falls back to waiting on queue.waitCond under
 * queue.mutex (qRemainNum is re-checked under the mutex before waiting).
 *
 * @param pStmt  statement owning the queue
 * @param param  receives the dequeued node on success
 * @return true when a node was dequeued; false when queue.stopQueue is set,
 *         in which case *param is left untouched
 */
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
  int i = 0;
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    if (i < 10) {
      taosUsleep(1);
      i++;
    } else {
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
      }
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
    }
  }
  if (pStmt->queue.stopQueue) {
    return false;
  }

  // head is a placeholder node: the payload is head->next, which then becomes
  // the new head.
  SStmtQNode* node = pStmt->queue.head->next;
  pStmt->queue.head = node;
  *param = node;

  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);

  return true;
}
67

68
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
806✔
69
  pStmt->queue.tail->next = param;
806✔
70
  pStmt->queue.tail = param;
806✔
71

72
  pStmt->stat.bindDataNum++;
806✔
73

74
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
806✔
75
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
806✔
76
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
806✔
77
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
806✔
78
}
806✔
79

80
/*
 * Lazily build the request object for this statement.
 *
 * Does nothing when pStmt->exec.pRequest already exists. Otherwise builds a
 * request from the stored SQL text, bumps a non-zero reqid so the next
 * request gets a different id, marks the request as a synchronous stmt-bind
 * query and, when a database name was recorded on the statement, overrides
 * the request's database with a copy of it.
 *
 * @param pStmt  statement whose exec.pRequest is to be populated
 * @return TSDB_CODE_SUCCESS, the buildRequest error, or terrno when the
 *         database name cannot be duplicated
 */
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
  int32_t code = 0;

  if (pStmt->exec.pRequest == NULL) {
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
                        pStmt->reqid);
    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): presumably pRequest is not usable after a failed
      // buildRequest, so it must not be dereferenced below -- confirm.
      return code;
    }

    pStmt->exec.pRequest->syncQuery = true;
    pStmt->exec.pRequest->isStmtBind = true;

    if (pStmt->db != NULL) {
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
      if (NULL == pStmt->exec.pRequest->pDb) {
        return terrno;
      }
    }
  }

  return code;
}
101

102
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
1,636✔
103
  int32_t code = 0;
1,636✔
104

105
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
1,636!
106
    STMT_LOG_SEQ(newStatus);
1,636!
107
  }
108

109
  if (pStmt->errCode && newStatus != STMT_PREPARE) {
1,636!
110
    STMT_DLOG("stmt already failed with err:%s", tstrerror(pStmt->errCode));
×
111
    return pStmt->errCode;
×
112
  }
113

114
  switch (newStatus) {
1,636!
115
    case STMT_PREPARE:
24✔
116
      pStmt->errCode = 0;
24✔
117
      break;
24✔
118
    case STMT_SETTBNAME:
553✔
119
      if (STMT_STATUS_EQ(INIT)) {
553!
120
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
121
      }
122
      if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
553!
123
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
124
      }
125
      break;
553✔
126
    case STMT_SETTAGS:
×
127
      if (STMT_STATUS_EQ(INIT)) {
×
128
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
129
      }
130
      break;
×
131
    case STMT_FETCH_FIELDS:
×
132
      if (STMT_STATUS_EQ(INIT)) {
×
133
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
134
      }
135
      break;
×
136
    case STMT_BIND:
553✔
137
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
553!
138
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
139
      }
140
      /*
141
            if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
142
              code = TSDB_CODE_TSC_STMT_API_ERROR;
143
            }
144
      */
145
      break;
553✔
146
    case STMT_BIND_COL:
×
147
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
×
148
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
149
      }
150
      break;
×
151
    case STMT_ADD_BATCH:
253✔
152
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
253!
153
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
154
      }
155
      break;
253✔
156
    case STMT_EXECUTE:
253✔
157
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
253!
158
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
×
159
            STMT_STATUS_NE(BIND_COL)) {
×
160
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
161
        }
162
      } else {
163
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
253!
164
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
165
        }
166
      }
167
      break;
253✔
168
    default:
×
169
      code = TSDB_CODE_APP_ERROR;
×
170
      break;
×
171
  }
172

173
  STMT_ERR_RET(code);
1,636!
174

175
  pStmt->sql.status = newStatus;
1,636✔
176

177
  return TSDB_CODE_SUCCESS;
1,636✔
178
}
179

180
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
24✔
181
  STscStmt2* pStmt = (STscStmt2*)stmt;
24✔
182

183
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
24✔
184

185
  if ('\0' == pStmt->bInfo.tbName[0]) {
24!
186
    tscWarn("no table name set, OK if it is a stmt get fields");
×
187
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
×
188
  }
189

190
  *tbName = pStmt->bInfo.tbName;
24✔
191

192
  return TSDB_CODE_SUCCESS;
24✔
193
}
194

195
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
24✔
196
                                  const char* sTableName, bool autoCreateTbl, bool preCtbname) {
197
  STscStmt2* pStmt = (STscStmt2*)stmt;
24✔
198
  char       tbFName[TSDB_TABLE_FNAME_LEN];
199
  int32_t    code = tNameExtractFullName(tbName, tbFName);
24✔
200
  if (code != 0) {
24!
201
    return code;
×
202
  }
203

204
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
24✔
205
  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));
24✔
206
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
24✔
207

208
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
24!
209
  pStmt->bInfo.tbSuid = pTableMeta->suid;
24✔
210
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
24✔
211
  pStmt->bInfo.tbType = pTableMeta->tableType;
24✔
212

213
  if (!pStmt->bInfo.tagsCached) {
24!
214
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
24✔
215
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
23!
216
  }
217

218
  pStmt->bInfo.boundTags = tags;
21✔
219
  pStmt->bInfo.tagsCached = false;
21✔
220
  pStmt->bInfo.preCtbname = preCtbname;
21✔
221
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
21✔
222

223
  return TSDB_CODE_SUCCESS;
21✔
224
}
225

226
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
21✔
227
  STscStmt2* pStmt = (STscStmt2*)stmt;
21✔
228

229
  pStmt->sql.pVgHash = pVgHash;
21✔
230
  pStmt->exec.pBlockHash = pBlockHash;
21✔
231

232
  return TSDB_CODE_SUCCESS;
21✔
233
}
234

235
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
24✔
236
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, bool preCtbname) {
237
  STscStmt2* pStmt = (STscStmt2*)stmt;
24✔
238

239
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, preCtbname));
24!
240
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
21!
241

242
  pStmt->sql.autoCreateTbl = autoCreateTbl;
20✔
243

244
  return TSDB_CODE_SUCCESS;
20✔
245
}
246

247
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
×
248
  STscStmt2* pStmt = (STscStmt2*)stmt;
×
249

250
  *pVgHash = pStmt->sql.pVgHash;
×
251
  pStmt->sql.pVgHash = NULL;
×
252

253
  *pBlockHash = pStmt->exec.pBlockHash;
×
254
  pStmt->exec.pBlockHash = NULL;
×
255

256
  return TSDB_CODE_SUCCESS;
×
257
}
258

259
static int32_t stmtParseSql(STscStmt2* pStmt) {
24✔
260
  pStmt->exec.pCurrBlock = NULL;
24✔
261

262
  SStmtCallback stmtCb = {
24✔
263
      .pStmt = pStmt,
264
      .getTbNameFn = stmtGetTbName,
265
      .setInfoFn = stmtUpdateInfo,
266
      .getExecInfoFn = stmtGetExecInfo,
267
  };
268

269
  STMT_ERR_RET(stmtCreateRequest(pStmt));
24!
270

271
  pStmt->stat.parseSqlNum++;
24✔
272
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
24!
273
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
24✔
274

275
  pStmt->bInfo.needParse = false;
24✔
276

277
  if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
24!
278
    pStmt->sql.type = STMT_TYPE_INSERT;
×
279
    pStmt->sql.stbInterlaceMode = false;
×
280
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
24!
281
    pStmt->sql.type = STMT_TYPE_QUERY;
×
282
    pStmt->sql.stbInterlaceMode = false;
×
283

284
    return TSDB_CODE_SUCCESS;
×
285
  }
286

287
  STableDataCxt** pSrc =
288
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
24✔
289
  if (NULL == pSrc || NULL == *pSrc) {
24!
290
    return terrno;
×
291
  }
292

293
  STableDataCxt* pTableCtx = *pSrc;
24✔
294
  // if (pStmt->sql.stbInterlaceMode) {
295
  //   int16_t lastIdx = -1;
296

297
  //   for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
298
  //     if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
299
  //       pStmt->sql.stbInterlaceMode = false;
300
  //       break;
301
  //     }
302

303
  //     lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
304
  //   }
305
  // }
306

307
  if (NULL == pStmt->sql.pBindInfo) {
24!
308
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
24!
309
    if (NULL == pStmt->sql.pBindInfo) {
24!
310
      return terrno;
×
311
    }
312
  }
313

314
  return TSDB_CODE_SUCCESS;
24✔
315
}
316

317
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
300✔
318
  pStmt->bInfo.tbUid = 0;
300✔
319
  pStmt->bInfo.tbVgId = -1;
300✔
320
  pStmt->bInfo.tbType = 0;
300✔
321
  pStmt->bInfo.needParse = true;
300✔
322
  pStmt->bInfo.inExecCache = false;
300✔
323

324
  pStmt->bInfo.tbName[0] = 0;
300✔
325
  pStmt->bInfo.tbFName[0] = 0;
300✔
326
  if (!pStmt->bInfo.tagsCached) {
300!
327
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
300✔
328
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
301!
329
  }
330
  if (!pStmt->sql.autoCreateTbl) {
301!
331
    pStmt->bInfo.stbFName[0] = 0;
301✔
332
    pStmt->bInfo.tbSuid = 0;
301✔
333
  }
334

335
  return TSDB_CODE_SUCCESS;
301✔
336
}
337

338
static void stmtFreeTableBlkList(STableColsData* pTb) {
×
339
  (void)qResetStmtColumns(pTb->aCol, true);
×
340
  taosArrayDestroy(pTb->aCol);
×
341
}
×
342

343
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
253✔
344
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
253✔
345
  if (NULL == pTblBuf->pCurBuff) {
253✔
346
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
1!
347
    return;
×
348
  }
349
  pTblBuf->buffIdx = 1;
252✔
350
  pTblBuf->buffOffset = sizeof(*pQueue->head);
252✔
351

352
  pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
252✔
353
  pQueue->qRemainNum = 0;
252✔
354
  pQueue->head->next = NULL;
252✔
355
}
356

357
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
277✔
358
  if (pStmt->sql.stbInterlaceMode) {
277!
359
    if (deepClean) {
277✔
360
      taosHashCleanup(pStmt->exec.pBlockHash);
24✔
361
      pStmt->exec.pBlockHash = NULL;
24✔
362

363
      if (NULL != pStmt->exec.pCurrBlock) {
24!
364
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
24!
365
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
24✔
366
      }
367
    } else {
368
      pStmt->sql.siInfo.pTableColsIdx = 0;
253✔
369
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
253✔
370
    }
371
    if (NULL != pStmt->exec.pRequest) {
276!
372
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
276✔
373
    }
374
  } else {
375
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
×
376
      // if (!pStmt->options.asyncExecFn) {
377
      taos_free_result(pStmt->exec.pRequest);
×
378
      pStmt->exec.pRequest = NULL;
×
379
      //}
380
    }
381

382
    size_t keyLen = 0;
×
383
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
×
384
    while (pIter) {
×
385
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
×
386
      char*          key = taosHashGetKey(pIter, &keyLen);
×
387
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);
×
388

389
      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
×
390
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
×
391
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
×
392

393
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
×
394
        continue;
×
395
      }
396

397
      qDestroyStmtDataBlock(pBlocks);
×
398
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
×
399

400
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
×
401
    }
402

403
    if (keepTable) {
×
404
      return TSDB_CODE_SUCCESS;
×
405
    }
406

407
    taosHashCleanup(pStmt->exec.pBlockHash);
×
408
    pStmt->exec.pBlockHash = NULL;
×
409

410
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
×
411
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
×
412
  }
413

414
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
276!
415

416
  return TSDB_CODE_SUCCESS;
276✔
417
}
418

419
static void stmtFreeTbBuf(void* buf) {
24✔
420
  void* pBuf = *(void**)buf;
24✔
421
  taosMemoryFree(pBuf);
24!
422
}
24✔
423

424
static void stmtFreeTbCols(void* buf) {
24,000✔
425
  SArray* pCols = *(SArray**)buf;
24,000✔
426
  taosArrayDestroy(pCols);
24,000✔
427
}
24,000✔
428

429
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
24✔
430
  STMT_DLOG_E("start to free SQL info");
24!
431

432
  taosMemoryFreeClear(pStmt->db);
24!
433
  taosMemoryFree(pStmt->sql.pBindInfo);
24!
434
  taosMemoryFree(pStmt->sql.queryRes.fields);
24!
435
  taosMemoryFree(pStmt->sql.queryRes.userFields);
24!
436
  taosMemoryFree(pStmt->sql.sqlStr);
24!
437
  qDestroyQuery(pStmt->sql.pQuery);
24✔
438
  taosArrayDestroy(pStmt->sql.nodeList);
24✔
439
  taosHashCleanup(pStmt->sql.pVgHash);
24✔
440
  pStmt->sql.pVgHash = NULL;
24✔
441
  if (pStmt->sql.fixValueTags) {
24!
442
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
×
443
  }
444

445
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
24✔
446
  while (pIter) {
24!
447
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
×
448

449
    qDestroyStmtDataBlock(pCache->pDataCtx);
×
450
    qDestroyBoundColInfo(pCache->boundTags);
×
451
    taosMemoryFreeClear(pCache->boundTags);
×
452

453
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
×
454
  }
455
  taosHashCleanup(pStmt->sql.pTableCache);
24✔
456
  pStmt->sql.pTableCache = NULL;
24✔
457

458
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
24!
459
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
24!
460

461
  taos_free_result(pStmt->sql.siInfo.pRequest);
24✔
462
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
24✔
463
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
24✔
464
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
24✔
465
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
24!
466
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
24✔
467
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
24✔
468

469
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
24✔
470
  pStmt->sql.siInfo.tableColsReady = true;
24✔
471

472
  STMT_DLOG_E("end to free SQL info");
24!
473

474
  return TSDB_CODE_SUCCESS;
24✔
475
}
476

477
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
×
478
  if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
×
479
    return TSDB_CODE_SUCCESS;
×
480
  }
481

482
  SVgroupInfo      vgInfo = {0};
×
483
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
×
484
                           .requestId = pStmt->exec.pRequest->requestId,
×
485
                           .requestObjRefId = pStmt->exec.pRequest->self,
×
486
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
×
487

488
  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
×
489
  if (TSDB_CODE_SUCCESS != code) {
×
490
    return code;
×
491
  }
492

493
  code =
494
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
×
495
  if (TSDB_CODE_SUCCESS != code) {
×
496
    return code;
×
497
  }
498

499
  *vgId = vgInfo.vgId;
×
500

501
  return TSDB_CODE_SUCCESS;
×
502
}
503

504
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
×
505
                                    uint64_t suid, int32_t vgId) {
506
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
×
507
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
×
508

509
  STMT_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);
×
510

511
  return TSDB_CODE_SUCCESS;
×
512
}
513

514
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
24✔
515
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
24!
516
    pStmt->bInfo.needParse = false;
×
517
    pStmt->bInfo.inExecCache = false;
×
518
    return TSDB_CODE_SUCCESS;
×
519
  }
520

521
  pStmt->bInfo.needParse = true;
24✔
522
  pStmt->bInfo.inExecCache = false;
24✔
523

524
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
24✔
525
  if (pCxtInExec) {
24!
526
    pStmt->bInfo.needParse = false;
×
527
    pStmt->bInfo.inExecCache = true;
×
528

529
    pStmt->exec.pCurrBlock = *pCxtInExec;
×
530

531
    if (pStmt->sql.autoCreateTbl) {
×
532
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
533
      return TSDB_CODE_SUCCESS;
×
534
    }
535
  }
536

537
  if (NULL == pStmt->pCatalog) {
24!
538
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
24✔
539
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
23✔
540
  }
541

542
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
23!
543
    if (pStmt->bInfo.inExecCache) {
24!
544
      pStmt->bInfo.needParse = false;
×
545
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
546
      return TSDB_CODE_SUCCESS;
×
547
    }
548

549
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
24!
550
    return TSDB_CODE_SUCCESS;
24✔
551
  }
552

553
  if (pStmt->sql.autoCreateTbl) {
×
554
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
×
555
    if (pCache) {
×
556
      pStmt->bInfo.needParse = false;
×
557
      pStmt->bInfo.tbUid = 0;
×
558

559
      STableDataCxt* pNewBlock = NULL;
×
560
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
×
561

562
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
×
563
                      POINTER_BYTES)) {
564
        STMT_ERR_RET(terrno);
×
565
      }
566

567
      pStmt->exec.pCurrBlock = pNewBlock;
×
568

569
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
×
570

571
      return TSDB_CODE_SUCCESS;
×
572
    }
573

574
    STMT_RET(stmtCleanBindInfo(pStmt));
×
575
  }
576

577
  uint64_t uid, suid;
578
  int32_t  vgId;
579
  int8_t   tableType;
580

581
  STableMeta*      pTableMeta = NULL;
×
582
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
×
583
                           .requestId = pStmt->exec.pRequest->requestId,
×
584
                           .requestObjRefId = pStmt->exec.pRequest->self,
×
585
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
×
586
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
×
587

588
  pStmt->stat.ctgGetTbMetaNum++;
×
589

590
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
×
591
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
592
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
593

594
    STMT_ERR_RET(code);
×
595
  }
596

597
  STMT_ERR_RET(code);
×
598

599
  uid = pTableMeta->uid;
×
600
  suid = pTableMeta->suid;
×
601
  tableType = pTableMeta->tableType;
×
602
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
×
603
  vgId = pTableMeta->vgId;
×
604

605
  taosMemoryFree(pTableMeta);
×
606

607
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
×
608

609
  if (uid == pStmt->bInfo.tbUid) {
×
610
    pStmt->bInfo.needParse = false;
×
611

612
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
613

614
    return TSDB_CODE_SUCCESS;
×
615
  }
616

617
  if (pStmt->bInfo.inExecCache) {
×
618
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
×
619
    if (NULL == pCache) {
×
620
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
621
               pStmt->bInfo.tbFName, uid, cacheUid);
622

623
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
624
    }
625

626
    pStmt->bInfo.needParse = false;
×
627

628
    pStmt->bInfo.tbUid = uid;
×
629
    pStmt->bInfo.tbSuid = suid;
×
630
    pStmt->bInfo.tbType = tableType;
×
631
    pStmt->bInfo.boundTags = pCache->boundTags;
×
632
    pStmt->bInfo.tagsCached = true;
×
633

634
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
×
635

636
    return TSDB_CODE_SUCCESS;
×
637
  }
638

639
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
×
640
  if (pCache) {
×
641
    pStmt->bInfo.needParse = false;
×
642

643
    pStmt->bInfo.tbUid = uid;
×
644
    pStmt->bInfo.tbSuid = suid;
×
645
    pStmt->bInfo.tbType = tableType;
×
646
    pStmt->bInfo.boundTags = pCache->boundTags;
×
647
    pStmt->bInfo.tagsCached = true;
×
648

649
    STableDataCxt* pNewBlock = NULL;
×
650
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
×
651

652
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
×
653
                    POINTER_BYTES)) {
654
      STMT_ERR_RET(terrno);
×
655
    }
656

657
    pStmt->exec.pCurrBlock = pNewBlock;
×
658

659
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
×
660

661
    return TSDB_CODE_SUCCESS;
×
662
  }
663

664
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
665

666
  return TSDB_CODE_SUCCESS;
×
667
}
668

669
static int32_t stmtResetStmt(STscStmt2* pStmt) {
×
670
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
×
671

672
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
673
  if (NULL == pStmt->sql.pTableCache) {
×
674
    STMT_ERR_RET(terrno);
×
675
  }
676

677
  pStmt->sql.status = STMT_INIT;
×
678

679
  return TSDB_CODE_SUCCESS;
×
680
}
681

682
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
806✔
683
  SStmtQNode* pParam = (SStmtQNode*)param;
806✔
684

685
  if (pParam->restoreTbCols) {
806✔
686
    for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
806✔
687
      SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
553✔
688
      *p = taosArrayInit(20, POINTER_BYTES);
553✔
689
      if (*p == NULL) {
553!
690
        STMT_ERR_RET(terrno);
×
691
      }
692
    }
693

694
    atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
253✔
695
  } else {
696
    int code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
553✔
697
                                      &pStmt->sql.siInfo, pParam->pCreateTbReq);
698
    // taosMemoryFree(pParam->pTbData);
699
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
553✔
700
    STMT_ERR_RET(code);
553!
701
  }
702
  return TSDB_CODE_SUCCESS;
806✔
703
}
704

705
static void* stmtBindThreadFunc(void* param) {
24✔
706
  setThreadName("stmtBind");
24✔
707

708
  qInfo("stmt bind thread started");
24!
709

710
  STscStmt2* pStmt = (STscStmt2*)param;
24✔
711

712
  while (true) {
830✔
713
    if (pStmt->queue.stopQueue) {
854✔
714
      break;
24✔
715
    }
716

717
    SStmtQNode* asyncParam = NULL;
830✔
718
    if (!stmtDequeue(pStmt, &asyncParam)) {
830✔
719
      continue;
24✔
720
    }
721

722
    if (stmtAsyncOutput(pStmt, asyncParam) != 0) {
806!
723
      qError("stmt async output failed");
×
724
    }
725
  }
726

727
  qInfo("stmt bind thread stopped");
24!
728

729
  return NULL;
24✔
730
}
731

732
/*
 * Start the joinable bind thread running stmtBindThreadFunc for this statement.
 *
 * The thread attribute object is destroyed on every path once it has been
 * initialised, including the failure paths. pStmt->bindThreadInUse is set
 * only when the thread was actually created.
 *
 * @param pStmt  statement that owns the thread handle (pStmt->bindThread)
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_INTERNAL_ERROR when the attribute
 *         cannot be set up; a system error code when thread creation fails
 */
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
    // Read ERRNO before the attribute teardown has a chance to change it.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  } else {
    pStmt->bindThreadInUse = true;
  }

  (void)taosThreadAttrDestroy(&thAttr);

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
751

752
static int32_t stmtInitQueue(STscStmt2* pStmt) {
24✔
753
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
24✔
754
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
24✔
755
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
48!
756
  pStmt->queue.tail = pStmt->queue.head;
24✔
757

758
  return TSDB_CODE_SUCCESS;
24✔
759
}
760

761
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
24✔
762
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
24✔
763
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
24✔
764
  pStmt->asyncBindParam.asyncBindNum = 0;
24✔
765

766
  return TSDB_CODE_SUCCESS;
24✔
767
}
768

769
/*
 * Initialise the table buffer pool with its first chunk.
 *
 * Each slot is sizeof(SStmtQNode) bytes and each chunk holds 1000 slots.
 * On success the first chunk is stored in pBufList, made current, and the
 * pool is positioned at offset 0 with buffIdx pointing past that chunk.
 *
 * On failure pBufList may already be allocated; it is left in place.
 * NOTE(review): presumably the caller's teardown path releases it -- confirm.
 *
 * @param pTblBuf  pool bookkeeping to initialise
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation fails
 */
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // The chunk is not tracked by pBufList, so free it here.
    // Save the error first in case the free disturbs terrno.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
791

792
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
24✔
793
  STscObj*   pObj = (STscObj*)taos;
24✔
794
  STscStmt2* pStmt = NULL;
24✔
795
  int32_t    code = 0;
24✔
796

797
  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
24!
798
  if (NULL == pStmt) {
24!
799
    return NULL;
×
800
  }
801

802
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
24✔
803
  if (NULL == pStmt->sql.pTableCache) {
24!
804
    taosMemoryFree(pStmt);
×
805
    return NULL;
×
806
  }
807

808
  pStmt->taos = pObj;
24✔
809
  pStmt->bInfo.needParse = true;
24✔
810
  pStmt->sql.status = STMT_INIT;
24✔
811
  pStmt->errCode = TSDB_CODE_SUCCESS;
24✔
812

813
  if (NULL != pOptions) {
24!
814
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
24✔
815
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
24!
816
      pStmt->stbInterlaceMode = true;
24✔
817
    }
818

819
    pStmt->reqid = pOptions->reqid;
24✔
820
  }
821

822
  if (pStmt->stbInterlaceMode) {
24!
823
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
24✔
824
    pStmt->sql.siInfo.acctId = taos->acctId;
24✔
825
    pStmt->sql.siInfo.dbname = taos->db;
24✔
826
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
24✔
827
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
24✔
828
    if (NULL == pStmt->sql.siInfo.pTableHash) {
24!
829
      (void)stmtClose2(pStmt);
×
830
      return NULL;
×
831
    }
832
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
24✔
833
    if (NULL == pStmt->sql.siInfo.pTableCols) {
24!
834
      terrno = terrno;
×
835
      (void)stmtClose2(pStmt);
×
836
      return NULL;
×
837
    }
838

839
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
24✔
840
    if (TSDB_CODE_SUCCESS == code) {
24!
841
      code = stmtInitQueue(pStmt);
24✔
842
    }
843
    if (TSDB_CODE_SUCCESS == code) {
24!
844
      code = stmtStartBindThread(pStmt);
24✔
845
    }
846
    if (TSDB_CODE_SUCCESS != code) {
24!
847
      terrno = code;
×
848
      (void)stmtClose2(pStmt);
×
849
      return NULL;
×
850
    }
851
  }
852

853
  pStmt->sql.siInfo.tableColsReady = true;
24✔
854
  if (pStmt->options.asyncExecFn) {
24!
855
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
×
856
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
857
      (void)stmtClose2(pStmt);
×
858
      return NULL;
×
859
    }
860
  }
861
  code = stmtIniAsyncBind(pStmt);
24✔
862
  if (TSDB_CODE_SUCCESS != code) {
24!
863
    terrno = code;
×
864
    (void)stmtClose2(pStmt);
×
865
    return NULL;
×
866
  }
867

868
  pStmt->execSemWaited = false;
24✔
869

870
  STMT_LOG_SEQ(STMT_INIT);
24!
871

872
  tscDebug("stmt:%p initialized", pStmt);
24!
873

874
  return pStmt;
24✔
875
}
876

877
/*
 * Record the database name parsed out of the SQL text on the statement and
 * make it the database of the statement's current request.
 *
 * stmt   - statement handle (an STscStmt2).
 * dbName - database name as written in the SQL; it is copied and the copy is
 *          passed through strdequote().
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a string copy fails, or the error
 * reported by stmtCreateRequest().
 */
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG("start to set dbName:%s", dbName);

  pStmt->db = taosStrdup(dbName);
  if (NULL == pStmt->db) {
    // Do not hand a NULL pointer to strdequote() when the copy failed.
    return terrno;
  }
  (void)strdequote(pStmt->db);
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  // The SQL statement specifies a database name, overriding the previously specified database
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
  if (NULL == pStmt->exec.pRequest->pDb) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
894

895
/*
 * Prepare a statement: store a private copy of the SQL text and, when the SQL
 * names a database, apply that database to the statement.
 *
 * stmt   - statement handle (an STscStmt2).
 * sql    - SQL text.
 * length - number of bytes of sql to use; 0 means "use strlen(sql)".
 *
 * A statement that has already reached STMT_PREPARE is reset first.
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to prepare");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // length is unsigned, so "no length supplied" can only be expressed as 0.
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    return terrno;
  }
  pStmt->sql.sqlLen = length;
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;

  char* dbName = NULL;
  if (qParseDbName(sql, length, &dbName)) {
    // Release the parsed name before propagating any error, so that it is not
    // leaked when stmtSetDbName2() fails.
    int32_t code = stmtSetDbName2(stmt, dbName);
    taosMemoryFreeClear(dbName);
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
929

930
/*
 * Set up the super-table interlace state of a statement.
 *
 * Looks up the data context of the current table (bInfo.tbFName) in
 * exec.pBlockHash, stores a clone of it in sql.siInfo.pDataCtx, appends
 * STMT_TABLE_COLS_NUM empty arrays to sql.siInfo.pTableCols and copies
 * bInfo.boundTags into sql.siInfo.boundTags.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on lookup/allocation failure, or the error
 * from qCloneStmtDataBlock().
 */
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  SArray* pTblCols = NULL;
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      // The array was never stored in pTableCols, so nothing else can free it.
      int32_t code = terrno;
      taosArrayDestroy(pTblCols);
      return code;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  return TSDB_CODE_SUCCESS;
}
956

957
/*
 * Report whether the statement is an insert.
 *
 * stmt   - statement handle (an STscStmt2).
 * insert - out: non-zero for an insert statement, zero otherwise.
 *
 * When sql.type is already set it decides the answer; otherwise the stored SQL
 * text is inspected with qIsInsertValuesSql(). Always returns
 * TSDB_CODE_SUCCESS.
 */
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start is insert");

  if (!pStmt->sql.type) {
    // Statement type not known yet: fall back to looking at the SQL text.
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  *insert = (pStmt->sql.type == STMT_TYPE_INSERT || pStmt->sql.type == STMT_TYPE_MULTI_INSERT);
  return TSDB_CODE_SUCCESS;
}
970

971
/*
 * Set the target table name of an insert statement.
 *
 * stmt   - statement handle (an STscStmt2).
 * tbName - table name to bind the following data to.
 *
 * Only valid for insert statements; otherwise TSDB_CODE_TSC_STMT_API_ERROR is
 * raised. The elapsed time is added to stat.setTbNameUs on success.
 */
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  STMT_DLOG("start to set tbName:%s", tbName);

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  STMT_ERR_RET(stmtIsInsert2(stmt, &isInsert));
  if (!isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    // Interlace state already initialised: just record the new table name.
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  } else {
    // Build the full table name, consult the cache, and parse if required.
    STMT_ERR_RET(stmtCreateRequest(pStmt));

    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

      STMT_ERR_RET(stmtParseSql(pStmt));
    }
  }

  // First table seen in interlace mode: create the shared interlace state.
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  pStmt->stat.setTbNameUs += taosGetTimestampUs() - beginUs;

  return TSDB_CODE_SUCCESS;
}
1022

1023
/*
 * Bind tag values for the current table of an insert statement.
 *
 * stmt         - statement handle (an STscStmt2).
 * tags         - tag values to bind.
 * pCreateTbReq - in/out: in interlace mode a zeroed SVCreateTbReq is allocated
 *                into *pCreateTbReq and its uid is set to the vgroup id
 *                obtained from stmtTryAddTableVgroupInfo(); *pCreateTbReq is
 *                passed on to qBindStmtTagsValue2() in both modes.
 *
 * Returns TSDB_CODE_SUCCESS immediately when the table is already in the exec
 * cache and the statement does not auto-create tables.
 */
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
    // Rebuild the full table name from the most recently set table name.
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
  }

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to bind stmt tag values");

  void* boundTags = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    boundTags = pStmt->sql.siInfo.boundTags;
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    // Check the allocation result, not the caller's out-pointer.
    if (NULL == *pCreateTbReq) {
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
  } else {
    boundTags = pStmt->bInfo.boundTags;
  }

  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));

  return TSDB_CODE_SUCCESS;
}
1100

1101
/*
 * Interlace-mode helper that produces a create-table request for the current
 * table when the data block is flagged SUBMIT_REQ_AUTO_CREATE_TABLE.
 *
 * stmt         - statement handle (an STscStmt2).
 * pCreateTbReq - out: receives a clone of the fixed-tag template request when
 *                one is available.
 *
 * Does nothing and returns TSDB_CODE_SUCCESS when the statement is not in
 * interlace mode, when a fresh parse shows the statement does not auto-create
 * tables, or when the data block is not flagged for auto-creation.
 */
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (!pStmt->sql.stbInterlaceMode) {
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
    if (!pStmt->sql.autoCreateTbl) {
      return TSDB_CODE_SUCCESS;
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }

  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.fixValueTags) {
    // A template request already exists: clone it and rename it for this table.
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    if ((*pCreateTbReq)->name) {
      taosMemoryFree((*pCreateTbReq)->name);
    }
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
    if (NULL == (*pCreateTbReq)->name) {
      // Do not hand out a request whose table name could not be copied.
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
    return TSDB_CODE_SUCCESS;
  }

  if ((*pDataBlock)->pData->pCreateTbReq) {
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
    // Mark the template as available only once it has actually been cloned.
    pStmt->sql.fixValueTags = true;
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
  }

  return TSDB_CODE_SUCCESS;
}

1175
/*
 * Build the column field descriptions of the statement's current table.
 *
 * pStmt    - statement.
 * fieldNum - out: passed through to qBuildStmtColFields().
 * fields   - out: passed through to qBuildStmtColFields().
 *
 * Not valid for query statements (TSDB_CODE_TSC_STMT_API_ERROR). Raises
 * TSDB_CODE_APP_ERROR when no data context is available for the table.
 */
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->sql.stbInterlaceMode) {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  // In interlace mode the slot always exists but may still hold NULL, so the
  // pointee must be checked as well (same guard as stmtFetchStbColFields2).
  if (NULL == pDataBlock || NULL == *pDataBlock) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1202

1203
/*
 * Build the field descriptions (via qBuildStmtStbColFields) for the
 * statement's current table.
 *
 * pStmt    - statement.
 * fieldNum - out: passed through to qBuildStmtStbColFields().
 * fields   - out: passed through to qBuildStmtStbColFields().
 *
 * When the data context came from exec.pBlockHash and the table is a super
 * table, the context is destroyed, removed from the hash and needParse is set
 * again. pStmt->errCode is restored to its value on entry before returning.
 */
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
  int32_t         code = 0;
  int32_t         preCode = pStmt->errCode;
  STableDataCxt** pDataBlock = NULL;
  bool            cleanStb = false;

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
    // No interlace context available: use the entry from the exec block hash.
    cleanStb = true;
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  } else {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  }

  if (NULL == pDataBlock || NULL == *pDataBlock) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERRI_JRET(
      qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));

  if (cleanStb && TSDB_SUPER_TABLE == pStmt->bInfo.tbType) {
    pStmt->bInfo.needParse = true;
    qDestroyStmtDataBlock(*pDataBlock);
    *pDataBlock = NULL;
    if (0 != taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName))) {
      tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
    }
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
1250
/*
1251
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1252
  while (true) {
1253
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1254
      pStmt->exec.smInfo.pColIdx = 0;
1255
    }
1256

1257
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1258
      taosUsleep(1);
1259
      continue;
1260
    }
1261

1262
    *idx = pStmt->exec.smInfo.pColIdx;
1263
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1264
  }
1265
}
1266
*/
1267
/*
 * Finish an interlace-mode bind: lazily create the vgroup hash/list and the
 * interlace request, fill in the queue node and enqueue it.
 *
 * pStmt - statement.
 * param - queue node describing the bound table; it is passed to
 *         stmtEnqueue() at the end.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a container cannot be allocated, or
 * the error from buildRequest().
 */
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      // Fail here instead of letting later code run with a NULL hash.
      return terrno;
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      return terrno;
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  // Seeing the first recorded table name again switches to hash lookups.
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
1307

1308
/*
 * Hand out the next unused column array from sql.siInfo.pTableCols.
 *
 * pStmt      - statement.
 * pTableCols - out: the array at index pTableColsIdx, which is then advanced.
 *
 * When every array has been handed out, 100 more are appended before retrying.
 * Returns TSDB_CODE_SUCCESS, or terrno when growing the pool fails.
 */
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
  while (true) {
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
      break;
    } else {
      SArray* pTblCols = NULL;
      for (int32_t i = 0; i < 100; i++) {
        pTblCols = taosArrayInit(20, POINTER_BYTES);
        if (NULL == pTblCols) {
          return terrno;
        }

        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
          // The array was never stored in the pool, so release it here.
          int32_t code = terrno;
          taosArrayDestroy(pTblCols);
          return code;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

1331
/*
 * For STMT_TYPE_MULTI_INSERT statements, store a clone of the current table's
 * data context in sql.pTableCache.
 *
 * The cache key is the super-table uid for child tables and the table's own
 * uid otherwise. Nothing is done when the key is already cached. After a
 * successful insert, tagsCached is set for auto-create statements; otherwise
 * bInfo.boundTags is cleared (the cache entry now holds that pointer).
 *
 * Returns TSDB_CODE_SUCCESS, terrno on lookup/insert failure, or the error
 * from qCloneStmtDataBlock().
 */
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    // The clone never made it into the cache, so release it here.
    int32_t code = terrno;
    qDestroyStmtDataBlock(pDst);
    return code;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}

1369
/*
 * Close the current batch of bound data.
 *
 * In interlace mode a queue node with restoreTbCols set is allocated from the
 * table buffer and enqueued; otherwise the current data block is handed to
 * stmtCacheBlock().
 */
static int stmtAddBatch2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  STMT_DLOG_E("start to add batch");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  pStmt->stat.addBatchUs += taosGetTimestampUs() - beginUs;
  pStmt->sql.siInfo.tableColsReady = false;

  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->next = NULL;
  pNode->restoreTbCols = true;

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  }

  stmtEnqueue(pStmt, pNode);

  return TSDB_CODE_SUCCESS;
}
1406
/*
1407
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1408
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1409
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1410
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1411

1412
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1413
  pRes->fields = taosMemoryMalloc(size);
1414
  pRes->userFields = taosMemoryMalloc(size);
1415
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1416
    STMT_ERR_RET(terrno);
1417
  }
1418
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1419
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1420

1421
  return TSDB_CODE_SUCCESS;
1422
}
1423

1424
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1425
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1426
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1427

1428
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1429
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1430

1431
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1432
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1433
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1434
      STMT_ERR_RET(terrno);
1435
    }
1436
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1437
  }
1438

1439
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1440
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1441
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1442
      STMT_ERR_RET(terrno);
1443
    }
1444
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1445
  }
1446

1447
  return TSDB_CODE_SUCCESS;
1448
}
1449
*/
1450
/*
 * Bind parameter data to a prepared statement.
 *
 * stmt         - statement handle (an STscStmt2).
 * bind         - values to bind.
 * colIdx       - >= 0: bind a single column (indices must be supplied in
 *                sequence, restarting from 0); -1: bind all columns in column
 *                format; < -1: bind in row format. Single-column binding is
 *                rejected in interlace mode, and row and column formats cannot
 *                be mixed.
 * pCreateTbReq - stored on the queue node in interlace mode; not used here
 *                otherwise.
 *
 * Query statements only bind their parameters and re-parse the query. Insert
 * statements bind into the current data block and then either enqueue the
 * table (interlace mode) or add a batch.
 */
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  int64_t startUs = taosGetTimestampUs();

  STMT_DLOG("start to bind stmt data, colIdx:%d", colIdx);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // A query that has already run gets a fresh request for the next execution.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERR_RET(qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt));

    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
    if (pStmt->sql.stbInterlaceMode) {
      // Interlace mode binds into per-table arrays taken from the queue node.
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      (*pDataBlock)->pData->aCol = NULL;
    }
    if (colIdx < -1) {
      pStmt->sql.bindRowFormat = true;
      taosArrayDestroy((*pDataBlock)->pData->aCol);
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
      if (NULL == (*pDataBlock)->pData->aCol) {
        // Do not continue binding with a NULL array.
        STMT_ERR_RET(terrno);
      }
    }
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += startUs2 - startUs;

  SStmtQNode* param = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
    taosArrayClear(param->tblData.aCol);

    param->restoreTbCols = false;
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);

    param->pCreateTbReq = pCreateTbReq;
  }

  int64_t startUs3 = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;

  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;

  if (colIdx < 0) {
    if (pStmt->sql.stbInterlaceMode) {
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                    pStmt->taos->optionInfo.charsetCxt);
    } else {
      if (colIdx == -1) {
        if (pStmt->sql.bindRowFormat) {
          tscError("can't mix bind row format and bind column format");
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
        }
        code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
      } else {
        code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                  pStmt->taos->optionInfo.charsetCxt);
      }
    }

    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (pStmt->sql.stbInterlaceMode) {
      tscError("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (pStmt->sql.bindRowFormat) {
      tscError("can't mix bind row format and bind column format");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      // The first column fixes the row count used for the following columns.
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
                                    pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t startUs4 = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
  } else {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;

  return TSDB_CODE_SUCCESS;
}
1638
/*
1639
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1640
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1641

1642
  int32_t code = 0;
1643
  int32_t finalCode = 0;
1644
  size_t  keyLen = 0;
1645
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1646
  while (pIter) {
1647
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1648
    char*          key = taosHashGetKey(pIter, &keyLen);
1649

1650
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1651
    if (pMeta->uid) {
1652
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1653
      continue;
1654
    }
1655

1656
    SSubmitBlkRsp* blkRsp = NULL;
1657
    int32_t        i = 0;
1658
    for (; i < pRsp->nBlocks; ++i) {
1659
      blkRsp = pRsp->pBlocks + i;
1660
      if (strlen(blkRsp->tblFName) != keyLen) {
1661
        continue;
1662
      }
1663

1664
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1665
        continue;
1666
      }
1667

1668
      break;
1669
    }
1670

1671
    if (i < pRsp->nBlocks) {
1672
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1673
               blkRsp->uid);
1674

1675
      pMeta->uid = blkRsp->uid;
1676
      pStmt->bInfo.tbUid = blkRsp->uid;
1677
    } else {
1678
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1679
      if (NULL == pStmt->pCatalog) {
1680
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1681
        if (code) {
1682
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1683
          finalCode = code;
1684
          continue;
1685
        }
1686
      }
1687

1688
      code = stmtCreateRequest(pStmt);
1689
      if (code) {
1690
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1691
        finalCode = code;
1692
        continue;
1693
      }
1694

1695
      STableMeta*      pTableMeta = NULL;
1696
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1697
                               .requestId = pStmt->exec.pRequest->requestId,
1698
                               .requestObjRefId = pStmt->exec.pRequest->self,
1699
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1700
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1701

1702
      pStmt->stat.ctgGetTbMetaNum++;
1703

1704
      taos_free_result(pStmt->exec.pRequest);
1705
      pStmt->exec.pRequest = NULL;
1706

1707
      if (code || NULL == pTableMeta) {
1708
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1709
        finalCode = code;
1710
        taosMemoryFree(pTableMeta);
1711
        continue;
1712
      }
1713

1714
      pMeta->uid = pTableMeta->uid;
1715
      pStmt->bInfo.tbUid = pTableMeta->uid;
1716
      taosMemoryFree(pTableMeta);
1717
    }
1718

1719
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1720
  }
1721

1722
  return finalCode;
1723
}
1724
*/
1725
/*
1726
int stmtStaticModeExec(TAOS_STMT* stmt) {
1727
  STscStmt2*   pStmt = (STscStmt2*)stmt;
1728
  int32_t     code = 0;
1729
  SSubmitRsp* pRsp = NULL;
1730
  if (pStmt->sql.staticMode) {
1731
    return TSDB_CODE_TSC_STMT_API_ERROR;
1732
  }
1733

1734
  STMT_DLOG_E("start to exec");
1735

1736
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1737

1738
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1739
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1740

1741
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1742

1743
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1744
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1745
    if (code) {
1746
      pStmt->exec.pRequest->code = code;
1747
    } else {
1748
      tFreeSSubmitRsp(pRsp);
1749
      STMT_ERR_RET(stmtResetStmt(pStmt));
1750
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1751
    }
1752
  }
1753

1754
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1755

1756
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1757
  pStmt->affectedRows += pStmt->exec.affectedRows;
1758

1759
_return:
1760

1761
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1762

1763
  tFreeSSubmitRsp(pRsp);
1764

1765
  ++pStmt->sql.runTimes;
1766

1767
  STMT_RET(code);
1768
}
1769
*/
1770

1771
/*
 * Allocate a zero-filled SParseContext and populate it from the request and
 * the connection object the request belongs to.
 *
 * pRequest  request supplying ids, db, sql text, message buffer and allocator id
 * pCxt      out parameter; receives the new context (NULL if the allocation fails)
 * pWrapper  stored in the context as the parameter of the parseSqlFp callback
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
  const STscObj* pTscObj = pRequest->pTscObj;

  *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
  if (NULL == *pCxt) {
    return terrno;
  }

  // The memory is already zeroed by calloc, so only the meaningful fields are set.
  SParseContext* pNew = *pCxt;

  // Identity of the request being parsed.
  pNew->requestId = pRequest->requestId;
  pNew->requestRid = pRequest->self;
  pNew->acctId = pTscObj->acctId;
  pNew->db = pRequest->pDb;
  pNew->topicQuery = false;

  // SQL text and the buffer that receives error messages.
  pNew->pSql = pRequest->sqlstr;
  pNew->sqlLen = pRequest->sqlLen;
  pNew->pMsg = pRequest->msgBuf;
  pNew->msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;

  pNew->pTransporter = pTscObj->pAppInfo->pTransporter;
  pNew->pStmtCb = NULL;

  // User and privilege information.
  pNew->pUser = pTscObj->user;
  pNew->pEffectiveUser = pRequest->effectiveUser;
  pNew->isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER));
  pNew->enableSysInfo = pTscObj->sysInfo;

  pNew->async = true;
  pNew->svrVer = pTscObj->sVer;
  // Flag the cluster as degraded when fewer dnodes are online than exist in total.
  pNew->nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes);
  pNew->allocatorId = pRequest->allocatorRefId;
  pNew->parseSqlFp = clientParseSql;
  pNew->parseSqlParam = pWrapper;

  // biMode is read atomically; the const qualifier is cast away only for the load.
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
  pNew->biMode = biMode;

  return TSDB_CODE_SUCCESS;
}
1804

1805
/*
 * Completion callback installed on the request by stmtExec2 when the statement
 * is executed asynchronously.
 *
 * userdata  the STscStmt2 handle that launched the query
 * res       result handle forwarded unchanged to the user's callback
 * code      execution result code forwarded unchanged to the user's callback
 *
 * Updates the affected-row counters, invokes the user's asyncExecFn, cleans the
 * per-execution state and finally posts asyncExecSem.
 */
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
  STscStmt2*        pStmt = userdata;
  __taos_async_fn_t userFp = pStmt->options.asyncExecFn;

  // Account for the rows written by this execution before notifying the user.
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

  userFp(pStmt->options.userdata, res, code);

  // Busy-wait until tableColsReady becomes non-zero before cleaning exec state.
  while (!atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
    taosUsleep(1);
  }

  // Keep the exec info only when the execution succeeded; the result is deliberately ignored.
  (void)stmtCleanExecInfo(pStmt, (0 == code), false);
  ++pStmt->sql.runTimes;

  // Signal that the asynchronous execution has fully completed.
  if (0 != tsem_post(&pStmt->asyncExecSem)) {
    tscError("failed to post asyncExecSem");
  }
}
×
1824

1825
/*
 * Execute the prepared statement.
 *
 * stmt           statement handle (an STscStmt2)
 * affected_rows  optional out parameter; on the synchronous path it receives the
 *                number of rows affected by this execution
 *
 * When options.asyncExecFn is not set the query is launched synchronously and
 * the execution state is cleaned before returning.  Otherwise the query is
 * launched asynchronously and asyncQueryCb finishes the work when it completes.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.  If the asynchronous launch cannot
 * be prepared (wrapper or parse-context allocation failure) the query is NOT
 * launched, the user callback is never invoked, and the error is returned.
 */
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int64_t    startUs = taosGetTimestampUs();

  STMT_DLOG_E("start to exec");

  // A previously recorded error makes the statement unusable for execution.
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // Block until every outstanding asynchronous bind has decremented asyncBindNum to zero.
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      // Spin until tbRemainNum drops to zero, and account the wait time.
      int64_t startTs = taosGetTimestampUs();
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;

      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      // Replace the cached copy of the current table data before building the output.
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
  }

  SRequestObj*      pRequest = pStmt->exec.pRequest;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;

  if (!fp) {
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
      if (code) {
        pStmt->exec.pRequest->code = code;
      } else {
        // Metadata was refreshed: reset the statement and ask the caller to retry.
        STMT_ERR_RET(stmtResetStmt(pStmt));
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
      }
    }

    STMT_ERR_JRET(pStmt->exec.pRequest->code);

    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
    if (affected_rows) {
      *affected_rows = pStmt->exec.affectedRows;
    }
    pStmt->affectedRows += pStmt->exec.affectedRows;

    // Busy-wait until tableColsReady becomes non-zero before cleaning exec state.
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

    ++pStmt->sql.runTimes;
  } else {
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
    if (pWrapper == NULL) {
      // Nothing has been handed to the request yet; report the failure without launching.
      code = terrno;
      goto _return;
    }
    pWrapper->pRequest = pRequest;
    pRequest->pWrapper = pWrapper;

    code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
    if (TSDB_CODE_SUCCESS != code) {
      // createParseContext leaves pParseCtx NULL on failure, so only the wrapper
      // itself has to be released.  Detach it first so the request does not keep
      // a dangling pointer.
      pRequest->pWrapper = NULL;
      taosMemoryFree(pWrapper);
      goto _return;
    }

    pRequest->syncQuery = false;
    pRequest->body.queryFp = asyncQueryCb;
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;

    // Only mark the semaphore as pending once a query is really going to be
    // launched: asyncQueryCb is the one that posts asyncExecSem, and stmtClose2
    // waits on it whenever execSemWaited is false.
    pStmt->execSemWaited = false;
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
  }

_return:
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;

  STMT_RET(code);
}
1925

1926
/*
 * Close a statement handle and release everything it owns.
 *
 * Teardown order: stop the bind queue and join the bind thread, wait for
 * outstanding asynchronous binds, destroy the queue and async-bind
 * synchronization objects, wait for a pending asynchronous execution, log the
 * collected statistics, clean the SQL info, destroy the exec semaphore and
 * finally free the handle itself.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by one of the checked steps
 * (in which case the function returns early).
 */
int stmtClose2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to free stmt");

  // Ask the bind thread to stop, then wake it so it can observe the flag.
  pStmt->queue.stopQueue = true;

  (void)taosThreadMutexLock(&pStmt->queue.mutex);
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

  if (pStmt->bindThreadInUse) {
    (void)taosThreadJoin(pStmt->bindThread, NULL);
    pStmt->bindThreadInUse = false;
  }

  // Wait until every asynchronous bind has decremented asyncBindNum to zero.
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  for (;;) {
    if (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) <= 0) {
      break;
    }
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->queue.mutex);

  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);

  // In async mode, wait for the exec semaphore unless it has already been waited on.
  if (pStmt->options.asyncExecFn) {
    if (!pStmt->execSemWaited) {
      if (0 != tsem_wait(&pStmt->asyncExecSem)) {
        tscError("failed to wait asyncExecSem");
      }
    }
  }

  STMT_DLOG("stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
            ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
            ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
            ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
            ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
            pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
            pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
            pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
            pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
            pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);

  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  if (pStmt->options.asyncExecFn) {
    if (0 != tsem_destroy(&pStmt->asyncExecSem)) {
      tscError("failed to destroy asyncExecSem");
    }
  }

  // The handle itself is released last; it must not be used after this call.
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}
1983

1984
/*
 * Return a human-readable message for the most recent error.
 *
 * Without a statement handle, or without a request attached to it, the message
 * for the current terrno is returned.  Otherwise terrno is stored into the
 * request's code and the request's error string is returned.
 */
const char* stmtErrstr2(TAOS_STMT2* stmt) {
  if (NULL == stmt) {
    return (char*)tstrerror(terrno);
  }

  STscStmt2*   pStmt = (STscStmt2*)stmt;
  SRequestObj* pRequest = pStmt->exec.pRequest;
  if (NULL == pRequest) {
    return (char*)tstrerror(terrno);
  }

  // Note: this overwrites whatever code the request carried with the current terrno.
  pRequest->code = terrno;

  return taos_errstr(pRequest);
}
1995
/*
1996
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
1997

1998
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
1999
*/
2000

2001
/*
 * Bring the statement into a state where its column fields can be fetched:
 * switch to STMT_FETCH_FIELDS, make sure a request exists and parse the SQL if
 * it still needs parsing.  Query statements are rejected with
 * TSDB_CODE_TSC_STMT_API_ERROR.
 *
 * The statement's errCode is restored to its value on entry before returning,
 * so a failure here is reported only through the return value.
 */
int stmtParseColFields2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    preCode = pStmt->errCode;

  STMT_DLOG_E("start to get col fields");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran and is not a multi-insert does not need re-parsing.
  bool alreadyParsed =
      pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && alreadyParsed) {
    pStmt->bInfo.needParse = false;
  }

  // NOTE(review): query statements appear to be rejected above, so this branch
  // looks unreachable unless stmtSwitchStatus can change sql.type; confirm.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
    STMT_ERRI_JRET(stmtCreateRequest(pStmt));
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
2041

2042
/*
 * Prepare the statement via stmtParseColFields2 and, when that succeeds, fetch
 * the super-table column fields.
 *
 * nums and fields are passed through unchanged to stmtFetchStbColFields2.
 * Returns the first error encountered, or the result of the fetch.
 */
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
  int32_t code = stmtParseColFields2(stmt);

  if (TSDB_CODE_SUCCESS == code) {
    return stmtFetchStbColFields2(stmt, nums, fields);
  }

  return code;
}
2050

2051
/*
 * Report the number of parameters of the statement through *nums.
 *
 * For query statements this is the size of the query's placeholder-value
 * array; for every other statement type it is obtained from
 * stmtFetchColFields2.
 *
 * The statement's errCode is restored to its value on entry before returning,
 * so a failure here is reported only through the return value.
 */
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    preCode = pStmt->errCode;

  STMT_DLOG_E("start to get param num");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran and is not a multi-insert does not need re-parsing.
  bool alreadyParsed =
      pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && alreadyParsed) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the result of a previous query run; a fresh request is created below.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
2092

2093
/*
 * Return the request object of a query statement so the caller can use it as a
 * result handle.  For any other statement type an error is logged and NULL is
 * returned.
 */
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}
2105

2106
int32_t stmtAsyncBindThreadFunc(void* args) {
×
UNCOV
2107
  qInfo("async stmt bind thread started");
×
2108

2109
  ThreadArgs* targs = (ThreadArgs*)args;
×
UNCOV
2110
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
2111

2112
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
2113
  targs->fp(targs->param, NULL, code);
×
2114
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
2115
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
2116
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
2117
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
UNCOV
2118
  taosMemoryFree(args);
×
2119

UNCOV
2120
  qInfo("async stmt bind thread stopped");
×
2121

UNCOV
2122
  return code;
×
2123
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc