• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3676

22 Mar 2025 04:46PM UTC coverage: 25.147% (-36.8%) from 61.952%
#3676

push

travis-ci

web-flow
fix: userOperTest in linux (#30363)

Co-authored-by: taos-support <it@taosdata.com>

55963 of 304767 branches covered (18.36%)

Branch coverage included in aggregate %.

96374 of 301020 relevant lines covered (32.02%)

582640.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.38
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4

5
#include "clientStmt.h"
6
#include "clientStmt2.h"
7
/*
8
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
9
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10
*/
11
// Hand out the next buffUnit-sized slot from the table buffer pool.
//
// Slots are carved sequentially out of the current chunk (pCurBuff). When the
// chunk is used up, the next chunk already stored in pBufList is reused; when
// none is left, a new chunk of buffSize bytes is allocated and appended to
// pBufList.
//
// pTblBuf: buffer pool bookkeeping; updated in place.
// pBuf:    receives the address of the slot on success.
// Returns TSDB_CODE_SUCCESS, or an error code when a chunk cannot be obtained.
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  // Fast path: the current chunk still has room.
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  // Current chunk exhausted: switch to a chunk that was allocated earlier.
  if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    *pBuf = pTblBuf->pCurBuff;
    pTblBuf->buffOffset = pTblBuf->buffUnit;  // first slot is handed out right now
    return TSDB_CODE_SUCCESS;
  }

  // No chunk left: grow the pool by one chunk.
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // The chunk never made it into pBufList, so nothing else will free it.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->buffIdx++;
  pTblBuf->pCurBuff = buff;
  *pBuf = buff;
  pTblBuf->buffOffset = pTblBuf->buffUnit;

  return TSDB_CODE_SUCCESS;
}
40

41
// Wait until the bind queue reports a pending node, then pop it.
//
// While queue.qRemainNum is zero the caller first polls up to ten times with
// taosUsleep(1) and afterwards parks on queue.waitCond under queue.mutex.
//
// Returns false (leaving *param untouched) when queue.stopQueue is set once
// the wait is over. Otherwise the node following the current head is stored
// in *param, head is advanced to it, qRemainNum is decremented and true is
// returned.
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
  int polls = 0;
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    if (polls < 10) {
      taosUsleep(1);
      polls++;
      continue;
    }

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    // Re-check with the mutex held: stmtEnqueue updates the counter and
    // signals waitCond while holding the same mutex.
    if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
      (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
    }
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
  }

  if (pStmt->queue.stopQueue) {
    return false;
  }

  SStmtQNode* node = pStmt->queue.head->next;
  pStmt->queue.head = node;
  *param = node;

  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);

  return true;
}
67

68
// Append `param` to the tail of the bind queue and wake a waiter.
//
// The node is linked in first and stat.bindDataNum is bumped; the pending
// counter is then incremented and waitCond signalled with queue.mutex held.
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
  SStmtQueue* pQueue = &pStmt->queue;

  pQueue->tail->next = param;
  pQueue->tail = param;

  pStmt->stat.bindDataNum++;

  (void)taosThreadMutexLock(&pQueue->mutex);
  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
  (void)taosThreadCondSignal(&pQueue->waitCond);
  (void)taosThreadMutexUnlock(&pQueue->mutex);
}
89✔
79

80
// Make sure the statement owns a request object (exec.pRequest).
//
// Does nothing when a request already exists. Otherwise a request is built
// from the prepared SQL text; a non-zero reqid is incremented after use. On
// success the request is flagged as a synchronous stmt-bind query and, when
// the statement has a database name, that name is copied into the request.
//
// Returns TSDB_CODE_SUCCESS, the buildRequest error code, or terrno when the
// database name cannot be duplicated.
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
  if (pStmt->exec.pRequest != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
                              pStmt->reqid);
  if (pStmt->reqid != 0) {
    pStmt->reqid++;
  }

  // Do not touch exec.pRequest unless buildRequest reported success.
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pStmt->exec.pRequest->syncQuery = true;
  pStmt->exec.pRequest->isStmtBind = true;

  if (pStmt->db != NULL) {
    taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
    pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
    if (NULL == pStmt->exec.pRequest->pDb) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
101

102
// Validate and apply a statement state transition.
//
// If the statement already carries an error, every transition except
// STMT_PREPARE is rejected with that error. Otherwise the requested status is
// checked against the current one; an out-of-order call yields
// TSDB_CODE_TSC_STMT_API_ERROR and an unknown status yields
// TSDB_CODE_APP_ERROR. On success sql.status becomes newStatus.
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT_DLOG("stmt already failed with err:%s", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  int32_t code = 0;
  bool    badOrder = false;  // true when the call sequence is not allowed

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;  // preparing again clears a previous failure
      break;

    case STMT_SETTBNAME:
      badOrder = STMT_STATUS_EQ(INIT) ||
                 (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)));
      break;

    case STMT_SETTAGS:
    case STMT_FETCH_FIELDS:
      badOrder = STMT_STATUS_EQ(INIT);
      break;

    case STMT_BIND:
      badOrder = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_BIND_COL:
      badOrder = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;

    case STMT_ADD_BATCH:
      badOrder = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_EXECUTE:
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
        badOrder = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
                   STMT_STATUS_NE(BIND_COL);
      } else {
        badOrder = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  if (badOrder) {
    code = TSDB_CODE_TSC_STMT_API_ERROR;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}
179

180
// Parser callback: report the table name currently bound to the statement.
//
// Marks the statement as STMT_TYPE_MULTI_INSERT. When no table name has been
// set yet, a warning is logged and TSDB_CODE_TSC_STMT_TBNAME_ERROR is
// returned; otherwise *tbName points at the statement's own name buffer.
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  bool       nameMissing = ('\0' == pStmt->bInfo.tbName[0]);

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if (nameMissing) {
    tscWarn("no table name set, OK if it is a stmt get fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;

  return TSDB_CODE_SUCCESS;
}
194

195
// Record the table the statement is now bound to.
//
// Copies the table name (structured and full-name form), the ids and type
// taken from pTableMeta, the super table name and tbNameFlag into bInfo.
// `tags` becomes the new bInfo.boundTags; the previous boundTags are destroyed
// first unless they are marked as cached. tbUid is stored as 0 when
// autoCreateTbl is set.
//
// Returns the tNameExtractFullName error code on failure, otherwise
// TSDB_CODE_SUCCESS.
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName,
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  char       fullName[TSDB_TABLE_FNAME_LEN];

  int32_t code = tNameExtractFullName(tbName, fullName);
  if (code != 0) {
    return code;
  }

  // Table identity.
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
  tstrncpy(pStmt->bInfo.tbFName, fullName, sizeof(pStmt->bInfo.tbFName));
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;

  // Ids and type from the table meta.
  if (autoCreateTbl) {
    pStmt->bInfo.tbUid = 0;
  } else {
    pStmt->bInfo.tbUid = pTableMeta->uid;
  }
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
  pStmt->bInfo.tbType = pTableMeta->tableType;

  // Replace the bound tags; cached tags are not destroyed here.
  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }
  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;

  pStmt->bInfo.tbNameFlag = tbNameFlag;
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  return TSDB_CODE_SUCCESS;
}
225

226
// Store the vgroup hash in sql.pVgHash and the block hash in exec.pBlockHash.
// Always returns TSDB_CODE_SUCCESS.
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}
234

235
// Parser callback: update both the bind info and the exec info of the
// statement, then remember whether the table is auto-created.
// Returns the first error reported by either update, else TSDB_CODE_SUCCESS.
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
                              SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName, uint8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl, tbNameFlag);
  STMT_ERR_RET(code);

  code = stmtUpdateExecInfo(stmt, pVgHash, pBlockHash);
  STMT_ERR_RET(code);

  pStmt->sql.autoCreateTbl = autoCreateTbl;

  return TSDB_CODE_SUCCESS;
}
246

247
// Parser callback: hand the vgroup hash and the block hash over to the caller.
// Both pointers are cleared in the statement after being copied out.
// Always returns TSDB_CODE_SUCCESS.
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  SHashObj* vgHash = pStmt->sql.pVgHash;
  SHashObj* blockHash = pStmt->exec.pBlockHash;

  pStmt->sql.pVgHash = NULL;
  pStmt->exec.pBlockHash = NULL;

  *pVgHash = vgHash;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}
258

259
// Parse the prepared SQL and derive the statement type.
//
// Creates the request if needed, runs parseSql with the stmt callbacks and
// clears bInfo.needParse. A query with pRoot and no type yet becomes
// STMT_TYPE_INSERT; otherwise a query with pPrepareRoot becomes
// STMT_TYPE_QUERY and the function returns right away. For the remaining
// cases the data context of the bound table is looked up in exec.pBlockHash
// and sql.pBindInfo is allocated (one entry per bound column) if it does not
// exist yet.
static int32_t stmtParseSql(STscStmt2* pStmt) {
  SStmtCallback stmtCb = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  pStmt->exec.pCurrBlock = NULL;

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  pStmt->stat.parseSqlNum++;
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));

  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
  pStmt->bInfo.needParse = false;

  if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
    pStmt->sql.type = STMT_TYPE_INSERT;
    pStmt->sql.stbInterlaceMode = false;
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
    pStmt->sql.type = STMT_TYPE_QUERY;
    pStmt->sql.stbInterlaceMode = false;

    return TSDB_CODE_SUCCESS;
  }

  const char*     tbFName = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, tbFName, strlen(tbFName));
  if (NULL == ppCxt || NULL == *ppCxt) {
    return terrno;
  }

  if (NULL != pStmt->sql.pBindInfo) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt* pTableCtx = *ppCxt;
  pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
  if (NULL == pStmt->sql.pBindInfo) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
316

317
// Reset the per-table bind state so the next bind starts from scratch.
//
// Table ids, type and names are cleared and needParse is set. Bound tags are
// destroyed unless they are marked as cached. The super table name and suid
// are kept when the statement auto-creates tables.
// Always returns TSDB_CODE_SUCCESS.
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbVgId = -1;
  pStmt->bInfo.tbType = 0;

  pStmt->bInfo.tbName[0] = 0;
  pStmt->bInfo.tbFName[0] = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  if (pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.stbFName[0] = 0;
  pStmt->bInfo.tbSuid = 0;

  return TSDB_CODE_SUCCESS;
}
337

338
// Reset the statement columns held by pTb->aCol, then destroy the array.
// The result of qResetStmtColumns is deliberately ignored.
static void stmtFreeTableBlkList(STableColsData* pTb) {
  SArray* pCols = pTb->aCol;

  (void)qResetStmtColumns(pCols, true);
  taosArrayDestroy(pCols);
}
×
342

343
// Rewind the table buffer pool to its first chunk and empty the bind queue.
//
// The first slot of the first chunk becomes the queue's head/tail node, so
// the pool offset starts right after it. If the first chunk cannot be
// fetched, an error is logged and nothing else is changed.
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
  void* firstBuf = taosArrayGetP(pTblBuf->pBufList, 0);

  pTblBuf->pCurBuff = firstBuf;
  if (NULL == firstBuf) {
    tscError("QInfo:%p, failed to get buffer from list", pTblBuf);
    return;
  }

  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = sizeof(*pQueue->head);

  pQueue->tail = pTblBuf->pCurBuff;
  pQueue->head = pQueue->tail;
  pQueue->qRemainNum = 0;
  pQueue->head->next = NULL;
}
356

357
// Release execution-time state after an execute or on teardown.
//
// Interlace mode:
//   - deepClean: the block hash and the current block are destroyed.
//   - otherwise: the table-cols index and the queue/table buffer are rewound.
//   The request's row counter is reset when a request exists.
// Normal mode:
//   - the request is freed unless this is a query statement and deepClean is
//     not set;
//   - every data block in exec.pBlockHash is destroyed and removed, except
//     that with keepTable the current block is only reset (its data is
//     swapped with exec.pCurrTbData);
//   - with keepTable the function returns here, before the hash, the current
//     table data and the bind info are cleaned.
//
// Returns the first error from a helper, else TSDB_CODE_SUCCESS.
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
  if (pStmt->sql.stbInterlaceMode) {
    if (deepClean) {
      taosHashCleanup(pStmt->exec.pBlockHash);
      pStmt->exec.pBlockHash = NULL;

      if (NULL != pStmt->exec.pCurrBlock) {
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
        // The block has been handed to qDestroyStmtDataBlock; drop the
        // reference so later code cannot reuse a destroyed block.
        pStmt->exec.pCurrBlock = NULL;
      }
    } else {
      pStmt->sql.siInfo.pTableColsIdx = 0;
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
    }

    if (NULL != pStmt->exec.pRequest) {
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
    }
  } else {
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
      taos_free_result(pStmt->exec.pRequest);
      pStmt->exec.pRequest = NULL;
    }

    size_t keyLen = 0;
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
    while (pIter) {
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
      char*          key = taosHashGetKey(pIter, &keyLen);
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);  // result currently unused

      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
        // Keep the current table's block: reset it instead of destroying it.
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));

        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
        continue;
      }

      qDestroyStmtDataBlock(pBlocks);
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));

      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
    }

    if (keepTable) {
      return TSDB_CODE_SUCCESS;
    }

    taosHashCleanup(pStmt->exec.pBlockHash);
    pStmt->exec.pBlockHash = NULL;

    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}
418

419
// Array element destructor: `buf` points at a stored pointer; free the memory
// that stored pointer refers to.
static void stmtFreeTbBuf(void* buf) {
  void** ppChunk = (void**)buf;
  taosMemoryFree(*ppChunk);
}
16✔
423

424
static void stmtFreeTbCols(void* buf) {
10,000✔
425
  SArray* pCols = *(SArray**)buf;
10,000✔
426
  taosArrayDestroy(pCols);
10,000✔
427
}
10,000✔
428

429
// Free everything hanging off pStmt->sql (and pStmt->db) and zero the struct.
//
// Order of work: plain buffers and the parsed query, the vgroup hash, the
// fixed-value create-table request, every entry of the table cache, the exec
// info (deep clean) and bind info, and finally the interlace-mode resources.
// After the memset, siInfo.tableColsReady is set back to true.
//
// Returns the first error from stmtCleanExecInfo/stmtCleanBindInfo, else
// TSDB_CODE_SUCCESS.
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
  STMT_DLOG_E("start to free SQL info");

  // Plain allocations and the parsed query.
  taosMemoryFreeClear(pStmt->db);
  taosMemoryFree(pStmt->sql.pBindInfo);
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);

  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  if (pStmt->sql.fixValueTags) {
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
  }

  // Each table cache entry carries a data context and bound tags of its own.
  for (void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pEntry = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pEntry->pDataCtx);
    qDestroyBoundColInfo(pEntry->boundTags);
    taosMemoryFreeClear(pEntry->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  // Interlace-mode resources.
  taos_free_result(pStmt->sql.siInfo.pRequest);
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);

  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
  pStmt->sql.siInfo.tableColsReady = true;

  STMT_DLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}
476

477
// Make sure sql.pVgHash has an entry for the vgroup of the bound table.
//
// When *vgId is non-negative and already present in the hash, nothing is
// done. Otherwise the vgroup is looked up through the catalog for
// bInfo.sname, stored in the hash, and *vgId is set to the vgroup's id.
//
// Returns the catalog or hash error code on failure, else TSDB_CODE_SUCCESS.
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
  bool known = (*vgId >= 0) && (NULL != taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId)));
  if (known) {
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
  if (TSDB_CODE_SUCCESS == code) {
    code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo,
                       sizeof(vgInfo));
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *vgId = vgInfo.vgId;

  return TSDB_CODE_SUCCESS;
}
503

504
// Build a new data context for a table from an existing one.
//
// The vgroup info for the table is added to sql.pVgHash first (which may
// replace vgId with the resolved id); the new context is then produced by
// qRebuildStmtDataBlock and returned through *newBlock.
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                                    uint64_t suid, int32_t vgId) {
  int32_t code = stmtTryAddTableVgroupInfo(pStmt, &vgId);
  STMT_ERR_RET(code);

  code = qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl);
  STMT_ERR_RET(code);

  STMT_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);

  return TSDB_CODE_SUCCESS;
}
513

514
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
101✔
515
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
101!
516
    pStmt->bInfo.needParse = false;
×
517
    pStmt->bInfo.inExecCache = false;
×
518
    return TSDB_CODE_SUCCESS;
×
519
  }
520

521
  pStmt->bInfo.needParse = true;
101✔
522
  pStmt->bInfo.inExecCache = false;
101✔
523

524
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
101✔
525
  if (pCxtInExec) {
101✔
526
    pStmt->bInfo.needParse = false;
22✔
527
    pStmt->bInfo.inExecCache = true;
22✔
528

529
    pStmt->exec.pCurrBlock = *pCxtInExec;
22✔
530

531
    if (pStmt->sql.autoCreateTbl) {
22✔
532
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
18!
533
      return TSDB_CODE_SUCCESS;
18✔
534
    }
535
  }
536

537
  if (NULL == pStmt->pCatalog) {
83✔
538
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
22!
539
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
22✔
540
  }
541

542
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
83!
543
    if (pStmt->bInfo.inExecCache) {
22!
544
      pStmt->bInfo.needParse = false;
×
545
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
546
      return TSDB_CODE_SUCCESS;
×
547
    }
548

549
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
22!
550
    return TSDB_CODE_SUCCESS;
22✔
551
  }
552

553
  if (pStmt->sql.autoCreateTbl) {
61✔
554
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
45✔
555
    if (pCache) {
45!
556
      pStmt->bInfo.needParse = false;
45✔
557
      pStmt->bInfo.tbUid = 0;
45✔
558

559
      STableDataCxt* pNewBlock = NULL;
45✔
560
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
45!
561

562
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
45!
563
                      POINTER_BYTES)) {
564
        STMT_ERR_RET(terrno);
×
565
      }
566

567
      pStmt->exec.pCurrBlock = pNewBlock;
45✔
568

569
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
45!
570

571
      return TSDB_CODE_SUCCESS;
45✔
572
    }
573

574
    STMT_RET(stmtCleanBindInfo(pStmt));
×
575
  }
576

577
  uint64_t uid, suid;
578
  int32_t  vgId;
579
  int8_t   tableType;
580

581
  STableMeta*      pTableMeta = NULL;
16✔
582
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
16✔
583
                           .requestId = pStmt->exec.pRequest->requestId,
16✔
584
                           .requestObjRefId = pStmt->exec.pRequest->self,
16✔
585
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
16✔
586
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
16✔
587

588
  pStmt->stat.ctgGetTbMetaNum++;
16✔
589

590
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
16!
591
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
×
592
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
593

594
    STMT_ERR_RET(code);
×
595
  }
596

597
  STMT_ERR_RET(code);
16!
598

599
  uid = pTableMeta->uid;
16✔
600
  suid = pTableMeta->suid;
16✔
601
  tableType = pTableMeta->tableType;
16✔
602
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
16✔
603
  vgId = pTableMeta->vgId;
16✔
604

605
  taosMemoryFree(pTableMeta);
16!
606

607
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
16!
608

609
  if (uid == pStmt->bInfo.tbUid) {
16!
610
    pStmt->bInfo.needParse = false;
×
611

612
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);
×
613

614
    return TSDB_CODE_SUCCESS;
×
615
  }
616

617
  if (pStmt->bInfo.inExecCache) {
16✔
618
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
4✔
619
    if (NULL == pCache) {
4!
620
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
621
               pStmt->bInfo.tbFName, uid, cacheUid);
622

623
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
624
    }
625

626
    pStmt->bInfo.needParse = false;
4✔
627

628
    pStmt->bInfo.tbUid = uid;
4✔
629
    pStmt->bInfo.tbSuid = suid;
4✔
630
    pStmt->bInfo.tbType = tableType;
4✔
631
    pStmt->bInfo.boundTags = pCache->boundTags;
4✔
632
    pStmt->bInfo.tagsCached = true;
4✔
633

634
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
4!
635

636
    return TSDB_CODE_SUCCESS;
4✔
637
  }
638

639
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
12✔
640
  if (pCache) {
12!
641
    pStmt->bInfo.needParse = false;
12✔
642

643
    pStmt->bInfo.tbUid = uid;
12✔
644
    pStmt->bInfo.tbSuid = suid;
12✔
645
    pStmt->bInfo.tbType = tableType;
12✔
646
    pStmt->bInfo.boundTags = pCache->boundTags;
12✔
647
    pStmt->bInfo.tagsCached = true;
12✔
648

649
    STableDataCxt* pNewBlock = NULL;
12✔
650
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
12!
651

652
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
12!
653
                    POINTER_BYTES)) {
654
      STMT_ERR_RET(terrno);
×
655
    }
656

657
    pStmt->exec.pCurrBlock = pNewBlock;
12✔
658

659
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
12!
660

661
    return TSDB_CODE_SUCCESS;
12✔
662
  }
663

664
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
665

666
  return TSDB_CODE_SUCCESS;
×
667
}
668

669
// Bring the statement back to the STMT_INIT state so it can be prepared
// again: all SQL info is released and a fresh table cache hash is created.
// Returns the cleanup error, or terrno when the hash cannot be created.
static int32_t stmtResetStmt(STscStmt2* pStmt) {
  int32_t code = stmtCleanSQLInfo(pStmt);
  STMT_ERR_RET(code);

  SHashObj* pCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pCache;
  if (NULL == pCache) {
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
681

682
// Process one node taken from the bind queue.
//
// A node with restoreTbCols set re-creates the column arrays for the first
// siInfo.pTableColsIdx entries of siInfo.pTableCols and then marks
// tableColsReady. Any other node has its table data appended to the output
// through qAppendStmtTableOutput, after which siInfo.tbRemainNum is
// decremented regardless of the result.
//
// Returns the first error encountered, else TSDB_CODE_SUCCESS.
static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
  SStmtQNode* pNode = (SStmtQNode*)param;

  if (!pNode->restoreTbCols) {
    int code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pNode->tblData, pStmt->exec.pCurrBlock,
                                      &pStmt->sql.siInfo, pNode->pCreateTbReq);
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
    STMT_ERR_RET(code);

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < pStmt->sql.siInfo.pTableColsIdx; ++idx) {
    SArray** ppCols = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, idx);
    *ppCols = taosArrayInit(20, POINTER_BYTES);
    if (*ppCols == NULL) {
      STMT_ERR_RET(terrno);
    }
  }

  atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);

  return TSDB_CODE_SUCCESS;
}
704

705
static void* stmtBindThreadFunc(void* param) {
16✔
706
  setThreadName("stmtBind");
16✔
707

708
  qInfo("stmt bind thread started");
16!
709

710
  STscStmt2* pStmt = (STscStmt2*)param;
16✔
711

712
  while (true) {
102✔
713
    if (pStmt->queue.stopQueue) {
118✔
714
      break;
16✔
715
    }
716

717
    SStmtQNode* asyncParam = NULL;
102✔
718
    if (!stmtDequeue(pStmt, &asyncParam)) {
102✔
719
      continue;
13✔
720
    }
721

722
    if (stmtAsyncOutput(pStmt, asyncParam) != 0) {
89!
723
      qError("stmt async output failed");
×
724
    }
725
  }
726

727
  qInfo("stmt bind thread stopped");
16!
728

729
  return NULL;
16✔
730
}
731

732
// Start the joinable bind thread that runs stmtBindThreadFunc for this
// statement and mark it as in use.
//
// The thread attribute object is destroyed on every path once it has been
// initialised. Returns TSDB_CODE_TSC_INTERNAL_ERROR when the attribute cannot
// be set up, the system error when the thread cannot be created, else
// TSDB_CODE_SUCCESS.
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
    // Capture the error before any further library call can disturb it.
    int32_t code = TAOS_SYSTEM_ERROR(ERRNO);
    terrno = code;
    (void)taosThreadAttrDestroy(&thAttr);
    STMT_ERR_RET(code);
    // Only reached if the captured code was 0; the thread still was not
    // created, so do not report success.
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  pStmt->bindThreadInUse = true;

  (void)taosThreadAttrDestroy(&thAttr);
  return TSDB_CODE_SUCCESS;
}
751

752
// Initialise the bind queue: its condition variable and mutex, plus a first
// node taken from the table buffer pool that serves as both head and tail.
// Returns the allocation error, else TSDB_CODE_SUCCESS.
static int32_t stmtInitQueue(STscStmt2* pStmt) {
  SStmtQueue* pQueue = &pStmt->queue;

  (void)taosThreadCondInit(&pQueue->waitCond, NULL);
  (void)taosThreadMutexInit(&pQueue->mutex, NULL);

  int32_t code = stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pQueue->head);
  STMT_ERR_RET(code);

  pQueue->tail = pQueue->head;

  return TSDB_CODE_SUCCESS;
}
760

761
// Initialise the async-bind bookkeeping: condition variable, mutex and a zero
// counter. Always returns TSDB_CODE_SUCCESS; the init results are ignored.
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
  pStmt->asyncBindParam.asyncBindNum = 0;

  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);

  return TSDB_CODE_SUCCESS;
}
768

769
// Set up the table buffer pool: one slot is sizeof(SStmtQNode) bytes, one
// chunk holds 1000 slots, and the first chunk is allocated and registered in
// pBufList right away.
//
// On failure terrno is returned. A chunk that could not be registered is
// freed here; pBufList itself is left in pTblBuf.
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;

  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }

  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // Not stored in pBufList, so nothing else would release it.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
791

792
// Create a new stmt2 handle for the given connection.
//
// taos:     connection object the statement belongs to.
// pOptions: optional settings, copied into the handle; when both
//           singleStbInsert and singleTableBindOnce are set the handle runs
//           in super-table interlace mode, which also sets up the table
//           buffer pool, the bind queue and the bind thread.
//
// Returns the new handle, or NULL on failure. After the handle has been
// allocated, failures are cleaned up through stmtClose2.
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
  int32_t code = 0;

  STscStmt2* pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
  if (NULL == pStmt) {
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = taos;
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->errCode = TSDB_CODE_SUCCESS;

  if (NULL != pOptions) {
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
      pStmt->stbInterlaceMode = true;
    }

    pStmt->reqid = pOptions->reqid;
  }

  if (pStmt->stbInterlaceMode) {
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = taos->acctId;
    pStmt->sql.siInfo.dbname = taos->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);

    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableHash) {
      (void)stmtClose2(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pTableCols) {
      (void)stmtClose2(pStmt);
      return NULL;
    }

    // Each step runs only if the previous one succeeded.
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtInitQueue(pStmt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtStartBindThread(pStmt);
    }
    if (TSDB_CODE_SUCCESS != code) {
      terrno = code;
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  pStmt->sql.siInfo.tableColsReady = true;

  if (pStmt->options.asyncExecFn) {
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  code = stmtIniAsyncBind(pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    (void)stmtClose2(pStmt);
    return NULL;
  }

  pStmt->execSemWaited = false;

  STMT_LOG_SEQ(STMT_INIT);

  tscDebug("stmt:%p initialized", pStmt);

  return pStmt;
}
876

877
// Stores a copy of the database name found in the SQL text on the statement and
// makes it the database of the statement's request, replacing whatever database
// the request carried before.
//
// stmt    statement handle (cast to STscStmt2)
// dbName  database name as parsed from the SQL; the stored copy is passed through
//         strdequote()
//
// Returns TSDB_CODE_SUCCESS, terrno when one of the copies cannot be allocated,
// or the error reported by stmtCreateRequest().
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG("start to set dbName:%s", dbName);

  pStmt->db = taosStrdup(dbName);
  if (NULL == pStmt->db) {
    // Without this check strdequote() below would be handed a NULL pointer.
    return terrno;
  }
  (void)strdequote(pStmt->db);
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  // The SQL statement specifies a database name, overriding the previously specified database
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
  if (pStmt->exec.pRequest->pDb == NULL) {
    return terrno;
  }

  // The interlace info only borrows the pointer; the string stays owned by pStmt->db.
  if (pStmt->sql.stbInterlaceMode) {
    pStmt->sql.siInfo.dbname = pStmt->db;
  }
  return TSDB_CODE_SUCCESS;
}
897

898
// Prepares a statement from SQL text: resets the statement if it was prepared
// before, switches it to STMT_PREPARE, stores a private copy of the SQL and, when
// the SQL names a database, records that database on the statement.
//
// stmt    statement handle (cast to STscStmt2)
// sql     SQL text
// length  number of bytes of `sql` to use; 0 means "use strlen(sql)"
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code if it is already
// in an error state, terrno if the SQL copy cannot be allocated, or the error of
// the failing sub-step.
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to prepare");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // `length` is unsigned, so "not supplied" can only be expressed as 0.
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    return terrno;
  }
  pStmt->sql.sqlLen = length;
  pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;

  char* dbName = NULL;
  if (qParseDbName(sql, length, &dbName)) {
    // Release the parsed name before propagating any error so that it is not
    // leaked when stmtSetDbName2() fails (it keeps its own copy on success).
    int32_t code = stmtSetDbName2(stmt, dbName);
    taosMemoryFreeClear(dbName);
    STMT_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
932

933
// Sets up the per-statement state used in super-table interlace mode:
//   - clones the data context of the current table (looked up in exec.pBlockHash
//     by bInfo.tbFName) into siInfo.pDataCtx,
//   - appends STMT_TABLE_COLS_NUM empty arrays to siInfo.pTableCols,
//   - copies bInfo.boundTags into siInfo.boundTags.
//
// Returns TSDB_CODE_SUCCESS, terrno when the table is not in the hash or an
// allocation fails, or the error reported by qCloneStmtDataBlock().
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      // The array never made it into pTableCols, so nobody else will free it.
      // Capture terrno first in case the destroy call touches it.
      int32_t code = terrno;
      taosArrayDestroy(pTblCols);
      return code;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  return TSDB_CODE_SUCCESS;
}
959

960
// Reports through *insert whether the statement is an insert.
// When the statement type is already set (non-zero) the answer comes from the
// type; otherwise the stored SQL text is inspected with qIsInsertValuesSql().
// Always returns TSDB_CODE_SUCCESS.
int stmtIsInsert2(TAOS_STMT2* stmt, int* insert) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start is insert");

  if (!pStmt->sql.type) {
    // Type not determined yet: fall back to looking at the SQL itself.
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  if (STMT_TYPE_INSERT == pStmt->sql.type) {
    *insert = 1;
  } else if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
    *insert = 1;
  } else {
    *insert = 0;
  }

  return TSDB_CODE_SUCCESS;
}
973

974
// Sets the target table name for the next bind on an insert statement.
//
// Once interlace mode has its shared data context (siInfo.pDataCtx), only the
// name is recorded, the request id is bumped and parsing is marked unnecessary.
// Otherwise the request is created if needed, the full table name is built, the
// cache is consulted and the SQL is parsed when the cache says so.
// In interlace mode the shared table info is initialised on first use.
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code, or the error of
// the failing step (TSDB_CODE_TSC_STMT_API_ERROR for non-insert statements).
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  STMT_DLOG("start to set tbName:%s", tbName);

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  STMT_ERR_RET(stmtIsInsert2(stmt, &isInsert));
  if (!isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    // Shared context already exists: just remember the name for the next bind.
    tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
    pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  } else {
    STMT_ERR_RET(stmtCreateRequest(pStmt));

    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      tstrncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName));
      pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

      STMT_ERR_RET(stmtParseSql(pStmt));
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  // Account the time spent in this call.
  pStmt->stat.setTbNameUs += taosGetTimestampUs() - beginUs;

  return TSDB_CODE_SUCCESS;
}
1025

1026
// Binds tag values for the current table of an insert statement.
//
// stmt          statement handle (cast to STscStmt2)
// tags          tag values to bind
// pCreateTbReq  in interlace mode, *pCreateTbReq receives a newly allocated
//               SVCreateTbReq whose uid field is set to the vgroup id obtained
//               from stmtTryAddTableVgroupInfo(); in every mode the current
//               *pCreateTbReq is forwarded to qBindStmtTagsValue2()
//
// Returns TSDB_CODE_SUCCESS (also when the table is in the exec cache and the
// statement does not auto-create tables, in which case nothing is bound), the
// statement's pending error code, terrno on allocation failure, or the error of
// the failing step.
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  // A statement that already ran and is not a multi-insert does not need re-parsing.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                              pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
  }

  // Use the current block when there is one, otherwise look the table up by full name.
  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to bind stmt tag values");

  void* boundTags = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    boundTags = pStmt->sql.siInfo.boundTags;
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    // Check the allocation result itself, not the out-parameter that holds it.
    if (NULL == *pCreateTbReq) {
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
  } else {
    boundTags = pStmt->bInfo.boundTags;
  }

  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));

  return TSDB_CODE_SUCCESS;
}
1103

1104
// For interlace-mode statements whose SQL carries fixed tag values, produces the
// create-table request for the current table in *pCreateTbReq.
//
// Does nothing (returns success) when the statement is not in interlace mode,
// when a fresh parse shows the statement does not auto-create tables, or when the
// data block is not flagged SUBMIT_REQ_AUTO_CREATE_TABLE.
//
// The first time a block with a create-table request is seen, that request is
// cloned into sql.fixValueTbReq as a template. Later calls clone the template,
// replace its name with the current table name and set uid to the vgroup id
// obtained from stmtTryAddTableVgroupInfo().
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code, terrno on
// allocation failure, or the error of the failing step.
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to set tbTags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (!pStmt->sql.stbInterlaceMode) {
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
    if (!pStmt->sql.autoCreateTbl) {
      return TSDB_CODE_SUCCESS;
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }

  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.fixValueTags) {
    // Template already captured: clone it and retarget it at the current table.
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    if ((*pCreateTbReq)->name) {
      taosMemoryFree((*pCreateTbReq)->name);
    }
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
    if (NULL == (*pCreateTbReq)->name) {
      // Do not hand back a request without a table name.
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
    return TSDB_CODE_SUCCESS;
  }

  if ((*pDataBlock)->pData->pCreateTbReq) {
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
    // Only advertise the template once it has actually been cloned.
    pStmt->sql.fixValueTags = true;
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
  }

  return TSDB_CODE_SUCCESS;
}
1177

1178
// Builds the column field descriptions of the statement's target table.
//
// pStmt     statement
// fieldNum  receives the number of fields (filled by qBuildStmtColFields)
// fields    receives the field array (filled by qBuildStmtColFields)
//
// The data context comes from siInfo.pDataCtx in interlace mode and from
// exec.pBlockHash (keyed by bInfo.tbFName) otherwise.
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code,
// TSDB_CODE_TSC_STMT_API_ERROR for query statements, TSDB_CODE_APP_ERROR when no
// data context is available, or the error from qBuildStmtColFields().
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->sql.stbInterlaceMode) {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  // Also reject a slot that exists but is still empty (e.g. interlace context not
  // initialised yet), matching the guard in stmtFetchStbColFields2().
  if (NULL == pDataBlock || NULL == *pDataBlock) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
1205

1206
// Builds the field descriptions (via qBuildStmtStbColFields) for the statement's
// target table.
//
// The data context is siInfo.pDataCtx when interlace mode already has one,
// otherwise the entry for bInfo.tbFName in exec.pBlockHash. In the latter case,
// if the table is a super table, the entry is destroyed and removed from the hash
// afterwards and the statement is flagged for re-parsing.
//
// pStmt->errCode is put back to the value it had on entry before returning
// through the common exit.
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
  int32_t code = 0;
  int32_t preCode = pStmt->errCode;

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** ppCxt = NULL;
  bool            takenFromHash = true;

  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    takenFromHash = false;
    ppCxt = &pStmt->sql.siInfo.pDataCtx;
  } else {
    ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  if (!(NULL != ppCxt && NULL != *ppCxt)) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERRI_JRET(
      qBuildStmtStbColFields(*ppCxt, pStmt->bInfo.boundTags, pStmt->bInfo.tbNameFlag, fieldNum, fields));

  if (takenFromHash && TSDB_SUPER_TABLE == pStmt->bInfo.tbType) {
    // Drop the super-table entry from the exec hash and force a re-parse later.
    pStmt->bInfo.needParse = true;
    qDestroyStmtDataBlock(*ppCxt);
    *ppCxt = NULL;
    if (0 != taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName))) {
      tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
    }
  }

_return:

  pStmt->errCode = preCode;

  return code;
}
1254
/*
1255
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
1256
  while (true) {
1257
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
1258
      pStmt->exec.smInfo.pColIdx = 0;
1259
    }
1260

1261
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
1262
      taosUsleep(1);
1263
      continue;
1264
    }
1265

1266
    *idx = pStmt->exec.smInfo.pColIdx;
1267
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
1268
  }
1269
}
1270
*/
1271
// Finishes an interlace-mode bind: lazily creates the vgroup hash, the vgroup
// list and the interlace request, decides whether table data may be taken from
// the hash, then queues `param`.
//
// pStmt  statement in interlace mode
// param  queue node describing the bound table; its tblData.getFromHash and next
//        fields are filled in here before it is enqueued
//
// siInfo.firstName remembers the first table name ever bound; once that name is
// seen again, siInfo.tbFromHash is switched on.
//
// Returns TSDB_CODE_SUCCESS, an error code when the vgroup hash or list cannot be
// allocated, or the error from buildRequest().
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      // Never report success with a missing container, even if terrno was left unset.
      return (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      return (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  // Seeing the first table name again turns on "get from hash" for later nodes.
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  // Count the node as outstanding before handing it to the queue.
  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);

  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
1311

1312
// Hands out the next unused column array from siInfo.pTableCols.
//
// pStmt       statement in interlace mode
// pTableCols  receives the array at index siInfo.pTableColsIdx, which is then
//             advanced
//
// When every array has been handed out, 100 more are appended and the lookup is
// retried.
//
// Returns TSDB_CODE_SUCCESS, or terrno if growing the pool fails.
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
  while (true) {
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
      break;
    }

    // Pool exhausted: grow it, then loop to pick up the first new entry.
    for (int32_t i = 0; i < 100; i++) {
      SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
      if (NULL == pTblCols) {
        return terrno;
      }

      if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
        // The new array was never stored in the pool, so free it here.
        int32_t code = terrno;
        taosArrayDestroy(pTblCols);
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
1334

1335
// For multi-insert statements, stores a clone of the current table's data context
// in sql.pTableCache so that it can be reused later.
//
// The cache key is the super-table uid for child tables and the table's own uid
// otherwise. Nothing is done when the statement is not a multi-insert or when the
// key is already cached.
//
// After a successful insert into the cache, bInfo.tagsCached is set for
// auto-create statements; otherwise bInfo.boundTags is cleared because the cache
// entry now holds that pointer.
//
// Returns TSDB_CODE_SUCCESS, terrno when the table is not in exec.pBlockHash or
// the cache insert fails, or the error from qCloneStmtDataBlock().
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return terrno;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    // The clone was not stored anywhere, so release it instead of leaking it.
    int32_t code = terrno;
    qDestroyStmtDataBlock(pDst);
    return code;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
1372

1373
// Closes the current batch of bound rows.
//
// Outside interlace mode this only caches the current data block through
// stmtCacheBlock(). In interlace mode it marks the table columns as not ready and
// enqueues a node with restoreTbCols set; for auto-create statements it also
// marks the tags as cached.
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code, or the error of
// the failing step.
static int stmtAddBatch2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    enterUs = taosGetTimestampUs();

  STMT_DLOG_E("start to add batch");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  // Interlace mode from here on.
  pStmt->stat.addBatchUs += taosGetTimestampUs() - enterUs;
  pStmt->sql.siInfo.tableColsReady = false;

  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->restoreTbCols = true;
  pNode->next = NULL;

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  }

  stmtEnqueue(pStmt, pNode);

  return TSDB_CODE_SUCCESS;
}
1410
/*
1411
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
1412
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1413
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
1414
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
1415

1416
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
1417
  pRes->fields = taosMemoryMalloc(size);
1418
  pRes->userFields = taosMemoryMalloc(size);
1419
  if (NULL == pRes->fields || NULL == pRes->userFields) {
1420
    STMT_ERR_RET(terrno);
1421
  }
1422
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
1423
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
1424

1425
  return TSDB_CODE_SUCCESS;
1426
}
1427

1428
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
1429
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
1430
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
1431

1432
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
1433
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
1434

1435
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1436
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
1437
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
1438
      STMT_ERR_RET(terrno);
1439
    }
1440
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
1441
  }
1442

1443
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1444
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
1445
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
1446
      STMT_ERR_RET(terrno);
1447
    }
1448
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
1449
  }
1450

1451
  return TSDB_CODE_SUCCESS;
1452
}
1453
*/
1454
// Binds one set of parameter values to the statement.
//
// stmt          statement handle (cast to STscStmt2)
// bind          values to bind
// colIdx        >= 0  bind a single column (must be 0 or follow the previously
//                     bound column index; not allowed in interlace mode)
//               == -1 bind all columns in column format
//               <  -1 bind in row format (on the non-interlace path; may not be
//                     mixed with column format)
// pCreateTbReq  create-table request attached to the queued node in interlace
//               mode; not used otherwise
//
// Query statements: the parameters are bound into the parsed query, the query is
// re-parsed with a fresh parse context and the result schema is installed on the
// request; the function then returns.
//
// Insert statements: the values are bound into the current data block (or, in
// interlace mode, into a column array taken from the pool and attached to a queue
// node), after which the node is queued (interlace) or the batch is added.
//
// Returns TSDB_CODE_SUCCESS, the statement's pending error code, or the error of
// the failing step.
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  int64_t startUs = taosGetTimestampUs();

  STMT_DLOG("start to bind stmt data, colIdx:%d", colIdx);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  // A statement that already ran and is not a multi-insert does not need re-parsing.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // A query that ran before gets a brand-new request for this execution.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERR_RET(qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt));

    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    // Exchange the db/table lists between the parsed query and the request.
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
    if (pStmt->sql.stbInterlaceMode) {
      // Interlace mode binds into per-node column arrays, not the block's own.
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      (*pDataBlock)->pData->aCol = NULL;
    }
    if (colIdx < -1) {
      pStmt->sql.bindRowFormat = true;
      taosArrayDestroy((*pDataBlock)->pData->aCol);
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
      if (NULL == (*pDataBlock)->pData->aCol) {
        // Report the allocation failure instead of continuing without the array.
        STMT_ERR_RET(terrno);
      }
    }
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += startUs2 - startUs;

  SStmtQNode* param = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
    taosArrayClear(param->tblData.aCol);

    param->restoreTbCols = false;
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);

    param->pCreateTbReq = pCreateTbReq;
  }

  int64_t startUs3 = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;

  SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;

  if (colIdx < 0) {
    if (pStmt->sql.stbInterlaceMode) {
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                    pStmt->taos->optionInfo.charsetCxt);
    } else {
      if (colIdx == -1) {
        if (pStmt->sql.bindRowFormat) {
          tscError("can't mix bind row format and bind column format");
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
        }
        code = qBindStmtColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
      } else {
        code = qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, bind, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                  pStmt->taos->optionInfo.charsetCxt);
      }
    }

    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (pStmt->sql.stbInterlaceMode) {
      tscError("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (pStmt->sql.bindRowFormat) {
      tscError("can't mix bind row format and bind column format");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    // Single columns must arrive in order; index 0 starts a new row set.
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
                                    pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t startUs4 = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
  } else {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;

  return TSDB_CODE_SUCCESS;
}
1642
/*
1643
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
1644
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
1645

1646
  int32_t code = 0;
1647
  int32_t finalCode = 0;
1648
  size_t  keyLen = 0;
1649
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
1650
  while (pIter) {
1651
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
1652
    char*          key = taosHashGetKey(pIter, &keyLen);
1653

1654
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
1655
    if (pMeta->uid) {
1656
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1657
      continue;
1658
    }
1659

1660
    SSubmitBlkRsp* blkRsp = NULL;
1661
    int32_t        i = 0;
1662
    for (; i < pRsp->nBlocks; ++i) {
1663
      blkRsp = pRsp->pBlocks + i;
1664
      if (strlen(blkRsp->tblFName) != keyLen) {
1665
        continue;
1666
      }
1667

1668
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
1669
        continue;
1670
      }
1671

1672
      break;
1673
    }
1674

1675
    if (i < pRsp->nBlocks) {
1676
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
1677
               blkRsp->uid);
1678

1679
      pMeta->uid = blkRsp->uid;
1680
      pStmt->bInfo.tbUid = blkRsp->uid;
1681
    } else {
1682
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
1683
      if (NULL == pStmt->pCatalog) {
1684
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
1685
        if (code) {
1686
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1687
          finalCode = code;
1688
          continue;
1689
        }
1690
      }
1691

1692
      code = stmtCreateRequest(pStmt);
1693
      if (code) {
1694
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1695
        finalCode = code;
1696
        continue;
1697
      }
1698

1699
      STableMeta*      pTableMeta = NULL;
1700
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
1701
                               .requestId = pStmt->exec.pRequest->requestId,
1702
                               .requestObjRefId = pStmt->exec.pRequest->self,
1703
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
1704
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
1705

1706
      pStmt->stat.ctgGetTbMetaNum++;
1707

1708
      taos_free_result(pStmt->exec.pRequest);
1709
      pStmt->exec.pRequest = NULL;
1710

1711
      if (code || NULL == pTableMeta) {
1712
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1713
        finalCode = code;
1714
        taosMemoryFree(pTableMeta);
1715
        continue;
1716
      }
1717

1718
      pMeta->uid = pTableMeta->uid;
1719
      pStmt->bInfo.tbUid = pTableMeta->uid;
1720
      taosMemoryFree(pTableMeta);
1721
    }
1722

1723
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
1724
  }
1725

1726
  return finalCode;
1727
}
1728
*/
1729
/*
1730
int stmtStaticModeExec(TAOS_STMT* stmt) {
1731
  STscStmt2*   pStmt = (STscStmt2*)stmt;
1732
  int32_t     code = 0;
1733
  SSubmitRsp* pRsp = NULL;
1734
  if (pStmt->sql.staticMode) {
1735
    return TSDB_CODE_TSC_STMT_API_ERROR;
1736
  }
1737

1738
  STMT_DLOG_E("start to exec");
1739

1740
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
1741

1742
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
1743
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
1744

1745
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
1746

1747
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
1748
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
1749
    if (code) {
1750
      pStmt->exec.pRequest->code = code;
1751
    } else {
1752
      tFreeSSubmitRsp(pRsp);
1753
      STMT_ERR_RET(stmtResetStmt(pStmt));
1754
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
1755
    }
1756
  }
1757

1758
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
1759

1760
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
1761
  pStmt->affectedRows += pStmt->exec.affectedRows;
1762

1763
_return:
1764

1765
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
1766

1767
  tFreeSSubmitRsp(pRsp);
1768

1769
  ++pStmt->sql.runTimes;
1770

1771
  STMT_RET(code);
1772
}
1773
*/
1774

1775
/*
 * Allocate a zeroed SParseContext, store it in *pCxt and populate it from the
 * request and its owning connection object.
 *
 * pRequest  request whose ids, SQL text, message buffer and allocator are copied in
 * pCxt      receives the new context (NULL when the allocation fails)
 * pWrapper  stored as the parseSqlParam callback argument
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
  const STscObj* pTscObj = pRequest->pTscObj;
  SParseContext* pNew = taosMemoryCalloc(1, sizeof(SParseContext));

  *pCxt = pNew;
  if (NULL == pNew) {
    return terrno;
  }

  // identity of the request
  pNew->requestId = pRequest->requestId;
  pNew->requestRid = pRequest->self;
  pNew->acctId = pTscObj->acctId;
  pNew->db = pRequest->pDb;
  pNew->topicQuery = false;

  // SQL text and error message buffer
  pNew->pSql = pRequest->sqlstr;
  pNew->sqlLen = pRequest->sqlLen;
  pNew->pMsg = pRequest->msgBuf;
  pNew->msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;

  // connection level information
  pNew->pTransporter = pTscObj->pAppInfo->pTransporter;
  pNew->pStmtCb = NULL;
  pNew->pUser = pTscObj->user;
  pNew->pEffectiveUser = pRequest->effectiveUser;
  pNew->isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER));
  pNew->enableSysInfo = pTscObj->sysInfo;
  pNew->async = true;
  pNew->svrVer = pTscObj->sVer;
  pNew->nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes);
  pNew->allocatorId = pRequest->allocatorRefId;

  // parser callback and its argument
  pNew->parseSqlFp = clientParseSql;
  pNew->parseSqlParam = pWrapper;

  // biMode is read atomically from the connection object
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
  pNew->biMode = biMode;

  return TSDB_CODE_SUCCESS;
}
1808

1809
/*
 * Completion callback installed on the request by the asynchronous exec path.
 *
 * userdata  the STscStmt2 that launched the query
 * res       result handle forwarded to the user's callback
 * code      execution result forwarded to the user's callback
 *
 * Records the affected rows, invokes the user's asyncExecFn, waits until
 * tableColsReady is non-zero, cleans the exec info and finally posts
 * asyncExecSem.
 */
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
  STscStmt2*        pStmt = userdata;
  __taos_async_fn_t userCb = pStmt->options.asyncExecFn;

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

  // the user's callback runs before any statement state is cleaned up
  userCb(pStmt->options.userdata, res, code);

  // spin until the table columns flag has been raised
  for (;;) {
    if (0 != atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      break;
    }
    taosUsleep(1);
  }

  (void)stmtCleanExecInfo(pStmt, (0 == code), false);
  ++pStmt->sql.runTimes;

  if (0 != tsem_post(&pStmt->asyncExecSem)) {
    tscError("failed to post asyncExecSem");
  }
}
9✔
1828

1829
/*
 * Execute a prepared statement.
 *
 * stmt           statement handle (an STscStmt2)
 * affected_rows  optional out-parameter; written only on the synchronous path
 *
 * Returns the statement's stored errCode immediately when it is already in an
 * error state. Otherwise waits for outstanding async binds, builds the submit
 * output for non-query statements and then either
 *   - runs the request synchronously (no asyncExecFn configured), or
 *   - arms the request with asyncQueryCb and launches it asynchronously.
 *
 * On the asynchronous path the request is only armed and launched once the
 * callback wrapper and its parse context have been created; if either
 * allocation fails the error is returned and nothing is launched.
 */
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int64_t    startUs = taosGetTimestampUs();

  STMT_DLOG_E("start to exec");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // block until every in-flight async bind has finished
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      // wait for tbRemainNum to drop to zero and account the time spent waiting
      int64_t startTs = taosGetTimestampUs();
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;

      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      // replace the previously cloned table data with a clone of the current block
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
  }

  SRequestObj*      pRequest = pStmt->exec.pRequest;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;

  if (!fp) {
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
      if (code) {
        pStmt->exec.pRequest->code = code;
      } else {
        // meta refreshed successfully: reset the statement and ask the caller to retry
        STMT_ERR_RET(stmtResetStmt(pStmt));
        STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
      }
    }

    STMT_ERR_JRET(pStmt->exec.pRequest->code);

    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
    if (affected_rows) {
      *affected_rows = pStmt->exec.affectedRows;
    }
    pStmt->affectedRows += pStmt->exec.affectedRows;

    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

    ++pStmt->sql.runTimes;
  } else {
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
    if (pWrapper == NULL) {
      // never launch without a wrapper; make sure a failure code is reported
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _return;
    }

    code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
    if (code != TSDB_CODE_SUCCESS) {
      // the wrapper has not been attached to the request yet, so release it here
      taosMemoryFree(pWrapper);
      goto _return;
    }

    pWrapper->pRequest = pRequest;
    pRequest->pWrapper = pWrapper;

    pRequest->syncQuery = false;
    pRequest->body.queryFp = asyncQueryCb;
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;

    // cleared only when a query is really launched: asyncQueryCb will post asyncExecSem
    pStmt->execSemWaited = false;
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
  }

_return:
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;

  STMT_RET(code);
}
1929

1930
int stmtClose2(TAOS_STMT2* stmt) {
76✔
1931
  STscStmt2* pStmt = (STscStmt2*)stmt;
76✔
1932

1933
  STMT_DLOG_E("start to free stmt");
76!
1934

1935
  if (pStmt->bindThreadInUse) {
76✔
1936
    pStmt->queue.stopQueue = true;
16✔
1937

1938
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
16✔
1939
    (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
16✔
1940
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
16✔
1941
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
16✔
1942

1943
    (void)taosThreadJoin(pStmt->bindThread, NULL);
16✔
1944
    pStmt->bindThreadInUse = false;
16✔
1945

1946
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
16✔
1947
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
16✔
1948
  }
1949

1950
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
76!
1951
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
76!
1952
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
1953
  }
1954
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
76!
1955

1956
  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
76✔
1957
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
76✔
1958

1959
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
76!
1960
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
4!
1961
      tscError("failed to wait asyncExecSem");
×
1962
    }
1963
  }
1964

1965
  STMT_DLOG("stmt %p closed, stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
76!
1966
            ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
1967
            ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
1968
            ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
1969
            ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
1970
            pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
1971
            pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
1972
            pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
1973
            pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
1974
            pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
1975

1976
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
76!
1977

1978
  if (pStmt->options.asyncExecFn) {
76✔
1979
    if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
4!
1980
      tscError("failed to destroy asyncExecSem");
×
1981
    }
1982
  }
1983
  taosMemoryFree(stmt);
76!
1984

1985
  return TSDB_CODE_SUCCESS;
76✔
1986
}
1987

1988
/*
 * Return the error string for a statement.
 *
 * When the handle is NULL or has no request attached, the string for the
 * current terrno is returned. Otherwise terrno is copied into the request's
 * code and the request's error string is returned.
 */
const char* stmtErrstr2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (NULL != stmt && NULL != pStmt->exec.pRequest) {
    SRequestObj* pReq = pStmt->exec.pRequest;
    pReq->code = terrno;
    return taos_errstr(pReq);
  }

  return tstrerror(terrno);
}
1999
/*
2000
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
2001

2002
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
2003
*/
2004

2005
/*
 * Move the statement to the fetch-fields state and parse its SQL if needed.
 *
 * Returns the statement's stored errCode immediately when it is already in an
 * error state, and TSDB_CODE_TSC_STMT_API_ERROR for query statements.
 * The errCode captured on entry is written back before returning, whatever
 * the outcome of this call.
 */
int stmtParseColFields2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    savedErrCode = pStmt->errCode;

  STMT_DLOG_E("start to get col fields");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // parsing is skipped for an already-executed non multi-insert statement,
  // and for interlace mode once a data context exists
  if ((pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
       STMT_TYPE_MULTI_INSERT != pStmt->sql.type) ||
      (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx)) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
    STMT_ERRI_JRET(stmtCreateRequest(pStmt));
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

_return:
  // compatible with previous versions
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && 0x0 == (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE)) {
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
  }

  pStmt->errCode = savedErrCode;

  return code;
}
2051

2052
/*
 * Parse the statement's column fields, then fetch them into nums / fields.
 * Returns the parse error unchanged when parsing fails; otherwise the result
 * of stmtFetchStbColFields2.
 */
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
  int32_t parseCode = stmtParseColFields2(stmt);

  if (TSDB_CODE_SUCCESS == parseCode) {
    return stmtFetchStbColFields2(stmt, nums, fields);
  }

  return parseCode;
}
2060

2061
/*
 * Report the number of bind parameters of a statement through *nums.
 *
 * For query statements this is the size of the query's placeholder value
 * list; for other statements it is obtained from stmtFetchColFields2.
 * Returns the statement's stored errCode immediately when it is already in an
 * error state. The errCode captured on entry is written back before returning.
 */
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int32_t    savedErrCode = pStmt->errCode;

  STMT_DLOG_E("start to get param num");

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // an already-executed statement that is not a multi-insert is not parsed again
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // drop the request of a previous query run so a fresh one gets created below
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

_return:

  pStmt->errCode = savedErrCode;

  return code;
}
2102

2103
/*
 * Return the statement's current request as a result handle.
 * Only valid for query statements; for any other type an error is logged and
 * NULL is returned.
 */
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}
2115

2116
int32_t stmtAsyncBindThreadFunc(void* args) {
×
2117
  qInfo("async stmt bind thread started");
×
2118

2119
  ThreadArgs* targs = (ThreadArgs*)args;
×
2120
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
2121

2122
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
2123
  targs->fp(targs->param, NULL, code);
×
2124
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
2125
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
2126
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
2127
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
2128
  taosMemoryFree(args);
×
2129

2130
  qInfo("async stmt bind thread stopped");
×
2131

2132
  return code;
×
2133
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc