• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.39
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4
#include "tglobal.h"
5

6
#include "clientStmt.h"
7
#include "clientStmt2.h"
8
#include "tencode.h"
9
#include "tmsg.h"
10
#include "tname.h"
11
#include "trow.h"
12

13
char* gStmt2StatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
14
                           "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
15

16

17
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
18
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
1,391,366✔
19
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
1,391,704✔
20
    pTblBuf->buffOffset += pTblBuf->buffUnit;
1,391,837✔
21
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
×
22
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
×
23
    if (NULL == pTblBuf->pCurBuff) {
×
24
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
25
    }
26
    *pBuf = pTblBuf->pCurBuff;
×
27
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
28
  } else {
29
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
×
30
    if (NULL == buff) {
×
31
      return terrno;
×
32
    }
33

34
    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
×
35
      return terrno;
×
36
    }
37

38
    pTblBuf->buffIdx++;
×
39
    pTblBuf->pCurBuff = buff;
×
40
    *pBuf = buff;
×
41
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
42
  }
43

44
  return TSDB_CODE_SUCCESS;
1,391,413✔
45
}
46

47
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
1,391,976✔
48
  int i = 0;
1,391,976✔
49
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
6,542,285✔
50
    if (pStmt->queue.stopQueue) {
5,269,854✔
51
      return false;
119,147✔
52
    }
53
    if (i < 10) {
5,150,861✔
54
      taosUsleep(1);
4,819,499✔
55
      i++;
4,818,780✔
56
    } else {
57
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
331,362✔
58
      if (pStmt->queue.stopQueue) {
331,629✔
59
        (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
×
60
        return false;
×
61
      }
62
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
331,629✔
63
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
331,465✔
64
      }
65
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
331,517✔
66
    }
67
  }
68

69
  if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
1,272,193✔
70
    return false;
×
71
  }
72

73
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
1,272,193✔
74
  if (pStmt->queue.head == pStmt->queue.tail) {
1,272,789✔
75
    pStmt->queue.qRemainNum = 0;
×
76
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
×
77
    STMT2_ELOG_E("interlace queue is empty, cannot dequeue");
×
78
    return false;
×
79
  }
80

81
  SStmtQNode* node = pStmt->queue.head->next;
1,272,738✔
82
  pStmt->queue.head->next = node->next;
1,272,812✔
83
  if (pStmt->queue.tail == node) {
1,272,775✔
84
    pStmt->queue.tail = pStmt->queue.head;
664,120✔
85
  }
86
  node->next = NULL;
1,272,775✔
87
  *param = node;
1,272,775✔
88

89
  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
1,272,775✔
90
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
1,272,756✔
91

92
  STMT2_TLOG("dequeue success, node:%p, remainNum:%" PRId64, node, pStmt->queue.qRemainNum);
1,272,728✔
93

94
  return true;
1,272,719✔
95
}
96

97
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
1,272,391✔
98
  if (param == NULL) {
1,272,391✔
99
    STMT2_ELOG_E("enqueue param is NULL");
×
100
    return;
×
101
  }
102

103
  param->next = NULL;
1,272,391✔
104

105
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
1,272,431✔
106

107
  pStmt->queue.tail->next = param;
1,272,765✔
108
  pStmt->queue.tail = param;
1,272,765✔
109
  pStmt->stat.bindDataNum++;
1,272,765✔
110

111
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
1,272,653✔
112
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
1,272,788✔
113

114
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
1,272,623✔
115

116
  STMT2_TLOG("enqueue param:%p, remainNum:%" PRId64 ", restoreTbCols:%d", param, pStmt->queue.qRemainNum,
1,272,635✔
117
             param->restoreTbCols);
118
}
119

120
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
51,931,351✔
121
  int32_t code = 0;
51,931,351✔
122

123
  if (pStmt->exec.pRequest == NULL) {
51,931,351✔
124
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
137,792✔
125
                        pStmt->reqid);
126
    if (pStmt->reqid != 0) {
137,822✔
127
      pStmt->reqid++;
8✔
128
    }
129
    pStmt->exec.pRequest->type = RES_TYPE__QUERY;
137,822✔
130
    if (pStmt->db != NULL) {
137,822✔
131
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
132,462✔
132
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
132,462✔
133
    }
134
    if (TSDB_CODE_SUCCESS == code) {
137,822✔
135
      pStmt->exec.pRequest->syncQuery = true;
137,822✔
136
      pStmt->exec.pRequest->stmtBindVersion = 2;
137,792✔
137
    }
138
    STMT2_DLOG("create request:0x%" PRIx64 ", QID:0x%" PRIx64, pStmt->exec.pRequest->self,
137,792✔
139
               pStmt->exec.pRequest->requestId);
140
  }
141

142
  return code;
52,577,123✔
143
}
144

145
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
102,340,092✔
146
  int32_t code = 0;
102,340,092✔
147

148
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
102,340,092✔
149
    STMT2_LOG_SEQ(newStatus);
103,550,389✔
150
  }
151

152
  if (pStmt->errCode && newStatus != STMT_PREPARE) {
103,277,185✔
153
    STMT2_ELOG("stmt already failed with err:%s, please use stmt prepare", tstrerror(pStmt->errCode));
×
154
    return pStmt->errCode;
×
155
  }
156

157
  switch (newStatus) {
102,714,943✔
158
    case STMT_PREPARE:
131,330✔
159
      pStmt->errCode = 0;
131,330✔
160
      break;
90,922✔
161
    case STMT_SETTBNAME:
882,443✔
162
      if (STMT_STATUS_EQ(INIT)) {
882,443✔
163
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
164
      }
165
      if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
882,489✔
166
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
167
      }
168
      break;
882,503✔
169
    case STMT_SETTAGS:
564,239✔
170
      if (STMT_STATUS_EQ(INIT)) {
564,239✔
171
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
172
      }
173
      break;
564,239✔
174
    case STMT_FETCH_FIELDS:
9,177✔
175
      if (STMT_STATUS_EQ(INIT)) {
9,177✔
176
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
177
      }
178
      break;
9,177✔
179
    case STMT_BIND:
50,513,053✔
180
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
50,513,053✔
181
        code = TSDB_CODE_TSC_STMT_API_ERROR;
100✔
182
      }
183
      /*
184
            if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
185
              code = TSDB_CODE_TSC_STMT_API_ERROR;
186
            }
187
      */
188
      break;
51,219,680✔
189
    case STMT_BIND_COL:
×
190
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
×
191
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
192
      }
193
      break;
×
194
    case STMT_ADD_BATCH:
50,087,325✔
195
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
50,087,325✔
196
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
197
      }
198
      break;
50,332,235✔
199
    case STMT_EXECUTE:
527,376✔
200
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
527,376✔
201
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
7,103✔
202
            STMT_STATUS_NE(BIND_COL)) {
412✔
203
          code = TSDB_CODE_TSC_STMT_API_ERROR;
412✔
204
        }
205
      } else {
206
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
520,233✔
207
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
208
        }
209
      }
210
      break;
527,336✔
211
    default:
×
212
      code = TSDB_CODE_APP_ERROR;
×
213
      break;
×
214
  }
215

216
  STMT_ERR_RET(code);
102,801,137✔
217

218
  pStmt->sql.status = newStatus;
102,800,625✔
219

220
  return TSDB_CODE_SUCCESS;
103,365,618✔
221
}
222

223
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
15,746✔
224
  STscStmt2* pStmt = (STscStmt2*)stmt;
15,746✔
225

226
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
15,746✔
227

228
  if ('\0' == pStmt->bInfo.tbName[0]) {
15,746✔
229
    tscWarn("no table name set, OK if it is a stmt get fields");
4,417✔
230
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
4,417✔
231
  }
232

233
  *tbName = pStmt->bInfo.tbName;
11,329✔
234

235
  return TSDB_CODE_SUCCESS;
11,294✔
236
}
237

238
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
123,653✔
239
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
240
  STscStmt2* pStmt = (STscStmt2*)stmt;
123,653✔
241
  char       tbFName[TSDB_TABLE_FNAME_LEN];
99,324✔
242
  int32_t    code = tNameExtractFullName(tbName, tbFName);
123,653✔
243
  if (code != 0) {
123,666✔
244
    return code;
×
245
  }
246

247
  if ((tags != NULL && ((SBoundColInfo*)tags)->numOfCols == 0) || !autoCreateTbl) {
123,666✔
248
    pStmt->sql.autoCreateTbl = false;
112,429✔
249
  }
250

251
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
123,657✔
252
  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));
123,657✔
253
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
123,609✔
254

255
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
123,653✔
256
  pStmt->bInfo.tbSuid = pTableMeta->suid;
123,661✔
257
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
123,584✔
258
  pStmt->bInfo.tbType = pTableMeta->tableType;
123,630✔
259

260
  if (!pStmt->bInfo.tagsCached) {
123,653✔
261
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
123,009✔
262
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
123,045✔
263
  }
264

265
  // transfer ownership of cols to stmt
266
  if (cols) {
123,691✔
267
    pStmt->bInfo.fixedValueCols = *cols;
123,423✔
268
    *cols = NULL;
123,402✔
269
  }
270

271
  pStmt->bInfo.boundTags = tags;
123,666✔
272
  pStmt->bInfo.tagsCached = false;
123,421✔
273
  pStmt->bInfo.tbNameFlag = tbNameFlag;
123,357✔
274
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
123,394✔
275

276
  if (pTableMeta->tableType != TSDB_CHILD_TABLE && pTableMeta->tableType != TSDB_SUPER_TABLE) {
123,350✔
277
    pStmt->sql.stbInterlaceMode = false;
2,606✔
278
  }
279

280
  return TSDB_CODE_SUCCESS;
123,378✔
281
}
282

283
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
123,369✔
284
  STscStmt2* pStmt = (STscStmt2*)stmt;
123,369✔
285

286
  pStmt->sql.pVgHash = pVgHash;
123,369✔
287
  pStmt->exec.pBlockHash = pBlockHash;
123,628✔
288

289
  return TSDB_CODE_SUCCESS;
123,402✔
290
}
291

292
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
123,651✔
293
                              bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName,
294
                              uint8_t tbNameFlag) {
295
  STscStmt2* pStmt = (STscStmt2*)stmt;
123,651✔
296

297
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, cols, tbName, sTableName, autoCreateTbl, tbNameFlag));
123,651✔
298
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
123,353✔
299

300
  pStmt->sql.autoCreateTbl = autoCreateTbl;
123,384✔
301

302
  return TSDB_CODE_SUCCESS;
123,632✔
303
}
304

305
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
1,817✔
306
  STscStmt2* pStmt = (STscStmt2*)stmt;
1,817✔
307

308
  *pVgHash = pStmt->sql.pVgHash;
1,817✔
309
  pStmt->sql.pVgHash = NULL;
1,817✔
310

311
  *pBlockHash = pStmt->exec.pBlockHash;
1,817✔
312
  pStmt->exec.pBlockHash = NULL;
1,817✔
313

314
  return TSDB_CODE_SUCCESS;
1,817✔
315
}
316

317
static int32_t stmtParseSql(STscStmt2* pStmt) {
131,631✔
318
  pStmt->exec.pCurrBlock = NULL;
131,631✔
319

320
  SStmtCallback stmtCb = {
131,847✔
321
      .pStmt = pStmt,
322
      .getTbNameFn = stmtGetTbName,
323
      .setInfoFn = stmtUpdateInfo,
324
      .getExecInfoFn = stmtGetExecInfo,
325
  };
326

327
  STMT_ERR_RET(stmtCreateRequest(pStmt));
131,847✔
328
  pStmt->exec.pRequest->stmtBindVersion = 2;
131,847✔
329

330
  pStmt->stat.parseSqlNum++;
131,761✔
331

332
  STMT2_DLOG("start to parse, QID:0x%" PRIx64, pStmt->exec.pRequest->requestId);
131,831✔
333
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
131,831✔
334

335
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
129,588✔
336

337
  pStmt->bInfo.needParse = false;
129,856✔
338

339
  if (pStmt->sql.type == 0) {
129,450✔
340
    if (pStmt->sql.pQuery->pRoot && LEGAL_INSERT(nodeType(pStmt->sql.pQuery->pRoot))) {
114,380✔
341
      pStmt->sql.type = STMT_TYPE_INSERT;
108,225✔
342
      pStmt->sql.stbInterlaceMode = false;
108,306✔
343
    } else if (pStmt->sql.pQuery->pPrepareRoot && LEGAL_SELECT(nodeType(pStmt->sql.pQuery->pPrepareRoot))) {
6,178✔
344
      pStmt->sql.type = STMT_TYPE_QUERY;
5,791✔
345
      pStmt->sql.stbInterlaceMode = false;
5,791✔
346

347
      return TSDB_CODE_SUCCESS;
5,791✔
348
    } else {
349
      STMT2_ELOG_E("only support select or insert sql");
412✔
350
      if (pStmt->exec.pRequest->msgBuf) {
412✔
351
        tstrncpy(pStmt->exec.pRequest->msgBuf, "stmt only support select or insert", pStmt->exec.pRequest->msgBufLen);
412✔
352
      }
353
      return TSDB_CODE_PAR_SYNTAX_ERROR;
412✔
354
    }
355
  } else if (pStmt->sql.type == STMT_TYPE_QUERY) {
15,066✔
356
    pStmt->sql.stbInterlaceMode = false;
×
357
    return TSDB_CODE_SUCCESS;
×
358
  } else if (pStmt->sql.type == STMT_TYPE_INSERT) {
15,106✔
359
    pStmt->sql.stbInterlaceMode = false;
×
360
  }
361

362
  STableDataCxt** pSrc =
363
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
123,400✔
364
  if (NULL == pSrc || NULL == *pSrc) {
123,682✔
365
    STMT2_ELOG("fail to get exec.pBlockHash, maybe parse failed, tbFName:%s", pStmt->bInfo.tbFName);
×
366
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
367
  }
368

369
  STableDataCxt* pTableCtx = *pSrc;
123,452✔
370
  if (pStmt->sql.stbInterlaceMode && pTableCtx->pData->pCreateTbReq && (pStmt->bInfo.tbNameFlag & USING_CLAUSE) == 0) {
123,425✔
371
    STMT2_TLOG("destroy pCreateTbReq for no-using insert, tbFName:%s", pStmt->bInfo.tbFName);
2,100✔
372
    tdDestroySVCreateTbReq(pTableCtx->pData->pCreateTbReq);
2,100✔
373
    taosMemoryFreeClear(pTableCtx->pData->pCreateTbReq);
2,100✔
374
    pTableCtx->pData->pCreateTbReq = NULL;
2,100✔
375
  }
376
  // if (pStmt->sql.stbInterlaceMode) {
377
  //   int16_t lastIdx = -1;
378

379
  //   for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
380
  //     if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
381
  //       pStmt->sql.stbInterlaceMode = false;
382
  //       break;
383
  //     }
384

385
  //     lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
386
  //   }
387
  // }
388

389
  if (NULL == pStmt->sql.pBindInfo) {
123,402✔
390
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
121,767✔
391
    if (NULL == pStmt->sql.pBindInfo) {
121,585✔
392
      STMT2_ELOG_E("fail to malloc pBindInfo");
×
393
      return terrno;
×
394
    }
395
  }
396

397
  return TSDB_CODE_SUCCESS;
123,425✔
398
}
399

400
static int32_t stmtPrintBindv(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bindv, int32_t col_idx, bool isTags) {
×
401
  STscStmt2* pStmt = (STscStmt2*)stmt;
×
402
  int32_t    count = 0;
×
403
  int32_t    code = 0;
×
404

405
  if (bindv == NULL) {
×
406
    STMT2_TLOG("bindv is NULL, col_idx:%d, isTags:%d", col_idx, isTags);
×
407
    return TSDB_CODE_SUCCESS;
×
408
  }
409

410
  if (col_idx >= 0) {
×
411
    count = 1;
×
412
    STMT2_TLOG("single col bind, col_idx:%d", col_idx);
×
413
  } else {
414
    if (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type ||
×
415
        (pStmt->sql.type == 0 && stmt2IsInsert(stmt))) {
×
416
      if (pStmt->sql.placeholderOfTags == 0 && pStmt->sql.placeholderOfCols == 0) {
×
417
        code = stmtGetStbColFields2(pStmt, NULL, NULL);
×
418
        if (code != TSDB_CODE_SUCCESS) {
×
419
          return code;
×
420
        }
421
      }
422
      if (isTags) {
×
423
        count = pStmt->sql.placeholderOfTags;
×
424
        STMT2_TLOG("print tags bindv, cols:%d", count);
×
425
      } else {
426
        count = pStmt->sql.placeholderOfCols;
×
427
        STMT2_TLOG("print cols bindv, cols:%d", count);
×
428
      }
429
    } else if (STMT_TYPE_QUERY == pStmt->sql.type || (pStmt->sql.type == 0 && stmt2IsSelect(stmt))) {
×
430
      count = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
×
431
      STMT2_TLOG("print query bindv, cols:%d", count);
×
432
    }
433
  }
434

435
  if (code != TSDB_CODE_SUCCESS) {
×
436
    STMT2_ELOG("failed to get param count, code:%d", code);
×
437
    return code;
×
438
  }
439

440
  for (int i = 0; i < count; i++) {
×
441
    int32_t type = bindv[i].buffer_type;
×
442
    int32_t num = bindv[i].num;
×
443
    char*   current_buf = (char*)bindv[i].buffer;
×
444

445
    for (int j = 0; j < num; j++) {
×
446
      char    buf[256] = {0};
×
447
      int32_t len = 0;
×
448
      bool    isNull = (bindv[i].is_null && bindv[i].is_null[j]);
×
449

450
      if (IS_VAR_DATA_TYPE(type) && bindv[i].length) {
×
451
        len = bindv[i].length[j];
×
452
      } else {
453
        len = tDataTypes[type].bytes;
×
454
      }
455

456
      if (isNull) {
×
457
        snprintf(buf, sizeof(buf), "NULL");
×
458
      } else {
459
        if (current_buf == NULL) {
×
460
          snprintf(buf, sizeof(buf), "NULL(Buf)");
×
461
        } else {
462
          switch (type) {
×
463
            case TSDB_DATA_TYPE_BOOL:
×
464
              snprintf(buf, sizeof(buf), "%d", *(int8_t*)current_buf);
×
465
              break;
×
466
            case TSDB_DATA_TYPE_TINYINT:
×
467
              snprintf(buf, sizeof(buf), "%d", *(int8_t*)current_buf);
×
468
              break;
×
469
            case TSDB_DATA_TYPE_SMALLINT:
×
470
              snprintf(buf, sizeof(buf), "%d", *(int16_t*)current_buf);
×
471
              break;
×
472
            case TSDB_DATA_TYPE_INT:
×
473
              snprintf(buf, sizeof(buf), "%d", *(int32_t*)current_buf);
×
474
              break;
×
475
            case TSDB_DATA_TYPE_BIGINT:
×
476
              snprintf(buf, sizeof(buf), "%" PRId64, *(int64_t*)current_buf);
×
477
              break;
×
478
            case TSDB_DATA_TYPE_FLOAT:
×
479
              snprintf(buf, sizeof(buf), "%f", *(float*)current_buf);
×
480
              break;
×
481
            case TSDB_DATA_TYPE_DOUBLE:
×
482
              snprintf(buf, sizeof(buf), "%f", *(double*)current_buf);
×
483
              break;
×
484
            case TSDB_DATA_TYPE_BINARY:
×
485
            case TSDB_DATA_TYPE_NCHAR:
486
            case TSDB_DATA_TYPE_GEOMETRY:
487
            case TSDB_DATA_TYPE_VARBINARY:
488
              snprintf(buf, sizeof(buf), "len:%d, val:%.*s", len, len, current_buf);
×
489
              break;
×
490
            case TSDB_DATA_TYPE_TIMESTAMP:
×
491
              snprintf(buf, sizeof(buf), "%" PRId64, *(int64_t*)current_buf);
×
492
              break;
×
493
            case TSDB_DATA_TYPE_UTINYINT:
×
494
              snprintf(buf, sizeof(buf), "%u", *(uint8_t*)current_buf);
×
495
              break;
×
496
            case TSDB_DATA_TYPE_USMALLINT:
×
497
              snprintf(buf, sizeof(buf), "%u", *(uint16_t*)current_buf);
×
498
              break;
×
499
            case TSDB_DATA_TYPE_UINT:
×
500
              snprintf(buf, sizeof(buf), "%u", *(uint32_t*)current_buf);
×
501
              break;
×
502
            case TSDB_DATA_TYPE_UBIGINT:
×
503
              snprintf(buf, sizeof(buf), "%" PRIu64, *(uint64_t*)current_buf);
×
504
              break;
×
505
            default:
×
506
              snprintf(buf, sizeof(buf), "UnknownType:%d", type);
×
507
              break;
×
508
          }
509
        }
510
      }
511

512
      STMT2_TLOG("bindv[%d] row[%d]: type:%s, val:%s", i, j, tDataTypes[type].name, buf);
×
513

514
      if (!isNull && current_buf) {
×
515
        current_buf += len;
×
516
      }
517
    }
518
  }
519

520
  return TSDB_CODE_SUCCESS;
×
521
}
522

523
static void resetRequest(STscStmt2* pStmt) {
247,480✔
524
  if (pStmt->exec.pRequest) {
247,480✔
525
    taos_free_result(pStmt->exec.pRequest);
135,299✔
526
    pStmt->exec.pRequest = NULL;
135,299✔
527
  }
528
  pStmt->asyncResultAvailable = false;
247,503✔
529
}
247,480✔
530

531
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
672,182✔
532
  pStmt->bInfo.tbUid = 0;
672,182✔
533
  pStmt->bInfo.tbVgId = -1;
672,182✔
534
  pStmt->bInfo.tbType = 0;
672,136✔
535
  pStmt->bInfo.needParse = true;
672,205✔
536
  pStmt->bInfo.inExecCache = false;
672,180✔
537

538
  pStmt->bInfo.tbName[0] = 0;
672,205✔
539
  pStmt->bInfo.tbFName[0] = 0;
672,203✔
540
  if (!pStmt->bInfo.tagsCached) {
672,176✔
541
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
475,923✔
542
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
475,882✔
543
    pStmt->bInfo.boundTags = NULL;
475,910✔
544
  }
545

546
  if (!pStmt->bInfo.boundColsCached) {
672,161✔
547
    tSimpleHashCleanup(pStmt->bInfo.fixedValueCols);
266,777✔
548
    pStmt->bInfo.fixedValueCols = NULL;
266,777✔
549
  }
550

551
  if (!pStmt->sql.autoCreateTbl) {
672,146✔
552
    pStmt->bInfo.stbFName[0] = 0;
466,723✔
553
    pStmt->bInfo.tbSuid = 0;
466,746✔
554
  }
555

556
  STMT2_TLOG("finish clean bind info, tagsCached:%d, autoCreateTbl:%d", pStmt->bInfo.tagsCached,
672,117✔
557
             pStmt->sql.autoCreateTbl);
558

559
  return TSDB_CODE_SUCCESS;
672,111✔
560
}
561

562
static void stmtFreeTableBlkList(STableColsData* pTb) {
×
563
  (void)qResetStmtColumns(pTb->aCol, true);
×
564
  taosArrayDestroy(pTb->aCol);
×
565
}
×
566

567
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
405,110✔
568
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
405,110✔
569
  if (NULL == pTblBuf->pCurBuff) {
405,454✔
570
    tscError("QInfo:%p, fail to get buffer from list", pTblBuf);
83✔
571
    return;
×
572
  }
573
  pTblBuf->buffIdx = 1;
405,371✔
574
  pTblBuf->buffOffset = sizeof(*pQueue->head);
405,371✔
575

576
  pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
405,371✔
577
  pQueue->qRemainNum = 0;
405,348✔
578
  pQueue->head->next = NULL;
405,348✔
579
}
580

581
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
657,696✔
582
  if (pStmt->sql.stbInterlaceMode) {
657,696✔
583
    if (deepClean) {
413,659✔
584
      taosHashCleanup(pStmt->exec.pBlockHash);
8,409✔
585
      pStmt->exec.pBlockHash = NULL;
8,409✔
586

587
      if (NULL != pStmt->exec.pCurrBlock) {
8,409✔
588
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->boundColsInfo.pColIndex);
7,309✔
589
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
7,309✔
590
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
7,309✔
591
        pStmt->exec.pCurrBlock = NULL;
7,309✔
592
      }
593
      if (STMT_TYPE_QUERY != pStmt->sql.type) {
8,409✔
594
        resetRequest(pStmt);
8,409✔
595
      }
596
    } else {
597
      pStmt->sql.siInfo.pTableColsIdx = 0;
405,250✔
598
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
405,241✔
599
      tSimpleHashClear(pStmt->sql.siInfo.pTableRowDataHash);
405,382✔
600
    }
601
    if (NULL != pStmt->exec.pRequest) {
413,897✔
602
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
405,488✔
603
    }
604
  } else {
605
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
244,031✔
606
      resetRequest(pStmt);
237,378✔
607
    }
608

609
    size_t keyLen = 0;
244,127✔
610
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
244,127✔
611
    while (pIter) {
478,077✔
612
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
233,927✔
613
      char*          key = taosHashGetKey(pIter, &keyLen);
233,927✔
614
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);
233,918✔
615

616
      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
233,927✔
617
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
114,638✔
618
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
134,199✔
619

620
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
114,600✔
621
        continue;
114,638✔
622
      }
623

624
      qDestroyStmtDataBlock(pBlocks);
119,289✔
625
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
119,289✔
626

627
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
119,289✔
628
    }
629

630
    if (keepTable) {
244,150✔
631
      STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
121,329✔
632
                 keepTable, deepClean);
633
      return TSDB_CODE_SUCCESS;
121,329✔
634
    }
635

636
    taosHashCleanup(pStmt->exec.pBlockHash);
122,821✔
637
    pStmt->exec.pBlockHash = NULL;
122,821✔
638

639
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
122,821✔
640
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
122,821✔
641
  }
642

643
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
536,718✔
644
  STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
536,540✔
645
             keepTable, deepClean);
646

647
  return TSDB_CODE_SUCCESS;
536,552✔
648
}
649

650
static void stmtFreeSingleVgDataBlock(void* p) {
562,045✔
651
  SVgDataBlocks* pVg = *(SVgDataBlocks**)p;
562,045✔
652
  if (pVg) {
562,080✔
653
    taosMemoryFree(pVg->pData);
562,210✔
654
    taosMemoryFree(pVg);
562,284✔
655
  }
656
}
562,130✔
657

658
static void stmtFreeVgDataBlocksForRetry(STscStmt2* pStmt) {
651,054✔
659
  if (pStmt->pVgDataBlocksForRetry) {
651,054✔
660
    taosArrayDestroyEx(pStmt->pVgDataBlocksForRetry, stmtFreeSingleVgDataBlock);
519,595✔
661
    pStmt->pVgDataBlocksForRetry = NULL;
519,657✔
662
  }
663
}
651,415✔
664

665
static int32_t stmtSaveVgDataBlocksForRetry(STscStmt2* pStmt) {
519,728✔
666
  stmtFreeVgDataBlocksForRetry(pStmt);
519,728✔
667

668
  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;
520,303✔
669
  if (!pModif || !pModif->pDataBlocks || taosArrayGetSize(pModif->pDataBlocks) == 0) {
519,987✔
670
    return TSDB_CODE_SUCCESS;
×
671
  }
672

673
  int32_t num = taosArrayGetSize(pModif->pDataBlocks);
520,021✔
674
  pStmt->pVgDataBlocksForRetry = taosArrayInit(num, POINTER_BYTES);
520,303✔
675
  if (!pStmt->pVgDataBlocksForRetry) {
520,232✔
676
    return terrno;
×
677
  }
678

679
  for (int32_t i = 0; i < num; i++) {
1,082,948✔
680
    SVgDataBlocks* pSrc = taosArrayGetP(pModif->pDataBlocks, i);
562,332✔
681
    SVgDataBlocks* pDst = taosMemoryMalloc(sizeof(SVgDataBlocks));
562,971✔
682
    if (!pDst) {
562,626✔
683
      stmtFreeVgDataBlocksForRetry(pStmt);
×
684
      return terrno;
×
685
    }
686
    *pDst = *pSrc;
562,626✔
687
    pDst->pData = taosMemoryMalloc(pSrc->size);
562,626✔
688
    if (!pDst->pData) {
562,701✔
689
      taosMemoryFree(pDst);
×
690
      stmtFreeVgDataBlocksForRetry(pStmt);
×
691
      return terrno;
×
692
    }
693
    (void)memcpy(pDst->pData, pSrc->pData, pSrc->size);
562,664✔
694
    if (NULL == taosArrayPush(pStmt->pVgDataBlocksForRetry, &pDst)) {
1,125,604✔
695
      taosMemoryFree(pDst->pData);
×
696
      taosMemoryFree(pDst);
×
697
      stmtFreeVgDataBlocksForRetry(pStmt);
×
698
      return terrno;
×
699
    }
700
  }
701
  return TSDB_CODE_SUCCESS;
520,616✔
702
}
703

704
static int32_t stmtRestoreVgDataBlocksForRetry(STscStmt2* pStmt) {
300✔
705
  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;
300✔
706
  if (!pModif || !pStmt->pVgDataBlocksForRetry) {
300✔
707
    return TSDB_CODE_SUCCESS;
×
708
  }
709
  // The planner owns pDataBlocks after createQueryPlan (via TSWAP); it has already freed
710
  // the old array. We simply restore a new clone here.
711
  pModif->pDataBlocks = pStmt->pVgDataBlocksForRetry;
300✔
712
  pStmt->pVgDataBlocksForRetry = NULL;
300✔
713
  return TSDB_CODE_SUCCESS;
300✔
714
}
715

716
static STableMeta* stmtCloneTableMetaForRetry(const STableMeta* pSrc) {
200✔
717
  int32_t sz = (int32_t)TABLE_META_FULL_SIZE(pSrc);
200✔
718
  if (sz <= 0) {
200✔
719
    return NULL;
×
720
  }
721
  STableMeta* p = taosMemoryMalloc(sz);
200✔
722
  if (p == NULL) {
200✔
723
    return NULL;
×
724
  }
725
  (void)memcpy(p, pSrc, sz);
200✔
726
  tableMetaResetPointers(p);
200✔
727
  return p;
200✔
728
}
729

730
static void stmtFreeUidTableMetaHash(SHashObj* pHash) {
100✔
731
  if (pHash == NULL) {
100✔
732
    return;
×
733
  }
734
  void* pIter = NULL;
100✔
735
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
300✔
736
    STableMeta* pMeta = *(STableMeta**)pIter;
200✔
737
    taosMemoryFree(pMeta);
200✔
738
  }
739
  taosHashCleanup(pHash);
100✔
740
}
741

742
// tRowGet may succeed with a wrong prefix schema but leave VAR column pointers outside the SRow allocation;
743
// tRowBuild would then memcpy OOB. Require all VAR payloads to lie within [pRow, pRow + pRow->len).
744
static bool stmtScolValVarPayloadInRow(const SRow* pRow, const SColVal* pCv, int8_t colType) {
800✔
745
  if (!COL_VAL_IS_VALUE(pCv) || !IS_VAR_DATA_TYPE(colType)) {
800✔
746
    return true;
400✔
747
  }
748
  if (pCv->value.nData == 0) {
400✔
749
    return true;
×
750
  }
751
  if (pCv->value.pData == NULL) {
400✔
752
    return false;
×
753
  }
754
  const uint8_t* rbeg = (const uint8_t*)pRow;
400✔
755
  const uint8_t* rend = rbeg + pRow->len;
400✔
756
  const uint8_t* p = (const uint8_t*)pCv->value.pData;
400✔
757
  return (p >= rbeg) && (p + pCv->value.nData <= rend);
400✔
758
}
759

760
// Infer decode STSchema: full column count when row sver matches catalog; else try prefix column counts (ADD COLUMN).
761
static int32_t stmtFindDecodeSchemaForRow(SRow* pRow, const STableMeta* pMeta, STSchema** ppOld, int32_t* pnOldCols) {
200✔
762
  uint16_t oldSver = pRow->sver;
200✔
763
  int32_t  nMax = pMeta->tableInfo.numOfColumns;
200✔
764
  SSchema* base = (SSchema*)&pMeta->schema[0];
200✔
765

766
  if ((int32_t)oldSver == pMeta->sversion) {
200✔
767
    STSchema* p = tBuildTSchema(base, nMax, (int32_t)oldSver);
×
768
    if (p == NULL) {
×
769
      return terrno;
×
770
    }
771
    bool ok = true;
×
772
    for (int32_t i = 0; i < nMax; ++i) {
×
773
      SColVal cv = {0};
×
774
      if (tRowGet(pRow, p, i, &cv) != 0) {
×
775
        ok = false;
×
776
        break;
×
777
      }
778
      if (!stmtScolValVarPayloadInRow(pRow, &cv, p->columns[i].type)) {
×
779
        ok = false;
×
780
        break;
×
781
      }
782
    }
783
    if (ok) {
×
784
      *ppOld = p;
×
785
      *pnOldCols = nMax;
×
786
      return TSDB_CODE_SUCCESS;
×
787
    }
788
    tDestroyTSchema(p);
×
789
    return TSDB_CODE_INVALID_PARA;
×
790
  }
791

792
  for (int32_t n = nMax; n >= 1; --n) {
400✔
793
    STSchema* pTry = tBuildTSchema(base, n, (int32_t)oldSver);
400✔
794
    if (pTry == NULL) {
400✔
795
      return terrno;
×
796
    }
797
    bool ok = true;
400✔
798
    for (int32_t i = 0; i < n; ++i) {
1,000✔
799
      SColVal cv = {0};
800✔
800
      if (tRowGet(pRow, pTry, i, &cv) != 0) {
800✔
801
        ok = false;
×
802
        break;
200✔
803
      }
804
      if (!stmtScolValVarPayloadInRow(pRow, &cv, pTry->columns[i].type)) {
800✔
805
        ok = false;
200✔
806
        break;
200✔
807
      }
808
    }
809
    if (ok) {
400✔
810
      *ppOld = pTry;
200✔
811
      *pnOldCols = n;
200✔
812
      return TSDB_CODE_SUCCESS;
200✔
813
    }
814
    tDestroyTSchema(pTry);
200✔
815
  }
816
  return TSDB_CODE_INVALID_PARA;
×
817
}
818

819
static int32_t stmtRebuildOneRowToLatestSchema(SRow* pOldRow, const STableMeta* pMeta, SRow** ppNewRow) {
200✔
820
  STSchema* pOldSch = NULL;
200✔
821
  int32_t   nOldCols = 0;
200✔
822
  int32_t   code = stmtFindDecodeSchemaForRow(pOldRow, pMeta, &pOldSch, &nOldCols);
200✔
823
  if (code != TSDB_CODE_SUCCESS) {
200✔
824
    return code;
×
825
  }
826

827
  int32_t   nNewCols = pMeta->tableInfo.numOfColumns;
200✔
828
  STSchema* pNewSch = tBuildTSchema((SSchema*)&pMeta->schema[0], nNewCols, pMeta->sversion);
200✔
829
  if (pNewSch == NULL) {
200✔
830
    tDestroyTSchema(pOldSch);
×
831
    return terrno;
×
832
  }
833

834
  for (int32_t j = 0; j < nOldCols && j < nNewCols; ++j) {
600✔
835
    if (pOldSch->columns[j].colId != pNewSch->columns[j].colId) {
400✔
836
      tDestroyTSchema(pOldSch);
×
837
      tDestroyTSchema(pNewSch);
×
838
      return TSDB_CODE_INVALID_PARA;
×
839
    }
840
  }
841

842
  SArray* aColVal = taosArrayInit(pNewSch->numOfCols, sizeof(SColVal));
200✔
843
  if (aColVal == NULL) {
200✔
844
    tDestroyTSchema(pOldSch);
×
845
    tDestroyTSchema(pNewSch);
×
846
    return terrno;
×
847
  }
848

849
  for (int32_t j = 0; j < pNewSch->numOfCols; ++j) {
800✔
850
    SColVal cv = {0};
600✔
851
    if (j < nOldCols) {
600✔
852
      code = tRowGet(pOldRow, pOldSch, j, &cv);
400✔
853
      if (code != TSDB_CODE_SUCCESS) {
400✔
854
        taosArrayDestroy(aColVal);
×
855
        tDestroyTSchema(pOldSch);
×
856
        tDestroyTSchema(pNewSch);
×
857
        return code;
×
858
      }
859
    } else {
860
      STColumn* pc = &pNewSch->columns[j];
200✔
861
      cv = COL_VAL_NONE(pc->colId, pc->type);
200✔
862
    }
863
    if (taosArrayPush(aColVal, &cv) == NULL) {
600✔
864
      code = terrno;
×
865
      taosArrayDestroy(aColVal);
×
866
      tDestroyTSchema(pOldSch);
×
867
      tDestroyTSchema(pNewSch);
×
868
      return code;
×
869
    }
870
  }
871

872
  SRowBuildScanInfo sinfo = {0};
200✔
873
  code = tRowBuild(aColVal, pNewSch, ppNewRow, &sinfo);
200✔
874
  taosArrayDestroy(aColVal);
200✔
875
  tDestroyTSchema(pOldSch);
200✔
876
  tDestroyTSchema(pNewSch);
200✔
877
  return code;
200✔
878
}
879

880
static void stmtFreeHeapPatchRowsArray(SArray* aHeapRows) {
100✔
881
  if (aHeapRows == NULL) {
100✔
882
    return;
×
883
  }
884
  int32_t n = (int32_t)taosArrayGetSize(aHeapRows);
100✔
885
  for (int32_t i = 0; i < n; ++i) {
300✔
886
    SRow* p = taosArrayGetP(aHeapRows, i);
200✔
887
    tRowDestroy(p);
200✔
888
  }
889
  taosArrayDestroy(aHeapRows);
100✔
890
}
891

892
// After refreshMeta: set sver from catalog; decode each row with inferred old schema and tRowBuild with latest schema.
893
// aHeapRows: receives pointers from tRowBuild so they can be freed before tDestroySubmitReq (decode path does not free rows).
894
static void stmtPatchOneSubmitTbDataSchemaVer(SSubmitTbData* pTb, SHashObj* pUidMetaHash, SArray* aHeapRows) {
100✔
895
  if (pTb->uid == 0) {
100✔
896
    return;
×
897
  }
898
  void* pMv = taosHashGet(pUidMetaHash, &pTb->uid, sizeof(uint64_t));
100✔
899
  if (pMv == NULL) {
100✔
900
    return;
×
901
  }
902
  STableMeta* pMeta = *(STableMeta**)pMv;
100✔
903
  pTb->sver = pMeta->sversion;
100✔
904
  if (pTb->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
100✔
905
    return;
×
906
  }
907
  if (pTb->aRowP == NULL) {
100✔
908
    return;
×
909
  }
910
  if (pTb->pBlobSet != NULL) {
100✔
911
    int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
×
912
    SRow**  rows = (SRow**)TARRAY_DATA(pTb->aRowP);
×
913
    for (int32_t i = 0; i < nRow; ++i) {
×
914
      if (rows[i] != NULL) {
×
915
        rows[i]->sver = (uint16_t)pMeta->sversion;
×
916
      }
917
    }
918
    return;
×
919
  }
920

921
  int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
100✔
922
  for (int32_t i = 0; i < nRow; ++i) {
300✔
923
    SRow* pRow = taosArrayGetP(pTb->aRowP, i);
200✔
924
    if (pRow == NULL) {
200✔
925
      continue;
×
926
    }
927
    if ((uint16_t)pMeta->sversion == pRow->sver) {
200✔
928
      continue;
×
929
    }
930
    if (pRow->flag & HAS_BLOB) {
200✔
931
      pRow->sver = (uint16_t)pMeta->sversion;
×
932
      continue;
×
933
    }
934
    SRow* pNew = NULL;
200✔
935
    if (stmtRebuildOneRowToLatestSchema(pRow, pMeta, &pNew) == TSDB_CODE_SUCCESS && pNew != NULL) {
200✔
936
      // pRow points into the decoded submit payload (tDecodeBinaryWithSize); do not tRowDestroy it.
937
      if (aHeapRows != NULL && taosArrayPush(aHeapRows, &pNew) == NULL) {
400✔
938
        tRowDestroy(pNew);
×
939
        // Cannot record heap row for destroy before tDestroySubmitReq; keep embedded row, bump sver only.
940
        pRow->sver = (uint16_t)pMeta->sversion;
×
941
      } else {
942
        (void)taosArraySet(pTb->aRowP, i, &pNew);
200✔
943
      }
944
    } else {
945
      pRow->sver = (uint16_t)pMeta->sversion;
×
946
    }
947
  }
948
}
949

950
static int32_t stmtBuildUidToTableMetaHash(STscStmt2* pStmt, SRequestObj* pRequest, SHashObj** ppHash) {
100✔
951
  int32_t code = TSDB_CODE_SUCCESS;
100✔
952
  *ppHash = NULL;
100✔
953

954
  if (NULL == pStmt->pCatalog) {
100✔
955
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
956
    if (code != TSDB_CODE_SUCCESS) {
×
957
      return code;
×
958
    }
959
  }
960

961
  SHashObj* pHash =
962
      taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
100✔
963
  if (pHash == NULL) {
100✔
964
    return terrno;
×
965
  }
966

967
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
100✔
968
                           .requestId = pRequest->requestId,
100✔
969
                           .requestObjRefId = pRequest->self,
100✔
970
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
100✔
971

972
  int32_t tblNum = pRequest->tableList ? (int32_t)taosArrayGetSize(pRequest->tableList) : 0;
100✔
973
  for (int32_t i = 0; i < tblNum; ++i) {
300✔
974
    SName*      pName = taosArrayGet(pRequest->tableList, i);
200✔
975
    STableMeta* pMeta = NULL;
200✔
976
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
200✔
977
    if (c != TSDB_CODE_SUCCESS) {
200✔
978
      if (pMeta != NULL) {
×
979
        taosMemoryFree(pMeta);
×
980
      }
981
      taosHashCleanup(pHash);
×
982
      return c;
×
983
    }
984
    if (pMeta != NULL) {
200✔
985
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
200✔
986
      taosMemoryFree(pMeta);
200✔
987
      pMeta = NULL;
200✔
988
      if (pDup != NULL) {
200✔
989
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
200✔
990
        if (putCode != TSDB_CODE_SUCCESS) {
200✔
991
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
992
                     tstrerror(putCode));
993
          taosMemoryFree(pDup);
×
994
          taosHashCleanup(pHash);
×
995
          return putCode;
×
996
        }
997
      }
998
    }
999
  }
1000

1001
  if (taosHashGetSize(pHash) == 0 && pStmt->bInfo.sname.type != 0) {
100✔
1002
    STableMeta* pMeta = NULL;
×
1003
    code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
1004
    if (code == TSDB_CODE_SUCCESS && pMeta != NULL) {
×
1005
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
×
1006
      taosMemoryFree(pMeta);
×
1007
      pMeta = NULL;
×
1008
      if (pDup != NULL) {
×
1009
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
×
1010
        if (putCode != TSDB_CODE_SUCCESS) {
×
1011
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
1012
                     tstrerror(putCode));
1013
          taosMemoryFree(pDup);
×
1014
          taosHashCleanup(pHash);
×
1015
          return putCode;
×
1016
        }
1017
      }
1018
    } else if (pMeta != NULL) {
×
1019
      taosMemoryFree(pMeta);
×
1020
    }
1021
  }
1022

1023
  *ppHash = pHash;
100✔
1024
  return TSDB_CODE_SUCCESS;
100✔
1025
}
1026

1027
static int32_t stmtUpdateVgDataBlocksSchemaVer(STscStmt2* pStmt, SRequestObj* pRequest) {
100✔
1028
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
100✔
1029
    return TSDB_CODE_SUCCESS;
×
1030
  }
1031

1032
  SHashObj* pUidMetaHash = NULL;
100✔
1033
  int32_t   code = stmtBuildUidToTableMetaHash(pStmt, pRequest, &pUidMetaHash);
100✔
1034
  if (code != TSDB_CODE_SUCCESS) {
100✔
1035
    return code;
×
1036
  }
1037
  if (pUidMetaHash == NULL || taosHashGetSize(pUidMetaHash) == 0) {
100✔
1038
    if (pUidMetaHash != NULL) {
×
1039
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1040
    }
1041
    return TSDB_CODE_SUCCESS;
×
1042
  }
1043

1044
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
100✔
1045
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
100✔
1046

1047
  for (int32_t b = 0; b < nBlk; ++b) {
200✔
1048
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
100✔
1049
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
100✔
1050
      continue;
×
1051
    }
1052

1053
    SDecoder     decoder = {0};
100✔
1054
    int32_t      bodyLen = pVg->size - headSz;
100✔
1055
    SSubmitReq2  req = {0};
100✔
1056

1057
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
100✔
1058
    code = tDecodeSubmitReq(&decoder, &req, NULL);
100✔
1059
    tDecoderClear(&decoder);
100✔
1060
    if (code != TSDB_CODE_SUCCESS) {
100✔
1061
      STMT2_ELOG("tDecodeSubmitReq failed when patching schema ver for retry, code:%s", tstrerror(code));
×
1062
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1063
      return code;
×
1064
    }
1065
    if (req.raw) {
100✔
1066
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1067
      continue;
×
1068
    }
1069

1070
    SArray* aHeapRows = taosArrayInit(8, POINTER_BYTES);
100✔
1071
    if (aHeapRows == NULL) {
100✔
1072
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1073
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1074
      return terrno;
×
1075
    }
1076

1077
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
100✔
1078
    for (int32_t t = 0; t < nTb; ++t) {
200✔
1079
      stmtPatchOneSubmitTbDataSchemaVer(taosArrayGet(req.aSubmitTbData, t), pUidMetaHash, aHeapRows);
100✔
1080
    }
1081

1082
    int32_t encCap = 0;
100✔
1083
    int32_t szRet = 0;
100✔
1084
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
100✔
1085
    if (szRet != 0) {
100✔
1086
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
1087
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1088
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1089
      return TSDB_CODE_INVALID_PARA;
×
1090
    }
1091

1092
    int32_t allocLen = headSz + encCap;
100✔
1093
    void*   pNew = taosMemoryMalloc(allocLen);
100✔
1094
    if (pNew == NULL) {
100✔
1095
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
1096
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1097
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1098
      return terrno;
×
1099
    }
1100

1101
    (void)memcpy(pNew, pVg->pData, headSz);
100✔
1102
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
100✔
1103
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
100✔
1104

1105
    SEncoder encoder = {0};
100✔
1106
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
100✔
1107
    code = tEncodeSubmitReq(&encoder, &req);
100✔
1108
    int32_t bodyWritten = (int32_t)encoder.pos;
100✔
1109
    tEncoderClear(&encoder);
100✔
1110
    stmtFreeHeapPatchRowsArray(aHeapRows);
100✔
1111
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
100✔
1112

1113
    if (code != TSDB_CODE_SUCCESS) {
100✔
1114
      taosMemoryFree(pNew);
×
1115
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1116
      return code;
×
1117
    }
1118

1119
    int32_t totalLen = headSz + bodyWritten;
100✔
1120
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
100✔
1121

1122
    taosMemoryFree(pVg->pData);
100✔
1123
    pVg->pData = pNew;
100✔
1124
    pVg->size = totalLen;
100✔
1125
  }
1126

1127
  stmtFreeUidTableMetaHash(pUidMetaHash);
100✔
1128
  return TSDB_CODE_SUCCESS;
100✔
1129
}
1130

1131
typedef struct SStmtRetryTbPatch {
1132
  uint64_t uid;
1133
  uint64_t suid;
1134
  int32_t  sver;
1135
} SStmtRetryTbPatch;
1136

1137
// After refreshMeta, drop cached tbName->uid from stmt2 interlace bind so insGetStmtTableVgUid refetches from catalog.
1138
static void stmtInvalidateStbInterlaceTableUidCache(STscStmt2* pStmt) {
300✔
1139
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pTableHash != NULL) {
300✔
1140
    tSimpleHashClear(pStmt->sql.siInfo.pTableHash);
300✔
1141
  }
1142
}
300✔
1143

1144
// Super-table catalog meta uses uid == suid (see queryCreateTableMetaFromMsg); that must not be written onto
1145
// child-table SSubmitTbData. Only use child/normal/virtual-child meta here.
1146
static bool stmtRetryTbMetaIsSuperTable(const STableMeta* pMeta) {
400✔
1147
  return (pMeta != NULL && pMeta->tableType == TSDB_SUPER_TABLE);
400✔
1148
}
1149

1150
// Resolve uid/suid/sver for one SSubmitTbData after catalog refresh. tbIdx is the index within this submit req.
1151
static int32_t stmtFetchOneRetryTbMetaPatch(STscStmt2* pStmt, SRequestObj* pRequest, SSubmitTbData* pTb, int32_t tbIdx,
200✔
1152
                                            int32_t nSubmitTb, SStmtRetryTbPatch* pPatch) {
1153
  if (NULL == pStmt->pCatalog) {
200✔
1154
    int32_t c = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
1155
    if (c != TSDB_CODE_SUCCESS) {
×
1156
      return c;
×
1157
    }
1158
  }
1159

1160
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
200✔
1161
                           .requestId = pRequest->requestId,
200✔
1162
                           .requestObjRefId = pRequest->self,
200✔
1163
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
200✔
1164

1165
  // 1) Auto-create child: look up by child table name (never use STB-only name without child name).
1166
  if (pTb->pCreateTbReq != NULL && pTb->pCreateTbReq->name != NULL) {
200✔
1167
    SName         nm = {0};
×
1168
    int32_t       nc = TSDB_CODE_SUCCESS;
×
1169
    STableMeta*   pMeta = NULL;
×
1170
    if (pStmt->bInfo.sname.type != 0) {
×
1171
      tNameAssign(&nm, &pStmt->bInfo.sname);
×
1172
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
1173
    } else if (pRequest->tableList != NULL && taosArrayGetSize(pRequest->tableList) > 0) {
×
1174
      SName* p0 = taosArrayGet(pRequest->tableList, 0);
×
1175
      tNameAssign(&nm, p0);
×
1176
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
1177
    } else {
1178
      STMT2_ELOG_E("retry patch: no db/sname context for createTbReq name");
×
1179
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1180
    }
1181
    if (nc != TSDB_CODE_SUCCESS) {
×
1182
      return nc;
×
1183
    }
1184
    nc = catalogGetTableMeta(pStmt->pCatalog, &conn, &nm, &pMeta);
×
1185
    if (nc != TSDB_CODE_SUCCESS) {
×
1186
      taosMemoryFreeClear(pMeta);
×
1187
      return nc;
×
1188
    }
1189
    if (pMeta == NULL) {
×
1190
      return TSDB_CODE_INTERNAL_ERROR;
×
1191
    }
1192
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
1193
      taosMemoryFree(pMeta);
×
1194
      STMT2_ELOG_E("retry patch: createTbReq resolved to super table meta (unexpected)");
×
1195
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1196
    }
1197
    pPatch->uid = pMeta->uid;
×
1198
    pPatch->suid = pMeta->suid;
×
1199
    pPatch->sver = pMeta->sversion;
×
1200
    taosMemoryFree(pMeta);
×
1201
    return TSDB_CODE_SUCCESS;
×
1202
  }
1203

1204
  // 2) request->tableList: align tbIdx with the tbIdx-th non-super-table entry (skip super table names).
1205
  if (pRequest->tableList != NULL) {
200✔
1206
    int32_t          nList = (int32_t)taosArrayGetSize(pRequest->tableList);
200✔
1207
    int32_t          nonStbOrd = 0;
200✔
1208
    for (int32_t li = 0; li < nList; ++li) {
400✔
1209
      SName*      pName = taosArrayGet(pRequest->tableList, li);
400✔
1210
      STableMeta* pMeta = NULL;
400✔
1211
      int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
400✔
1212
      if (c != TSDB_CODE_SUCCESS) {
400✔
1213
        taosMemoryFreeClear(pMeta);
×
1214
        return c;
200✔
1215
      }
1216
      if (pMeta == NULL) {
400✔
1217
        return TSDB_CODE_INTERNAL_ERROR;
×
1218
      }
1219
      if (stmtRetryTbMetaIsSuperTable(pMeta)) {
400✔
1220
        taosMemoryFree(pMeta);
200✔
1221
        continue;
200✔
1222
      }
1223
      if (nonStbOrd == tbIdx) {
200✔
1224
        pPatch->uid = pMeta->uid;
200✔
1225
        pPatch->suid = pMeta->suid;
200✔
1226
        pPatch->sver = pMeta->sversion;
200✔
1227
        taosMemoryFree(pMeta);
200✔
1228
        return TSDB_CODE_SUCCESS;
200✔
1229
      }
1230
      taosMemoryFree(pMeta);
×
1231
      nonStbOrd++;
×
1232
    }
1233
  }
1234

1235
  // 3) Single-table statement: bInfo.sname
1236
  if (nSubmitTb == 1 && pStmt->bInfo.sname.type != 0) {
×
1237
    STableMeta* pMeta = NULL;
×
1238
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
1239
    if (c != TSDB_CODE_SUCCESS) {
×
1240
      taosMemoryFreeClear(pMeta);
×
1241
      return c;
×
1242
    }
1243
    if (pMeta == NULL) {
×
1244
      return TSDB_CODE_INTERNAL_ERROR;
×
1245
    }
1246
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
1247
      taosMemoryFree(pMeta);
×
1248
      STMT2_ELOG_E("retry patch: bInfo.sname resolved to super table meta; need child table name");
×
1249
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1250
    }
1251
    pPatch->uid = pMeta->uid;
×
1252
    pPatch->suid = pMeta->suid;
×
1253
    pPatch->sver = pMeta->sversion;
×
1254
    taosMemoryFree(pMeta);
×
1255
    return TSDB_CODE_SUCCESS;
×
1256
  }
1257

1258
  STMT2_ELOG("retry patch: cannot resolve catalog meta for submit block (tb idx %d, uid %" PRId64 ")", tbIdx,
×
1259
             (int64_t)pTb->uid);
1260
  return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1261
}
1262

1263
// TSDB_CODE_TDB_TABLE_NOT_EXIST: refresh child table uid/suid/sver in serialized submit from catalog.
1264
static int32_t stmtUpdateVgDataBlocksTbMetaFromCatalog(STscStmt2* pStmt, SRequestObj* pRequest) {
200✔
1265
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
200✔
1266
    return TSDB_CODE_SUCCESS;
×
1267
  }
1268

1269
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
200✔
1270
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
200✔
1271

1272
  for (int32_t b = 0; b < nBlk; ++b) {
400✔
1273
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
200✔
1274
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
200✔
1275
      continue;
×
1276
    }
1277

1278
    SDecoder     decoder = {0};
200✔
1279
    int32_t      bodyLen = pVg->size - headSz;
200✔
1280
    SSubmitReq2  req = {0};
200✔
1281
    int32_t      code = 0;
200✔
1282

1283
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
200✔
1284
    code = tDecodeSubmitReq(&decoder, &req, NULL);
200✔
1285
    tDecoderClear(&decoder);
200✔
1286
    if (code != TSDB_CODE_SUCCESS) {
200✔
1287
      STMT2_ELOG("tDecodeSubmitReq failed when patching table meta for retry, code:%s", tstrerror(code));
×
1288
      return code;
×
1289
    }
1290
    if (req.raw) {
200✔
1291
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1292
      continue;
×
1293
    }
1294

1295
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
200✔
1296
    for (int32_t t = 0; t < nTb; ++t) {
400✔
1297
      SStmtRetryTbPatch patch = {0};
200✔
1298
      code = stmtFetchOneRetryTbMetaPatch(pStmt, pRequest, taosArrayGet(req.aSubmitTbData, t), t, nTb, &patch);
200✔
1299
      if (code != TSDB_CODE_SUCCESS) {
200✔
1300
        tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1301
        return code;
×
1302
      }
1303
      SSubmitTbData* pRow = taosArrayGet(req.aSubmitTbData, t);
200✔
1304
      pRow->uid = (int64_t)patch.uid;
200✔
1305
      pRow->suid = (int64_t)patch.suid;
200✔
1306
      pRow->sver = patch.sver;
200✔
1307
    }
1308

1309
    int32_t encCap = 0;
200✔
1310
    int32_t szRet = 0;
200✔
1311
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
200✔
1312
    if (szRet != 0) {
200✔
1313
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1314
      return TSDB_CODE_INVALID_PARA;
×
1315
    }
1316

1317
    int32_t allocLen = headSz + encCap;
200✔
1318
    void*   pNew = taosMemoryMalloc(allocLen);
200✔
1319
    if (pNew == NULL) {
200✔
1320
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1321
      return terrno;
×
1322
    }
1323

1324
    (void)memcpy(pNew, pVg->pData, headSz);
200✔
1325
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
200✔
1326
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
200✔
1327

1328
    SEncoder encoder = {0};
200✔
1329
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
200✔
1330
    code = tEncodeSubmitReq(&encoder, &req);
200✔
1331
    int32_t bodyWritten = (int32_t)encoder.pos;
200✔
1332
    tEncoderClear(&encoder);
200✔
1333
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
200✔
1334

1335
    if (code != TSDB_CODE_SUCCESS) {
200✔
1336
      taosMemoryFree(pNew);
×
1337
      return code;
×
1338
    }
1339

1340
    int32_t totalLen = headSz + bodyWritten;
200✔
1341
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
200✔
1342

1343
    taosMemoryFree(pVg->pData);
200✔
1344
    pVg->pData = pNew;
200✔
1345
    pVg->size = totalLen;
200✔
1346
  }
1347

1348
  return TSDB_CODE_SUCCESS;
200✔
1349
}
1350

1351
static bool stmtIsSchemaVersionRetryError(int32_t err) {
100✔
1352
  return (bool)(NEED_CLIENT_REFRESH_TBLMETA_ERROR(err) || err == TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION);
100✔
1353
}
1354

1355
static void stmtFreeTbBuf(void* buf) {
119,147✔
1356
  void* pBuf = *(void**)buf;
119,147✔
1357
  taosMemoryFree(pBuf);
119,147✔
1358
}
119,147✔
1359

1360
static void stmtFreeTbCols(void* buf) {
7,309,000✔
1361
  SArray* pCols = *(SArray**)buf;
7,309,000✔
1362
  taosArrayDestroy(pCols);
7,309,000✔
1363
}
7,309,000✔
1364

1365
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
131,181✔
1366
  STMT2_TLOG_E("start to free SQL info");
131,181✔
1367

1368
  taosMemoryFree(pStmt->sql.pBindInfo);
131,181✔
1369
  taosMemoryFree(pStmt->sql.queryRes.fields);
131,230✔
1370
  taosMemoryFree(pStmt->sql.queryRes.userFields);
131,230✔
1371
  taosMemoryFree(pStmt->sql.sqlStr);
131,230✔
1372
  qDestroyQuery(pStmt->sql.pQuery);
131,230✔
1373
  taosArrayDestroy(pStmt->sql.nodeList);
131,230✔
1374
  taosHashCleanup(pStmt->sql.pVgHash);
131,230✔
1375
  pStmt->sql.pVgHash = NULL;
131,230✔
1376
  if (pStmt->sql.fixValueTags) {
131,230✔
1377
    pStmt->sql.fixValueTags = false;
1,760✔
1378
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
1,760✔
1379
    taosMemoryFreeClear(pStmt->sql.fixValueTbReq);
1,760✔
1380
    pStmt->sql.fixValueTbReq = NULL;
1,760✔
1381
  }
1382

1383
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
131,230✔
1384
  while (pIter) {
134,550✔
1385
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
3,320✔
1386

1387
    qDestroyStmtDataBlock(pCache->pDataCtx);
3,320✔
1388
    qDestroyBoundColInfo(pCache->boundTags);
3,320✔
1389
    taosMemoryFreeClear(pCache->boundTags);
3,320✔
1390

1391
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
3,320✔
1392
  }
1393
  taosHashCleanup(pStmt->sql.pTableCache);
131,230✔
1394
  pStmt->sql.pTableCache = NULL;
131,230✔
1395

1396
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
131,230✔
1397
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
131,230✔
1398
  stmtFreeVgDataBlocksForRetry(pStmt);
131,230✔
1399

1400
  taos_free_result(pStmt->sql.siInfo.pRequest);
131,230✔
1401
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
131,230✔
1402
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
131,230✔
1403
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableRowDataHash);
131,230✔
1404
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
131,202✔
1405
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
131,230✔
1406
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
131,230✔
1407
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
131,230✔
1408
  pStmt->sql.siInfo.pTableCols = NULL;
131,230✔
1409

1410
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
131,230✔
1411
  pStmt->sql.siInfo.tableColsReady = true;
131,230✔
1412

1413
  STMT2_TLOG_E("end to free SQL info");
131,230✔
1414

1415
  return TSDB_CODE_SUCCESS;
131,230✔
1416
}
1417

1418
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
557,715✔
1419
  if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
557,715✔
1420
    return TSDB_CODE_SUCCESS;
1,000✔
1421
  }
1422

1423
  SVgroupInfo      vgInfo = {0};
556,715✔
1424
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
556,715✔
1425
                           .requestId = pStmt->exec.pRequest->requestId,
556,715✔
1426
                           .requestObjRefId = pStmt->exec.pRequest->self,
556,715✔
1427
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
556,715✔
1428

1429
  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
556,921✔
1430
  if (TSDB_CODE_SUCCESS != code) {
556,933✔
1431
    STMT2_ELOG("fail to get vgroup info from catalog, code:%d", code);
×
1432
    return code;
×
1433
  }
1434

1435
  code =
1436
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
556,933✔
1437
  if (TSDB_CODE_SUCCESS != code) {
556,980✔
1438
    STMT2_ELOG("fail to put vgroup info, code:%d", code);
92✔
1439
    return code;
×
1440
  }
1441

1442
  *vgId = vgInfo.vgId;
556,888✔
1443

1444
  return TSDB_CODE_SUCCESS;
556,888✔
1445
}
1446

1447
int32_t stmtGetTableMetaAndValidate(STscStmt2* pStmt, uint64_t* uid, uint64_t* suid, int32_t* vgId, int8_t* tableType) {
5,909✔
1448
  STableMeta*      pTableMeta = NULL;
5,909✔
1449
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
5,909✔
1450
                           .requestId = pStmt->exec.pRequest->requestId,
5,909✔
1451
                           .requestObjRefId = pStmt->exec.pRequest->self,
5,909✔
1452
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
5,909✔
1453
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
5,909✔
1454

1455
  pStmt->stat.ctgGetTbMetaNum++;
5,909✔
1456

1457
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
5,909✔
1458
    STMT2_ELOG("tb %s not exist", pStmt->bInfo.tbFName);
500✔
1459
    (void)stmtCleanBindInfo(pStmt);
500✔
1460

1461
    if (!pStmt->sql.autoCreateTbl) {
500✔
1462
      STMT2_ELOG("table %s does not exist and autoCreateTbl is disabled", pStmt->bInfo.tbFName);
500✔
1463
      STMT_ERR_RET(TSDB_CODE_PAR_TABLE_NOT_EXIST);
500✔
1464
    }
1465

1466
    STMT_ERR_RET(code);
×
1467
  }
1468

1469
  STMT_ERR_RET(code);
5,409✔
1470

1471
  *uid = pTableMeta->uid;
5,409✔
1472
  *suid = pTableMeta->suid;
5,409✔
1473
  *tableType = pTableMeta->tableType;
5,409✔
1474
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
5,409✔
1475
  *vgId = pTableMeta->vgId;
5,409✔
1476

1477
  taosMemoryFree(pTableMeta);
5,409✔
1478

1479
  return TSDB_CODE_SUCCESS;
5,409✔
1480
}
1481

1482
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
7,331✔
1483
                                    uint64_t suid, int32_t vgId) {
1484
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
7,331✔
1485
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
7,331✔
1486

1487
  STMT2_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);
7,331✔
1488

1489
  return TSDB_CODE_SUCCESS;
7,331✔
1490
}
1491

1492
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
22,180✔
1493
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
22,180✔
1494
    pStmt->bInfo.needParse = false;
×
1495
    pStmt->bInfo.inExecCache = false;
×
1496
    return TSDB_CODE_SUCCESS;
×
1497
  }
1498

1499
  pStmt->bInfo.needParse = true;
22,260✔
1500
  pStmt->bInfo.inExecCache = false;
22,260✔
1501

1502
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
22,260✔
1503
  if (pCxtInExec) {
22,220✔
1504
    pStmt->bInfo.needParse = false;
3,500✔
1505
    pStmt->bInfo.inExecCache = true;
3,500✔
1506

1507
    pStmt->exec.pCurrBlock = *pCxtInExec;
3,500✔
1508

1509
    if (pStmt->sql.autoCreateTbl) {
3,500✔
1510
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
3,000✔
1511
      return TSDB_CODE_SUCCESS;
3,000✔
1512
    }
1513
  }
1514

1515
  if (NULL == pStmt->pCatalog) {
19,220✔
1516
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
8,763✔
1517
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
8,803✔
1518
  }
1519

1520
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
19,225✔
1521
    if (pStmt->bInfo.inExecCache) {
11,294✔
1522
      pStmt->bInfo.needParse = false;
×
1523
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
1524
      return TSDB_CODE_SUCCESS;
×
1525
    }
1526

1527
    STMT2_DLOG("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
11,294✔
1528

1529
    return TSDB_CODE_SUCCESS;
11,294✔
1530
  }
1531

1532
  if (pStmt->sql.autoCreateTbl) {
7,931✔
1533
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
6,131✔
1534
    if (pCache) {
6,131✔
1535
      pStmt->bInfo.needParse = false;
6,131✔
1536
      pStmt->bInfo.tbUid = 0;
6,131✔
1537

1538
      STableDataCxt* pNewBlock = NULL;
6,131✔
1539
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
6,131✔
1540

1541
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
6,131✔
1542
                      POINTER_BYTES)) {
1543
        STMT_ERR_RET(terrno);
×
1544
      }
1545

1546
      pStmt->exec.pCurrBlock = pNewBlock;
6,131✔
1547

1548
      STMT2_DLOG("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
6,131✔
1549

1550
      return TSDB_CODE_SUCCESS;
6,131✔
1551
    }
1552

1553
    STMT_RET(stmtCleanBindInfo(pStmt));
×
1554
  }
1555

1556
  uint64_t uid, suid;
×
1557
  int32_t  vgId;
×
1558
  int8_t   tableType;
×
1559

1560
  STMT_ERR_RET(stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType));
1,800✔
1561

1562
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
1,700✔
1563

1564
  if (uid == pStmt->bInfo.tbUid) {
1,700✔
1565
    pStmt->bInfo.needParse = false;
×
1566

1567
    STMT2_DLOG("tb %s is current table", pStmt->bInfo.tbFName);
×
1568

1569
    return TSDB_CODE_SUCCESS;
×
1570
  }
1571

1572
  if (pStmt->bInfo.inExecCache) {
1,700✔
1573
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
500✔
1574
    if (NULL == pCache) {
500✔
1575
      STMT2_ELOG("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
1576
                 pStmt->bInfo.tbFName, uid, cacheUid);
1577

1578
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1579
    }
1580

1581
    pStmt->bInfo.needParse = false;
500✔
1582

1583
    pStmt->bInfo.tbUid = uid;
500✔
1584
    pStmt->bInfo.tbSuid = suid;
500✔
1585
    pStmt->bInfo.tbType = tableType;
500✔
1586
    pStmt->bInfo.boundTags = pCache->boundTags;
500✔
1587
    pStmt->bInfo.tagsCached = true;
500✔
1588

1589
    STMT2_DLOG("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
500✔
1590

1591
    return TSDB_CODE_SUCCESS;
500✔
1592
  }
1593

1594
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
1,200✔
1595
  if (pCache) {
1,200✔
1596
    pStmt->bInfo.needParse = false;
1,200✔
1597

1598
    pStmt->bInfo.tbUid = uid;
1,200✔
1599
    pStmt->bInfo.tbSuid = suid;
1,200✔
1600
    pStmt->bInfo.tbType = tableType;
1,200✔
1601
    pStmt->bInfo.boundTags = pCache->boundTags;
1,200✔
1602
    pStmt->bInfo.tagsCached = true;
1,200✔
1603

1604
    STableDataCxt* pNewBlock = NULL;
1,200✔
1605
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
1,200✔
1606

1607
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
1,200✔
1608
                    POINTER_BYTES)) {
1609
      STMT_ERR_RET(terrno);
×
1610
    }
1611

1612
    pStmt->exec.pCurrBlock = pNewBlock;
1,200✔
1613

1614
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
1,200✔
1615

1616
    return TSDB_CODE_SUCCESS;
1,200✔
1617
  }
1618

1619
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
1620

1621
  return TSDB_CODE_SUCCESS;
×
1622
}
1623

1624
static int32_t stmtResetStmt(STscStmt2* pStmt) {
×
1625
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
×
1626

1627
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1628
  if (NULL == pStmt->sql.pTableCache) {
×
1629
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtResetStmt:%s", tstrerror(terrno));
×
1630
    STMT_ERR_RET(terrno);
×
1631
  }
1632

1633
  pStmt->sql.status = STMT_INIT;
×
1634

1635
  return TSDB_CODE_SUCCESS;
×
1636
}
1637

1638
static void stmtAsyncOutput(STscStmt2* pStmt, void* param) {
1,272,670✔
1639
  SStmtQNode* pParam = (SStmtQNode*)param;
1,272,670✔
1640

1641
  if (pParam->restoreTbCols) {
1,272,670✔
1642
    for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
1,272,270✔
1643
      SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
866,860✔
1644
      *p = taosArrayInit(20, POINTER_BYTES);
866,860✔
1645
      if (*p == NULL) {
866,728✔
1646
        pStmt->errCode = terrno;
34✔
1647
      }
1648
    }
1649
    atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
405,410✔
1650
    STMT2_TLOG_E("restore pTableCols finished");
405,507✔
1651
  } else {
1652
    int code = qAppendStmt2TableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
867,117✔
1653
                                       &pStmt->sql.siInfo, pParam->pCreateTbReq);
1654
    // taosMemoryFree(pParam->pTbData);
1655
    if (code != TSDB_CODE_SUCCESS) {
867,064✔
1656
      STMT2_ELOG("async append stmt output failed, tbname:%s, err:%s", pParam->tblData.tbName, tstrerror(code));
100✔
1657
      pStmt->errCode = code;
100✔
1658
    }
1659
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
867,064✔
1660
  }
1661
}
1,272,775✔
1662

1663
static void* stmtBindThreadFunc(void* param) {
119,247✔
1664
  setThreadName("stmt2Bind");
119,247✔
1665

1666
  STscStmt2* pStmt = (STscStmt2*)param;
119,247✔
1667
  STMT2_DLOG_E("stmt2 bind thread started");
119,247✔
1668

1669
  while (true) {
1,272,753✔
1670
    SStmtQNode* asyncParam = NULL;
1,392,000✔
1671

1672
    if (!stmtDequeue(pStmt, &asyncParam)) {
1,392,000✔
1673
      if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
119,147✔
1674
        STMT2_DLOG_E("queue is empty and stopQueue is set, thread will exit");
119,147✔
1675
        break;
119,119✔
1676
      }
1677
      continue;
×
1678
    }
1679

1680
    stmtAsyncOutput(pStmt, asyncParam);
1,272,634✔
1681
  }
1682

1683
  STMT2_DLOG_E("stmt2 bind thread stopped");
119,131✔
1684
  return NULL;
119,131✔
1685
}
1686

1687
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
119,209✔
1688
  TdThreadAttr thAttr;
100,155✔
1689
  if (taosThreadAttrInit(&thAttr) != 0) {
119,209✔
1690
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1691
  }
1692
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
119,209✔
1693
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1694
  }
1695

1696
  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
119,209✔
1697
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1698
    STMT_ERR_RET(terrno);
×
1699
  }
1700

1701
  pStmt->bindThreadInUse = true;
119,247✔
1702

1703
  (void)taosThreadAttrDestroy(&thAttr);
119,247✔
1704
  return TSDB_CODE_SUCCESS;
119,247✔
1705
}
1706

1707
static int32_t stmtInitQueue(STscStmt2* pStmt) {
119,247✔
1708
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
119,247✔
1709
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
119,247✔
1710
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
238,494✔
1711
  pStmt->queue.tail = pStmt->queue.head;
119,247✔
1712

1713
  return TSDB_CODE_SUCCESS;
119,247✔
1714
}
1715

1716
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
130,830✔
1717
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
130,830✔
1718
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
130,830✔
1719
  pStmt->asyncBindParam.asyncBindNum = 0;
130,830✔
1720

1721
  return TSDB_CODE_SUCCESS;
130,830✔
1722
}
1723

1724
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
119,209✔
1725
  pTblBuf->buffUnit = sizeof(SStmtQNode);
119,209✔
1726
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
119,209✔
1727
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
119,209✔
1728
  if (NULL == pTblBuf->pBufList) {
119,247✔
1729
    return terrno;
×
1730
  }
1731
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
119,247✔
1732
  if (NULL == buff) {
119,247✔
1733
    return terrno;
×
1734
  }
1735

1736
  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
238,494✔
1737
    return terrno;
×
1738
  }
1739

1740
  pTblBuf->pCurBuff = buff;
119,247✔
1741
  pTblBuf->buffIdx = 1;
119,247✔
1742
  pTblBuf->buffOffset = 0;
119,247✔
1743

1744
  return TSDB_CODE_SUCCESS;
119,247✔
1745
}
1746

1747
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
128,066✔
1748
  STscObj*   pObj = (STscObj*)taos;
128,066✔
1749
  STscStmt2* pStmt = NULL;
128,066✔
1750
  int32_t    code = 0;
128,066✔
1751

1752
  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
128,066✔
1753
  if (NULL == pStmt) {
128,066✔
1754
    STMT2_ELOG_E("fail to allocate memory for pStmt");
×
1755
    return NULL;
×
1756
  }
1757

1758
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
128,066✔
1759
  if (NULL == pStmt->sql.pTableCache) {
128,104✔
1760
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtInit2:%s", tstrerror(terrno));
×
1761
    taosMemoryFree(pStmt);
×
1762
    return NULL;
×
1763
  }
1764

1765
  pStmt->taos = pObj;
128,104✔
1766
  if (taos->db[0] != '\0') {
128,104✔
1767
    pStmt->db = taosStrdup(taos->db);
122,444✔
1768
  }
1769
  pStmt->bInfo.needParse = true;
128,066✔
1770
  pStmt->sql.status = STMT_INIT;
128,066✔
1771
  pStmt->errCode = TSDB_CODE_SUCCESS;
128,066✔
1772

1773
  if (NULL != pOptions) {
128,066✔
1774
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
124,594✔
1775
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
124,594✔
1776
      pStmt->stbInterlaceMode = true;
116,483✔
1777
    }
1778

1779
    pStmt->reqid = pOptions->reqid;
124,594✔
1780
  }
1781

1782
  if (pStmt->stbInterlaceMode) {
128,066✔
1783
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
116,483✔
1784
    pStmt->sql.siInfo.acctId = taos->acctId;
116,483✔
1785
    pStmt->sql.siInfo.dbname = taos->db;
116,483✔
1786
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
116,483✔
1787

1788
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
116,521✔
1789
    if (NULL == pStmt->sql.siInfo.pTableHash) {
116,521✔
1790
      STMT2_ELOG("fail to allocate memory for pTableHash:%s", tstrerror(terrno));
×
1791
      (void)stmtClose2(pStmt);
×
1792
      return NULL;
×
1793
    }
1794

1795
    pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
116,263✔
1796
    if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
116,253✔
1797
      STMT2_ELOG("fail to allocate memory for pTableRowDataHash:%s", tstrerror(terrno));
×
1798
      (void)stmtClose2(pStmt);
×
1799
      return NULL;
×
1800
    }
1801

1802
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
116,455✔
1803
    if (NULL == pStmt->sql.siInfo.pTableCols) {
116,483✔
1804
      STMT2_ELOG("fail to allocate memory for pTableCols:%s", tstrerror(terrno));
×
1805
      (void)stmtClose2(pStmt);
×
1806
      return NULL;
×
1807
    }
1808

1809
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
116,253✔
1810
    if (TSDB_CODE_SUCCESS == code) {
116,521✔
1811
      code = stmtInitQueue(pStmt);
116,521✔
1812
    }
1813
    if (TSDB_CODE_SUCCESS == code) {
116,521✔
1814
      code = stmtStartBindThread(pStmt);
116,521✔
1815
    }
1816
    if (TSDB_CODE_SUCCESS != code) {
116,521✔
1817
      terrno = code;
×
1818
      STMT2_ELOG("fail to init stmt2 bind thread:%s", tstrerror(code));
×
1819
      (void)stmtClose2(pStmt);
×
1820
      return NULL;
×
1821
    }
1822
  }
1823

1824
  pStmt->sql.siInfo.tableColsReady = true;
128,104✔
1825
  if (pStmt->options.asyncExecFn) {
128,104✔
1826
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
1,300✔
1827
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1828
      STMT2_ELOG("fail to init asyncExecSem:%s", tstrerror(terrno));
×
1829
      (void)stmtClose2(pStmt);
×
1830
      return NULL;
×
1831
    }
1832
  }
1833
  code = stmtIniAsyncBind(pStmt);
128,104✔
1834
  if (TSDB_CODE_SUCCESS != code) {
128,104✔
1835
    terrno = code;
×
1836
    STMT2_ELOG("fail to start init asyncExecSem:%s", tstrerror(code));
×
1837

1838
    (void)stmtClose2(pStmt);
×
1839
    return NULL;
×
1840
  }
1841

1842
  pStmt->execSemWaited = false;
128,104✔
1843

1844
  // STMT_LOG_SEQ(STMT_INIT);
1845

1846
  STMT2_DLOG("stmt2 initialize finished, seqId:%d, db:%s, interlaceMode:%d, asyncExec:%d", pStmt->seqId, pStmt->db,
128,104✔
1847
             pStmt->stbInterlaceMode, pStmt->options.asyncExecFn != NULL);
1848

1849
  return pStmt;
128,104✔
1850
}
1851

1852
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
118,098✔
1853
  STscStmt2* pStmt = (STscStmt2*)stmt;
118,098✔
1854
  if (dbName == NULL || dbName[0] == '\0') {
118,098✔
UNCOV
1855
    STMT2_ELOG_E("dbname in sql is illegal");
×
1856
    return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
×
1857
  }
1858

1859
  STMT2_DLOG("dbname is specified in sql:%s", dbName);
118,098✔
1860
  if (pStmt->db == NULL || pStmt->db[0] == '\0') {
118,098✔
1861
    taosMemoryFreeClear(pStmt->db);
5,030✔
1862
    STMT2_DLOG("dbname:%s is by sql, not by taosconnect", dbName);
5,060✔
1863
    pStmt->db = taosStrdup(dbName);
5,060✔
1864
    (void)strdequote(pStmt->db);
5,060✔
1865
  }
1866
  STMT_ERR_RET(stmtCreateRequest(pStmt));
118,068✔
1867

1868
  // The SQL statement specifies a database name, overriding the previously specified database
1869
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
118,098✔
1870
  pStmt->exec.pRequest->pDb = taosStrdup(dbName);
118,075✔
1871
  (void)strdequote(pStmt->exec.pRequest->pDb);
118,075✔
1872
  if (pStmt->exec.pRequest->pDb == NULL) {
118,068✔
1873
    return terrno;
×
1874
  }
1875
  if (pStmt->sql.stbInterlaceMode) {
118,068✔
1876
    pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
112,138✔
1877
  }
1878
  return TSDB_CODE_SUCCESS;
118,075✔
1879
}
1880
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
2,726✔
1881
  int32_t code = TSDB_CODE_SUCCESS;
2,726✔
1882

1883
  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
2,726✔
1884
  if (NULL == pStmt->sql.siInfo.pTableHash) {
2,726✔
1885
    return terrno;
×
1886
  }
1887

1888
  pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
2,726✔
1889
  if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
2,726✔
1890
    return terrno;
×
1891
  }
1892

1893
  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
2,726✔
1894
  if (NULL == pStmt->sql.siInfo.pTableCols) {
2,726✔
1895
    return terrno;
×
1896
  }
1897

1898
  code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
2,726✔
1899

1900
  if (TSDB_CODE_SUCCESS == code) {
2,726✔
1901
    code = stmtInitQueue(pStmt);
2,726✔
1902
    pStmt->queue.stopQueue = false;
2,726✔
1903
  }
1904
  if (TSDB_CODE_SUCCESS == code) {
2,726✔
1905
    code = stmtStartBindThread(pStmt);
2,726✔
1906
  }
1907
  if (TSDB_CODE_SUCCESS != code) {
2,726✔
1908
    return code;
×
1909
  }
1910

1911
  return TSDB_CODE_SUCCESS;
2,726✔
1912
}
1913

1914
static int32_t stmtDeepReset(STscStmt2* pStmt) {
3,526✔
1915
  // Save state that needs to be preserved
1916
  char*             db = pStmt->db;
3,526✔
1917
  TAOS_STMT2_OPTION options = pStmt->options;
3,526✔
1918
  uint32_t          reqid = pStmt->reqid;
3,526✔
1919
  bool              stbInterlaceMode = pStmt->stbInterlaceMode;
3,526✔
1920

1921
  pStmt->errCode = 0;
3,526✔
1922

1923
  // Wait for async execution to complete
1924
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
3,526✔
1925
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
200✔
1926
      STMT2_ELOG_E("bind param wait asyncExecSem failed");
×
1927
    }
1928
    pStmt->execSemWaited = true;
200✔
1929
  }
1930

1931
  // Stop bind thread if in use (similar to stmtClose2)
1932
  if (stbInterlaceMode && pStmt->bindThreadInUse) {
3,526✔
1933
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
2,726✔
1934
      taosUsleep(1);
×
1935
    }
1936
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
2,726✔
1937
    pStmt->queue.stopQueue = true;
2,726✔
1938
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
2,726✔
1939
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
2,726✔
1940

1941
    (void)taosThreadJoin(pStmt->bindThread, NULL);
2,726✔
1942
    pStmt->bindThreadInUse = false;
2,726✔
1943
    pStmt->queue.head = NULL;
2,726✔
1944
    pStmt->queue.tail = NULL;
2,726✔
1945
    pStmt->queue.qRemainNum = 0;
2,726✔
1946

1947
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
2,726✔
1948
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
2,726✔
1949
  }
1950

1951
  // Clean all SQL and execution info (stmtCleanSQLInfo already handles most cleanup)
1952
  pStmt->bInfo.boundColsCached = false;
3,526✔
1953
  if (stbInterlaceMode) {
3,526✔
1954
    pStmt->bInfo.tagsCached = false;
2,726✔
1955
  }
1956
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
3,526✔
1957

1958
  // Reinitialize resources (similar to stmtInit2)
1959
  if (stbInterlaceMode) {
3,526✔
1960
    pStmt->sql.siInfo.transport = pStmt->taos->pAppInfo->pTransporter;
2,726✔
1961
    pStmt->sql.siInfo.acctId = pStmt->taos->acctId;
2,726✔
1962
    pStmt->sql.siInfo.dbname = pStmt->taos->db;
2,726✔
1963
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
2,726✔
1964

1965
    if (NULL == pStmt->pCatalog) {
2,726✔
1966
      STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
300✔
1967
    }
1968
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
2,726✔
1969

1970
    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));
2,726✔
1971

1972
    int32_t code = stmtIniAsyncBind(pStmt);
2,726✔
1973
    if (TSDB_CODE_SUCCESS != code) {
2,726✔
1974
      STMT2_ELOG("fail to reinit async bind in stmtDeepReset:%s", tstrerror(code));
×
1975
      return code;
×
1976
    }
1977
  }
1978

1979
  // Restore preserved state
1980
  pStmt->db = db;
3,526✔
1981
  pStmt->options = options;
3,526✔
1982
  pStmt->reqid = reqid;
3,526✔
1983
  pStmt->stbInterlaceMode = stbInterlaceMode;
3,526✔
1984

1985
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
3,526✔
1986
  if (NULL == pStmt->sql.pTableCache) {
3,526✔
1987
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtDeepReset:%s", tstrerror(terrno));
×
1988
    return terrno;
×
1989
  }
1990

1991
  pStmt->bInfo.needParse = true;
3,526✔
1992
  pStmt->sql.status = STMT_INIT;
3,526✔
1993
  pStmt->sql.siInfo.tableColsReady = true;
3,526✔
1994

1995
  return TSDB_CODE_SUCCESS;
3,526✔
1996
}
1997

1998
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
131,388✔
1999
  STscStmt2* pStmt = (STscStmt2*)stmt;
131,388✔
2000
  int32_t    code = 0;
131,388✔
2001

2002
  STMT2_DLOG("start to prepare with sql:%s", sql);
131,388✔
2003

2004
  if (stmt == NULL || sql == NULL) {
131,388✔
UNCOV
2005
    STMT2_ELOG_E("stmt or sql is NULL");
×
2006
    return TSDB_CODE_INVALID_PARA;
×
2007
  }
2008

2009
  if (pStmt->sql.status >= STMT_PREPARE) {
131,430✔
2010
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
3,526✔
2011
    STMT_ERR_RET(stmtDeepReset(pStmt));
3,526✔
2012
  }
2013

2014
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
131,430✔
2015
    STMT2_ELOG("errCode is not success before, ErrCode: 0x%x, errorsyt: %s\n. ", pStmt->errCode,
100✔
2016
               tstrerror(pStmt->errCode));
2017
    return pStmt->errCode;
100✔
2018
  }
2019

2020
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
131,330✔
2021

2022
  if (length <= 0) {
131,330✔
2023
    length = strlen(sql);
125,549✔
2024
  }
2025
  pStmt->sql.sqlStr = taosStrndup(sql, length);
131,330✔
2026
  if (!pStmt->sql.sqlStr) {
131,330✔
2027
    STMT2_ELOG("fail to allocate memory for sqlStr:%s", tstrerror(terrno));
×
2028
    STMT_ERR_RET(terrno);
×
2029
  }
2030
  pStmt->sql.sqlLen = length;
131,330✔
2031
  STMT_ERR_RET(stmtCreateRequest(pStmt));
131,330✔
2032

2033
  if (stmt2IsInsert(pStmt)) {
131,300✔
2034
    pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
124,727✔
2035
    char* dbName = NULL;
124,727✔
2036
    if (qParseDbName(sql, length, &dbName)) {
124,727✔
2037
      STMT_ERR_RET(stmtSetDbName2(stmt, dbName));
118,098✔
2038
      taosMemoryFreeClear(dbName);
118,068✔
2039
    } else if (pStmt->db != NULL && pStmt->db[0] != '\0') {
6,629✔
2040
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
6,529✔
2041
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
6,529✔
2042
      if (pStmt->exec.pRequest->pDb == NULL) {
6,529✔
2043
        STMT_ERR_RET(terrno);
×
2044
      }
2045
      (void)strdequote(pStmt->exec.pRequest->pDb);
6,529✔
2046

2047
      if (pStmt->sql.stbInterlaceMode) {
6,529✔
2048
        pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
4,009✔
2049
      }
2050
    }
2051

2052
  } else if (stmt2IsSelect(pStmt)) {
6,603✔
2053
    pStmt->sql.stbInterlaceMode = false;
6,303✔
2054
    STMT_ERR_RET(stmtParseSql(pStmt));
6,303✔
2055
  } else {
2056
    return stmtBuildErrorMsgWithCode(pStmt, "stmt only support 'SELECT' or 'INSERT'", TSDB_CODE_PAR_SYNTAX_ERROR);
300✔
2057
  }
2058
  return TSDB_CODE_SUCCESS;
130,518✔
2059
}
2060

2061
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
7,309✔
2062
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
7,309✔
2063
  if (!pSrc) {
7,309✔
2064
    return terrno;
×
2065
  }
2066
  STableDataCxt* pDst = NULL;
7,309✔
2067

2068
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
7,309✔
2069
  pStmt->sql.siInfo.pDataCtx = pDst;
7,309✔
2070

2071
  SArray* pTblCols = NULL;
7,309✔
2072
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
7,269,753✔
2073
    pTblCols = taosArrayInit(20, POINTER_BYTES);
7,262,444✔
2074
    if (NULL == pTblCols) {
7,237,424✔
2075
      return terrno;
×
2076
    }
2077

2078
    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
14,495,727✔
2079
      return terrno;
×
2080
    }
2081
  }
2082

2083
  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
7,309✔
2084

2085
  STMT2_TLOG("init stb interlace table info, tbName:%s, pDataCtx:%p, boundTags:%p", pStmt->bInfo.tbFName,
7,309✔
2086
             pStmt->sql.siInfo.pDataCtx, pStmt->sql.siInfo.boundTags);
2087

2088
  return TSDB_CODE_SUCCESS;
7,309✔
2089
}
2090

2091
bool stmt2IsInsert(TAOS_STMT2* stmt) {
1,023,552✔
2092
  STscStmt2* pStmt = (STscStmt2*)stmt;
1,023,552✔
2093
  if (pStmt->sql.type) {
1,023,552✔
2094
    return (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
875,953✔
2095
  }
2096

2097
  return qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
147,636✔
2098
}
2099

2100
bool stmt2IsSelect(TAOS_STMT2* stmt) {
124,053✔
2101
  STscStmt2* pStmt = (STscStmt2*)stmt;
124,053✔
2102

2103
  if (pStmt->sql.type) {
124,053✔
2104
    return STMT_TYPE_QUERY == pStmt->sql.type;
×
2105
  }
2106
  return qIsSelectFromSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
124,023✔
2107
}
2108

2109
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
882,503✔
2110
  STscStmt2* pStmt = (STscStmt2*)stmt;
882,503✔
2111

2112
  int64_t startUs = taosGetTimestampUs();
882,546✔
2113

2114
  STMT2_TLOG("start to set tbName:%s", tbName);
882,546✔
2115

2116
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
882,641✔
2117
    return pStmt->errCode;
×
2118
  }
2119

2120
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
882,713✔
2121

2122
  int32_t insert = 0;
882,422✔
2123
  if (!stmt2IsInsert(stmt)) {
882,422✔
2124
    STMT2_ELOG_E("set tb name not available for no-insert statement");
×
2125
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2126
  }
2127
  // process tbname
2128
  STMT_ERR_RET(stmtCreateRequest(pStmt));
882,677✔
2129

2130
  STMT_ERR_RET(qCreateSName2(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
882,550✔
2131
                             pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
2132
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
881,620✔
2133
  tstrncpy(pStmt->bInfo.tbName, (char*)tNameGetTableName(&pStmt->bInfo.sname), TSDB_TABLE_NAME_LEN);
882,146✔
2134

2135
  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
882,072✔
2136
    STMT_ERR_RET(stmtGetFromCache(pStmt));
22,689✔
2137

2138
    if (pStmt->bInfo.needParse) {
22,160✔
2139
      STMT_ERR_RET(stmtParseSql(pStmt));
11,294✔
2140
      if (!pStmt->sql.autoCreateTbl) {
11,329✔
2141
        uint64_t uid, suid;
1,851✔
2142
        int32_t  vgId;
1,851✔
2143
        int8_t   tableType;
1,851✔
2144

2145
        int32_t code = stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType);
4,109✔
2146
        if (code != TSDB_CODE_SUCCESS) {
4,109✔
2147
          return code;
400✔
2148
        }
2149
      }
2150
    }
2151

2152
  } else {
2153
    pStmt->exec.pRequest->requestId++;
859,369✔
2154
    pStmt->bInfo.needParse = false;
859,468✔
2155
  }
2156

2157
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
881,073✔
2158
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
7,309✔
2159
  }
2160

2161
  int64_t startUs2 = taosGetTimestampUs();
881,437✔
2162
  pStmt->stat.setTbNameUs += startUs2 - startUs;
881,437✔
2163

2164
  return TSDB_CODE_SUCCESS;
881,520✔
2165
}
2166

2167
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
465,332✔
2168
  STscStmt2* pStmt = (STscStmt2*)stmt;
465,332✔
2169

2170
  STMT2_TLOG_E("start to set tbTags");
465,332✔
2171
  if (qDebugFlag & DEBUG_TRACE) {
465,332✔
2172
    (void)stmtPrintBindv(stmt, tags, -1, true);
×
2173
  }
2174

2175
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
465,255✔
2176
    return pStmt->errCode;
×
2177
  }
2178

2179
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
465,255✔
2180

2181
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
465,299✔
2182
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
2183
    pStmt->bInfo.needParse = false;
×
2184
  }
2185
  STMT_ERR_RET(stmtCreateRequest(pStmt));
465,299✔
2186

2187
  if (pStmt->bInfo.needParse) {
465,288✔
2188
    STMT_ERR_RET(stmtParseSql(pStmt));
200✔
2189
  }
2190
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
465,288✔
2191
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2192
  }
2193

2194
  SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
465,321✔
2195

2196
  STableDataCxt** pDataBlock = NULL;
465,321✔
2197
  if (pStmt->exec.pCurrBlock) {
465,321✔
2198
    pDataBlock = &pStmt->exec.pCurrBlock;
459,661✔
2199
  } else {
2200
    pDataBlock =
2201
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
5,660✔
2202
    if (NULL == pDataBlock) {
5,660✔
2203
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2204
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2205
    }
2206
    if (pStmt->sql.stbInterlaceMode && (*pDataBlock)->pData->pCreateTbReq) {
5,660✔
2207
      tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
200✔
2208
      taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
200✔
2209
      (*pDataBlock)->pData->pCreateTbReq = NULL;
200✔
2210
    }
2211
  }
2212
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
465,321✔
2213
    return TSDB_CODE_SUCCESS;
×
2214
  }
2215

2216
  STMT2_TLOG_E("start to bind stmt tag values");
465,321✔
2217

2218
  void* boundTags = NULL;
465,222✔
2219
  if (pStmt->sql.stbInterlaceMode) {
465,222✔
2220
    boundTags = pStmt->sql.siInfo.boundTags;
453,171✔
2221
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
453,171✔
2222
    if (NULL == pCreateTbReq) {
453,303✔
2223
      return terrno;
×
2224
    }
2225
    int32_t vgId = -1;
453,303✔
2226
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
453,303✔
2227
    (*pCreateTbReq)->uid = vgId;
453,325✔
2228
  } else {
2229
    boundTags = pStmt->bInfo.boundTags;
12,051✔
2230
  }
2231

2232
  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
465,376✔
2233
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
2234
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));
2235

2236
  return TSDB_CODE_SUCCESS;
465,188✔
2237
}
2238

2239
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
98,836✔
2240
  STscStmt2* pStmt = (STscStmt2*)stmt;
98,836✔
2241

2242
  STMT2_TLOG_E("start to clone createTbRequest for fixed tags");
98,836✔
2243

2244
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
98,968✔
2245
    return pStmt->errCode;
×
2246
  }
2247

2248
  if (!pStmt->sql.stbInterlaceMode) {
98,968✔
2249
    return TSDB_CODE_SUCCESS;
×
2250
  }
2251

2252
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
98,968✔
2253

2254
  if (pStmt->sql.fixValueTags) {
98,932✔
2255
    STMT2_TLOG_E("tags are fixed, use one createTbReq");
97,172✔
2256
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
97,172✔
2257
    if ((*pCreateTbReq)->name) {
97,160✔
2258
      taosMemoryFree((*pCreateTbReq)->name);
97,196✔
2259
    }
2260
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
97,100✔
2261
    int32_t vgId = -1;
97,196✔
2262
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
97,196✔
2263
    (*pCreateTbReq)->uid = vgId;
97,256✔
2264
    return TSDB_CODE_SUCCESS;
97,256✔
2265
  }
2266

2267
  STMT_ERR_RET(stmtCreateRequest(pStmt));
1,760✔
2268
  if (pStmt->bInfo.needParse) {
1,760✔
2269
    STMT_ERR_RET(stmtParseSql(pStmt));
×
2270
    if (!pStmt->sql.autoCreateTbl) {
×
2271
      STMT2_WLOG_E("don't need to create table, will not check tags");
×
2272
      return TSDB_CODE_SUCCESS;
×
2273
    }
2274
  }
2275

2276
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
1,760✔
2277
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2278
  }
2279

2280
  STableDataCxt** pDataBlock = NULL;
1,760✔
2281
  if (pStmt->exec.pCurrBlock) {
1,760✔
2282
    pDataBlock = &pStmt->exec.pCurrBlock;
×
2283
  } else {
2284
    pDataBlock =
2285
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
1,760✔
2286
    if (NULL == pDataBlock) {
1,760✔
2287
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2288
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2289
    }
2290
  }
2291

2292
  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
1,760✔
2293
    STMT2_DLOG_E("don't need to create, will not check tags");
×
2294
    return TSDB_CODE_SUCCESS;
×
2295
  }
2296

2297

2298
  if ((*pDataBlock)->pData->pCreateTbReq) {
1,760✔
2299
    STMT2_TLOG_E("tags are fixed, set createTbReq first time");
1,760✔
2300
    pStmt->sql.fixValueTags = true;
1,760✔
2301
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
1,760✔
2302
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
1,760✔
2303
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
1,760✔
2304

2305
    // destroy the createTbReq in the data block
2306
    tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
1,760✔
2307
    taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
1,760✔
2308
    (*pDataBlock)->pData->pCreateTbReq = NULL;
1,760✔
2309
  }
2310

2311
  return TSDB_CODE_SUCCESS;
1,760✔
2312
}
2313

2314
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
2315
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
×
2316
    return pStmt->errCode;
×
2317
  }
2318

2319
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
×
2320
    tscError("invalid operation to get query column fileds");
×
2321
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2322
  }
2323

2324
  STableDataCxt** pDataBlock = NULL;
×
2325

2326
  if (pStmt->sql.stbInterlaceMode) {
×
2327
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
×
2328
  } else {
2329
    pDataBlock =
2330
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
×
2331
    if (NULL == pDataBlock) {
×
2332
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2333
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
2334
    }
2335
  }
2336

2337
  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
×
2338

2339
  return TSDB_CODE_SUCCESS;
×
2340
}
2341

2342
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
5,417✔
2343
  int32_t code = 0;
5,417✔
2344
  int32_t preCode = pStmt->errCode;
5,417✔
2345

2346
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
5,417✔
2347
    return pStmt->errCode;
×
2348
  }
2349

2350
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
5,417✔
2351
    STMT2_ELOG_E("stmtFetchStbColFields2 only for insert statement");
×
2352
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2353
  }
2354

2355
  STableDataCxt** pDataBlock = NULL;
5,417✔
2356
  bool            cleanStb = false;
5,417✔
2357

2358
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
5,417✔
2359
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
600✔
2360
  } else {
2361
    cleanStb = true;
4,817✔
2362
    pDataBlock =
2363
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
4,817✔
2364
  }
2365

2366
  if (NULL == pDataBlock || NULL == *pDataBlock) {
5,417✔
2367
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2368
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
2369
  }
2370

2371
  pStmt->sql.placeholderOfTags = 0;
5,417✔
2372
  pStmt->sql.placeholderOfCols = 0;
5,417✔
2373
  int32_t totalNum = 0;
5,417✔
2374
  STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.fixedValueCols,
5,417✔
2375
                                        pStmt->bInfo.tbNameFlag, &totalNum, fields, &pStmt->sql.placeholderOfTags,
2376
                                        &pStmt->sql.placeholderOfCols));
2377

2378
  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
5,417✔
2379
    taosMemoryFreeClear((*pDataBlock)->boundColsInfo.pColIndex);
3,817✔
2380
    qDestroyStmtDataBlock(*pDataBlock);
3,817✔
2381
    *pDataBlock = NULL;
3,817✔
2382
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
3,817✔
2383
      STMT2_ELOG("fail to remove remove stb:%s exec blockHash", pStmt->bInfo.tbFName);
×
2384
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
2385
    }
2386
    pStmt->sql.autoCreateTbl = false;
3,817✔
2387
    pStmt->bInfo.tagsCached = false;
3,817✔
2388
    pStmt->bInfo.sname = (SName){0};
3,817✔
2389
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
3,817✔
2390
  }
2391

2392
  if (fieldNum != NULL) {
5,417✔
2393
    *fieldNum = totalNum;
5,417✔
2394
  }
2395

2396
  STMT2_DLOG("get insert fields totalNum:%d, tagsNum:%d, colsNum:%d", totalNum, pStmt->sql.placeholderOfTags,
5,417✔
2397
             pStmt->sql.placeholderOfCols);
2398

2399
_return:
5,417✔
2400

2401
  pStmt->errCode = preCode;
5,417✔
2402

2403
  return code;
5,417✔
2404
}
2405
/*
2406
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
2407
  while (true) {
2408
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
2409
      pStmt->exec.smInfo.pColIdx = 0;
2410
    }
2411

2412
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
2413
      taosUsleep(1);
2414
      continue;
2415
    }
2416

2417
    *idx = pStmt->exec.smInfo.pColIdx;
2418
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
2419
  }
2420
}
2421
*/
2422
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
866,979✔
2423
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
866,979✔
2424
    pStmt->sql.siInfo.pVgroupHash =
405,794✔
2425
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
405,806✔
2426
  }
2427
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
866,990✔
2428
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
405,829✔
2429
  }
2430

2431
  if (NULL == pStmt->sql.siInfo.pRequest) {
867,057✔
2432
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
7,109✔
2433
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));
2434

2435
    if (pStmt->reqid != 0) {
7,069✔
2436
      pStmt->reqid++;
8✔
2437
    }
2438
    pStmt->exec.pRequest->syncQuery = true;
7,034✔
2439

2440
    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
7,074✔
2441
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
7,109✔
2442
  }
2443

2444
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
867,019✔
2445
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
20,806✔
2446
    pStmt->sql.siInfo.tbFromHash = true;
2,783✔
2447
  }
2448

2449
  if (0 == pStmt->sql.siInfo.firstName[0]) {
867,057✔
2450
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
7,074✔
2451
  }
2452

2453
  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
866,982✔
2454
  param->next = NULL;
866,959✔
2455

2456
  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
867,024✔
2457

2458
  if (pStmt->queue.stopQueue) {
867,258✔
2459
    STMT2_ELOG_E("bind thread already stopped, cannot enqueue");
×
2460
    return TSDB_CODE_TSC_STMT_API_ERROR;
×
2461
  }
2462
  stmtEnqueue(pStmt, param);
867,188✔
2463

2464
  return TSDB_CODE_SUCCESS;
867,104✔
2465
}
2466

2467
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
2468
  while (true) {
2469
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
866,902✔
2470
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
867,112✔
2471
      break;
867,087✔
2472
    } else {
2473
      SArray* pTblCols = NULL;
×
2474
      for (int32_t i = 0; i < 100; i++) {
×
2475
        pTblCols = taosArrayInit(20, POINTER_BYTES);
×
2476
        if (NULL == pTblCols) {
×
2477
          return terrno;
×
2478
        }
2479

2480
        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
×
2481
          return terrno;
×
2482
        }
2483
      }
2484
    }
2485
  }
2486

2487
  return TSDB_CODE_SUCCESS;
867,087✔
2488
}
2489

2490
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
49,100,161✔
2491
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
49,100,161✔
2492
    return TSDB_CODE_SUCCESS;
49,693,712✔
2493
  }
2494

2495
  uint64_t uid = pStmt->bInfo.tbUid;
14,000✔
2496
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
14,000✔
2497

2498
  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
14,000✔
2499
    STMT2_TLOG("table %s already cached, no need to cache again", pStmt->bInfo.tbFName);
10,731✔
2500
    return TSDB_CODE_SUCCESS;
10,731✔
2501
  }
2502

2503
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
3,420✔
2504
  if (!pSrc) {
3,420✔
2505
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2506
    return terrno;
×
2507
  }
2508
  STableDataCxt* pDst = NULL;
3,420✔
2509

2510
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
3,420✔
2511

2512
  SStmtTableCache cache = {
3,420✔
2513
      .pDataCtx = pDst,
2514
      .boundTags = pStmt->bInfo.boundTags,
3,420✔
2515
  };
2516

2517
  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
3,420✔
2518
    STMT2_ELOG("fail to put table cache:%s", tstrerror(terrno));
×
2519
    return terrno;
×
2520
  }
2521

2522
  if (pStmt->sql.autoCreateTbl) {
3,420✔
2523
    pStmt->bInfo.tagsCached = true;
2,620✔
2524
  } else {
2525
    pStmt->bInfo.boundTags = NULL;
800✔
2526
  }
2527

2528
  return TSDB_CODE_SUCCESS;
3,420✔
2529
}
2530

2531
static int stmtAddBatch2(TAOS_STMT2* stmt) {
49,944,632✔
2532
  STscStmt2* pStmt = (STscStmt2*)stmt;
49,944,632✔
2533

2534
  int64_t startUs = taosGetTimestampUs();
50,756,834✔
2535

2536
  // STMT2_TLOG_E("start to add batch");
2537

2538
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
50,756,834✔
2539
    return pStmt->errCode;
×
2540
  }
2541

2542
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
50,816,519✔
2543

2544
  if (pStmt->sql.stbInterlaceMode) {
50,525,809✔
2545
    int64_t startUs2 = taosGetTimestampUs();
405,455✔
2546
    pStmt->stat.addBatchUs += startUs2 - startUs;
405,455✔
2547

2548
    pStmt->sql.siInfo.tableColsReady = false;
405,432✔
2549

2550
    SStmtQNode* param = NULL;
405,467✔
2551
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
810,770✔
2552
    param->restoreTbCols = true;
405,298✔
2553
    param->next = NULL;
405,361✔
2554

2555
    if (pStmt->sql.autoCreateTbl) {
405,361✔
2556
      pStmt->bInfo.tagsCached = true;
190,467✔
2557
    }
2558
    pStmt->bInfo.boundColsCached = true;
405,298✔
2559

2560
    if (pStmt->queue.stopQueue) {
405,391✔
2561
      STMT2_ELOG_E("stmt bind thread is stopped,cannot enqueue bind request");
×
2562
      return TSDB_CODE_TSC_STMT_API_ERROR;
×
2563
    }
2564

2565
    stmtEnqueue(pStmt, param);
405,338✔
2566

2567
    return TSDB_CODE_SUCCESS;
405,454✔
2568
  }
2569

2570
  STMT_ERR_RET(stmtCacheBlock(pStmt));
49,866,091✔
2571

2572
  return TSDB_CODE_SUCCESS;
49,564,001✔
2573
}
2574
/*
2575
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
2576
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2577
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
2578
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
2579

2580
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
2581
  pRes->fields = taosMemoryMalloc(size);
2582
  pRes->userFields = taosMemoryMalloc(size);
2583
  if (NULL == pRes->fields || NULL == pRes->userFields) {
2584
    STMT_ERR_RET(terrno);
2585
  }
2586
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
2587
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
2588

2589
  return TSDB_CODE_SUCCESS;
2590
}
2591

2592
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
2593
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2594
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
2595

2596
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
2597
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
2598

2599
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2600
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
2601
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2602
      STMT_ERR_RET(terrno);
2603
    }
2604
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
2605
  }
2606

2607
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2608
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
2609
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2610
      STMT_ERR_RET(terrno);
2611
    }
2612
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
2613
  }
2614

2615
  return TSDB_CODE_SUCCESS;
2616
}
2617
*/
2618

2619
/**
2620
 * Fetch metadata for query statement after parameter binding.
2621
 * This function collects metadata requirements from the query (after binding),
2622
 * fetches metadata synchronously from catalog, and returns it for parsing.
2623
 *
2624
 * Note: We fetch metadata on every bind because:
2625
 * 1. Parameter values in WHERE conditions (e.g., dataname IN (?,?)) may change
2626
 * 2. Different parameter values may require different vgroup lists for virtual tables
2627
 * 3. Metadata requirements can only be determined after parameters are bound
2628
 *
2629
 * @param pStmt Statement handle
2630
 * @param pCxt Parse context (must have catalog handle initialized)
2631
 * @param pMetaData Output: Fetched metadata (caller responsible for cleanup)
2632
 * @return TSDB_CODE_SUCCESS on success, error code otherwise
2633
 */
2634
// Callback parameter structure for synchronous catalog metadata fetch
2635
typedef struct {
2636
  SMetaData* pRsp;
2637
  int32_t    code;
2638
  tsem_t     sem;
2639
} SCatalogSyncCbParam;
2640

2641
// Callback function for catalogAsyncGetAllMeta to make it synchronous
2642
static void stmtCatalogSyncGetAllMetaCb(SMetaData* pResultMeta, void* param, int32_t code) {
6,791✔
2643
  SCatalogSyncCbParam* pCbParam = (SCatalogSyncCbParam*)param;
6,791✔
2644
  if (TSDB_CODE_SUCCESS == code && pResultMeta) {
6,791✔
2645
    *pCbParam->pRsp = *pResultMeta;
6,791✔
2646
    TAOS_MEMSET(pResultMeta, 0, sizeof(SMetaData));  // Clear to avoid double free
6,791✔
2647
  }
2648
  pCbParam->code = code;
6,791✔
2649
  if (tsem_post(&pCbParam->sem) != 0) {
6,791✔
2650
    tscError("failed to post semaphore");
×
2651
  }
2652
}
6,791✔
2653

2654
static int32_t stmtFetchMetadataForQuery(STscStmt2* pStmt, SParseContext* pCxt, SMetaData* pMetaData) {
6,791✔
2655
  int32_t          code = 0;
6,791✔
2656
  SParseMetaCache  metaCache = {0};
6,791✔
2657
  SCatalogReq      catalogReq = {0};
6,791✔
2658
  SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
9,675✔
2659
                           .requestId = pCxt->requestId,
6,791✔
2660
                           .requestObjRefId = pCxt->requestRid,
6,791✔
2661
                           .mgmtEps = pCxt->mgmtEpSet};
2662

2663
  TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
6,791✔
2664

2665
  code = collectMetaKey(pCxt, pStmt->sql.pQuery, &metaCache);
6,791✔
2666
  if (TSDB_CODE_SUCCESS == code) {
6,791✔
2667
    code = buildCatalogReq(&metaCache, &catalogReq);
6,791✔
2668
  }
2669
  if (TSDB_CODE_SUCCESS == code) {
6,791✔
2670
    SCatalogSyncCbParam cbParam = {.pRsp = pMetaData, .code = TSDB_CODE_SUCCESS};
6,791✔
2671
    if (tsem_init(&cbParam.sem, 0, 0) != 0) {
6,791✔
2672
      code = TSDB_CODE_CTG_INTERNAL_ERROR;
×
2673
    } else {
2674
      code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, stmtCatalogSyncGetAllMetaCb, &cbParam, NULL);
6,791✔
2675
      if (TSDB_CODE_SUCCESS == code) {
6,791✔
2676
        code = tsem_wait(&cbParam.sem);
6,791✔
2677
        if (code != TSDB_CODE_SUCCESS) {
6,791✔
2678
          catalogFreeMetaData(pMetaData);
×
2679
          TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
×
2680
        } else {
2681
          code = cbParam.code;
6,791✔
2682
        }
2683
      }
2684

2685
      if (tsem_destroy(&cbParam.sem) != 0) {
6,791✔
2686
        tscError("failed to destroy semaphore");
×
2687
        code = TSDB_CODE_CTG_INTERNAL_ERROR;
×
2688
        catalogFreeMetaData(pMetaData);
×
2689
        TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
×
2690
      }
2691
    }
2692
  }
2693

2694
  // metaCache currently holds "reserved/request" structures built by collectMetaKey/buildCatalogReq.
2695
  // It must be destroyed with request=true to release nested table-request hashes.
2696
  destoryParseMetaCache(&metaCache, true);
6,791✔
2697
  destoryCatalogReq(&catalogReq);
6,791✔
2698

2699
  if (TSDB_CODE_SUCCESS != code) {
6,791✔
2700
    catalogFreeMetaData(pMetaData);
×
2701
  }
2702

2703
  return code;
6,791✔
2704
}
2705

2706
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
50,550,682✔
2707
  STscStmt2* pStmt = (STscStmt2*)stmt;
50,550,682✔
2708
  int32_t    code = 0;
50,550,682✔
2709

2710
  int64_t startUs = taosGetTimestampUs();
50,753,622✔
2711

2712
  if (qDebugFlag & DEBUG_TRACE) {
50,753,622✔
2713
    (void)stmtPrintBindv(stmt, bind, colIdx, false);
×
2714
  }
2715

2716
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
50,843,212✔
2717
    return pStmt->errCode;
100✔
2718
  }
2719

2720
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
51,170,734✔
2721

2722
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
50,979,949✔
2723
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
2724
    pStmt->bInfo.needParse = false;
×
2725
  }
2726

2727
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
50,535,418✔
2728
    resetRequest(pStmt);
1,312✔
2729
  }
2730

2731
  STMT_ERR_RET(stmtCreateRequest(pStmt));
50,916,231✔
2732
  if (pStmt->bInfo.needParse) {
50,768,013✔
2733
    code = stmtParseSql(pStmt);
107,415✔
2734
    if (code != TSDB_CODE_SUCCESS) {
107,179✔
2735
      goto cleanup_root;
×
2736
    }
2737
  }
2738

2739
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
50,631,615✔
2740
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
6,791✔
2741
    if (code != TSDB_CODE_SUCCESS) {
6,791✔
2742
      goto cleanup_root;
×
2743
    }
2744
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
6,791✔
2745
                         .acctId = pStmt->taos->acctId,
6,791✔
2746
                         .minSecLevel = pStmt->taos->minSecLevel,
6,791✔
2747
                         .maxSecLevel = pStmt->taos->maxSecLevel,
6,791✔
2748
                         .db = pStmt->exec.pRequest->pDb,
6,791✔
2749
                         .topicQuery = false,
2750
                         .pSql = pStmt->sql.sqlStr,
6,791✔
2751
                         .sqlLen = pStmt->sql.sqlLen,
6,791✔
2752
                         .pMsg = pStmt->exec.pRequest->msgBuf,
6,791✔
2753
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
2754
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
6,791✔
2755
                         .pStmtCb = NULL,
2756
                         .pUser = pStmt->taos->user,
6,791✔
2757
                         .stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
6,791✔
2758
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
6,791✔
2759
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
6,791✔
2760
    if (code != TSDB_CODE_SUCCESS) {
6,791✔
2761
      goto cleanup_root;
×
2762
    }
2763

2764
    // Fetch metadata for query(vtable need)
2765
    SMetaData metaData = {0};
6,791✔
2766
    code = stmtFetchMetadataForQuery(pStmt, &ctx, &metaData);
6,791✔
2767
    if (TSDB_CODE_SUCCESS != code) {
6,791✔
2768
      goto cleanup_root;
×
2769
    }
2770

2771
    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, &metaData);
6,791✔
2772
    if (TSDB_CODE_SUCCESS == code) {
6,791✔
2773
      // Copy metaData to pRequest->parseMeta for potential future use
2774
      // Similar to doAsyncQueryFromAnalyse when parseOnly is true
2775
      (void)memcpy(&pStmt->exec.pRequest->parseMeta, &metaData, sizeof(SMetaData));
6,691✔
2776
      (void)memset(&metaData, 0, sizeof(SMetaData));  // Clear to avoid double free
6,691✔
2777
    } else {
2778
      // Clean up metaData on failure - free all arrays
2779
      if (metaData.pVStbRefDbs) {
100✔
2780
        taosArrayDestroy(metaData.pVStbRefDbs);
100✔
2781
        metaData.pVStbRefDbs = NULL;
100✔
2782
      }
2783
      // Note: Other fields in metaData are managed by catalog module if ctgFree is true
2784
      goto cleanup_root;
100✔
2785
    }
2786

2787
    if (pStmt->sql.pQuery->haveResultSet) {
6,691✔
2788
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
10,598✔
2789
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
2790
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
6,691✔
2791
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
6,691✔
2792
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
6,691✔
2793
    }
2794

2795
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
6,691✔
2796
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
6,691✔
2797
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
6,691✔
2798

2799
    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
2800
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
2801
    // }
2802

2803
    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));
2804

2805
    return TSDB_CODE_SUCCESS;
6,691✔
2806

2807
  cleanup_root:
100✔
2808
    STMT2_ELOG("parse query statment unexpected failed code:%d, need to clean node", code);
100✔
2809
    if (pStmt->sql.pQuery && pStmt->sql.pQuery->pRoot) {
100✔
2810
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
100✔
2811
      pStmt->sql.pQuery->pRoot = NULL;
100✔
2812
    }
2813
    STMT_ERR_RET(code);
100✔
2814
  }
2815

2816
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
50,872,590✔
2817
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2818
  }
2819

2820
  STableDataCxt** pDataBlock = NULL;
50,104,264✔
2821

2822
  if (pStmt->exec.pCurrBlock) {
50,104,264✔
2823
    pDataBlock = &pStmt->exec.pCurrBlock;
49,788,728✔
2824
  } else {
2825
    pDataBlock =
2826
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
118,237✔
2827
    if (NULL == pDataBlock) {
118,467✔
2828
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2829
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2830
    }
2831
    pStmt->exec.pCurrBlock = *pDataBlock;
118,467✔
2832
    if (pStmt->sql.stbInterlaceMode) {
118,444✔
2833
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
7,309✔
2834
      (*pDataBlock)->pData->aCol = NULL;
7,309✔
2835
    }
2836
    if (colIdx < -1) {
118,467✔
2837
      pStmt->sql.bindRowFormat = true;
100✔
2838
      taosArrayDestroy((*pDataBlock)->pData->aCol);
100✔
2839
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
100✔
2840
    }
2841
  }
2842

2843
  int64_t startUs2 = taosGetTimestampUs();
50,781,118✔
2844
  pStmt->stat.bindDataUs1 += startUs2 - startUs;
50,781,118✔
2845

2846
  SStmtQNode* param = NULL;
50,792,108✔
2847
  if (pStmt->sql.stbInterlaceMode) {
51,038,894✔
2848
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
1,733,642✔
2849
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
1,733,966✔
2850
    taosArrayClear(param->tblData.aCol);
867,087✔
2851

2852
    // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
2853

2854
    param->restoreTbCols = false;
866,646✔
2855
    param->tblData.isOrdered = true;
866,646✔
2856
    param->tblData.isDuplicateTs = false;
866,620✔
2857
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
866,666✔
2858

2859
    param->pCreateTbReq = pCreateTbReq;
866,615✔
2860
  }
2861

2862
  int64_t startUs3 = taosGetTimestampUs();
50,987,107✔
2863
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;
50,987,107✔
2864

2865
  SArray*   pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
50,912,187✔
2866
  SBlobSet* pBlob = NULL;
51,336,953✔
2867
  if (colIdx < 0) {
51,341,612✔
2868
    if (pStmt->sql.stbInterlaceMode) {
51,451,665✔
2869
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
867,273✔
2870
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
1,068,765✔
2871
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
867,391✔
2872
                                    pStmt->taos->optionInfo.charsetCxt, &pBlob);
867,391✔
2873
      param->tblData.isOrdered = (*pDataBlock)->ordered;
867,353✔
2874
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
867,402✔
2875
    } else {
2876
      if (colIdx == -1) {
50,278,934✔
2877
        if (pStmt->sql.bindRowFormat) {
50,002,809✔
2878
          STMT2_ELOG_E("can't mix bind row format and bind column format");
100✔
2879
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
100✔
2880
        }
2881
        code = qBindStmtColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
95,027,874✔
2882
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
94,763,841✔
2883
      } else {
2884
        code =
2885
            qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, pStmt->bInfo.fixedValueCols, bind,
200✔
2886
                               pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen,
200✔
2887
                               &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
297,139✔
2888
      }
2889
    }
2890

2891
    if (code) {
50,549,923✔
2892
      STMT2_ELOG("bind cols or rows failed, error:%s", tstrerror(code));
200✔
2893
      STMT_ERR_RET(code);
200✔
2894
    }
2895
  } else {
2896
    if (pStmt->sql.stbInterlaceMode) {
3,792✔
2897
      STMT2_ELOG_E("bind single column not allowed in stb insert mode");
×
2898
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2899
    }
2900

2901
    if (pStmt->sql.bindRowFormat) {
600✔
2902
      STMT2_ELOG_E("can't mix bind row format and bind column format");
×
2903
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2904
    }
2905

2906
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
600✔
2907
      STMT2_ELOG_E("bind column index not in sequence");
×
2908
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2909
    }
2910

2911
    pStmt->bInfo.sBindLastIdx = colIdx;
600✔
2912

2913
    if (0 == colIdx) {
600✔
2914
      pStmt->bInfo.sBindRowNum = bind->num;
300✔
2915
    }
2916

2917
    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
600✔
2918
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
600✔
2919
                                    pStmt->taos->optionInfo.charsetCxt);
600✔
2920
    if (code) {
600✔
2921
      STMT2_ELOG("bind single col failed, error:%s", tstrerror(code));
×
2922
      STMT_ERR_RET(code);
×
2923
    }
2924
  }
2925

2926
  int64_t startUs4 = taosGetTimestampUs();
50,873,561✔
2927
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;
50,873,561✔
2928

2929
  if (pStmt->stbInterlaceMode) {
51,254,674✔
2930
    if (param) param->tblData.pBlobSet = pBlob;
51,373,276✔
2931
  }
2932

2933
  if (pStmt->sql.stbInterlaceMode) {
51,316,313✔
2934
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
867,154✔
2935
  } else {
2936
    STMT_ERR_RET(stmtAddBatch2(pStmt));
49,964,654✔
2937
  }
2938

2939
  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
50,748,941✔
2940
  return TSDB_CODE_SUCCESS;
51,156,530✔
2941
}
2942

2943
/*
2944
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
2945
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
2946

2947
  int32_t code = 0;
2948
  int32_t finalCode = 0;
2949
  size_t  keyLen = 0;
2950
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
2951
  while (pIter) {
2952
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
2953
    char*          key = taosHashGetKey(pIter, &keyLen);
2954

2955
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
2956
    if (pMeta->uid) {
2957
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2958
      continue;
2959
    }
2960

2961
    SSubmitBlkRsp* blkRsp = NULL;
2962
    int32_t        i = 0;
2963
    for (; i < pRsp->nBlocks; ++i) {
2964
      blkRsp = pRsp->pBlocks + i;
2965
      if (strlen(blkRsp->tblFName) != keyLen) {
2966
        continue;
2967
      }
2968

2969
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
2970
        continue;
2971
      }
2972

2973
      break;
2974
    }
2975

2976
    if (i < pRsp->nBlocks) {
2977
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
2978
               blkRsp->uid);
2979

2980
      pMeta->uid = blkRsp->uid;
2981
      pStmt->bInfo.tbUid = blkRsp->uid;
2982
    } else {
2983
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
2984
      if (NULL == pStmt->pCatalog) {
2985
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
2986
        if (code) {
2987
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2988
          finalCode = code;
2989
          continue;
2990
        }
2991
      }
2992

2993
      code = stmtCreateRequest(pStmt);
2994
      if (code) {
2995
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2996
        finalCode = code;
2997
        continue;
2998
      }
2999

3000
      STableMeta*      pTableMeta = NULL;
3001
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
3002
                               .requestId = pStmt->exec.pRequest->requestId,
3003
                               .requestObjRefId = pStmt->exec.pRequest->self,
3004
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
3005
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
3006

3007
      pStmt->stat.ctgGetTbMetaNum++;
3008

3009
      taos_free_result(pStmt->exec.pRequest);
3010
      pStmt->exec.pRequest = NULL;
3011

3012
      if (code || NULL == pTableMeta) {
3013
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3014
        finalCode = code;
3015
        taosMemoryFree(pTableMeta);
3016
        continue;
3017
      }
3018

3019
      pMeta->uid = pTableMeta->uid;
3020
      pStmt->bInfo.tbUid = pTableMeta->uid;
3021
      taosMemoryFree(pTableMeta);
3022
    }
3023

3024
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3025
  }
3026

3027
  return finalCode;
3028
}
3029
*/
3030
/*
3031
int stmtStaticModeExec(TAOS_STMT* stmt) {
3032
  STscStmt2*   pStmt = (STscStmt2*)stmt;
3033
  int32_t     code = 0;
3034
  SSubmitRsp* pRsp = NULL;
3035
  if (pStmt->sql.staticMode) {
3036
    return TSDB_CODE_TSC_STMT_API_ERROR;
3037
  }
3038

3039
  STMT_DLOG_E("start to exec");
3040

3041
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
3042

3043
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
3044
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
3045

3046
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
3047

3048
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
3049
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
3050
    if (code) {
3051
      pStmt->exec.pRequest->code = code;
3052
    } else {
3053
      tFreeSSubmitRsp(pRsp);
3054
      STMT_ERR_RET(stmtResetStmt(pStmt));
3055
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
3056
    }
3057
  }
3058

3059
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
3060

3061
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
3062
  pStmt->affectedRows += pStmt->exec.affectedRows;
3063

3064
_return:
3065

3066
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
3067

3068
  tFreeSSubmitRsp(pRsp);
3069

3070
  ++pStmt->sql.runTimes;
3071

3072
  STMT_RET(code);
3073
}
3074
*/
3075

3076
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
2,800✔
3077
  const STscObj* pTscObj = pRequest->pTscObj;
2,800✔
3078

3079
  *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
2,800✔
3080
  if (*pCxt == NULL) {
2,800✔
3081
    return terrno;
×
3082
  }
3083

3084
  **pCxt = (SParseContext){.requestId = pRequest->requestId,
×
3085
                           .requestRid = pRequest->self,
2,800✔
3086
                           .acctId = pTscObj->acctId,
2,800✔
3087
                           .db = pRequest->pDb,
2,800✔
3088
                           .topicQuery = false,
3089
                           .pSql = pRequest->sqlstr,
2,800✔
3090
                           .sqlLen = pRequest->sqlLen,
2,800✔
3091
                           .pMsg = pRequest->msgBuf,
2,800✔
3092
                           .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
3093
                           .pTransporter = pTscObj->pAppInfo->pTransporter,
2,800✔
3094
                           .pStmtCb = NULL,
3095
                           .pUser = pTscObj->user,
2,800✔
3096
                           .userId = pTscObj->userId,
2,800✔
3097
                           .pEffectiveUser = pRequest->effectiveUser,
2,800✔
3098
                           .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
2,800✔
3099
                           .enableSysInfo = pTscObj->sysInfo,
2,800✔
3100
                           .sodInitial = pTscObj->pAppInfo->serverCfg.sodInitial,
2,800✔
3101
                           .privInfo = pWrapper->pParseCtx ? pWrapper->pParseCtx->privInfo : 0,
2,800✔
3102
                           .async = true,
3103
                           .svrVer = pTscObj->sVer,
2,800✔
3104
                           .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
2,800✔
3105
                           .allocatorId = pRequest->allocatorRefId,
2,800✔
3106
                           .parseSqlFp = clientParseSql,
3107
                           .parseSqlParam = pWrapper};
3108
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
2,800✔
3109
  (*pCxt)->biMode = biMode;
2,800✔
3110
  return TSDB_CODE_SUCCESS;
2,800✔
3111
}
3112

3113
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
2,800✔
3114
  STscStmt2*        pStmt = userdata;
2,800✔
3115
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
2,800✔
3116
  pStmt->asyncResultAvailable = true;
2,800✔
3117
  pStmt->exec.pRequest->inCallback = true;
2,800✔
3118

3119
  // NEED_CLIENT_HANDLE_ERROR: retry internally without notifying user; retry completion uses this same cb + fp once.
3120
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pStmt->pVgDataBlocksForRetry != NULL) {
2,900✔
3121
    int32_t origExecCode = code;
200✔
3122
    STMT2_ELOG("async exec got NEED_CLIENT_HANDLE_ERROR (code:%s), retrying internally", tstrerror(code));
200✔
3123

3124
    // Try to retry internally; completion uses asyncQueryCb so user fp runs once with the final result.
3125
    int32_t retryCode = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
200✔
3126
    if (retryCode == TSDB_CODE_SUCCESS) {
200✔
3127
      stmtInvalidateStbInterlaceTableUidCache(pStmt);
100✔
3128
      if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
100✔
3129
        retryCode = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
100✔
3130
      } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
×
3131
        retryCode = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
×
3132
      }
3133
    }
3134
    if (retryCode == TSDB_CODE_SUCCESS) {
200✔
3135
      (void)stmtRestoreVgDataBlocksForRetry(pStmt);
100✔
3136
      resetRequest(pStmt);
100✔
3137
      pStmt->asyncResultAvailable = false;
100✔
3138
      retryCode = stmtCreateRequest(pStmt);
100✔
3139
      if (retryCode == TSDB_CODE_SUCCESS) {
100✔
3140
        SRequestObj*         pNewReq = pStmt->exec.pRequest;
100✔
3141
        SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
100✔
3142
        if (pWrapper == NULL) {
100✔
3143
          retryCode = terrno;
×
3144
          resetRequest(pStmt);
×
3145
        } else {
3146
          pWrapper->pRequest = pNewReq;
100✔
3147
          pNewReq->pWrapper = pWrapper;
100✔
3148
          retryCode = createParseContext(pNewReq, &pWrapper->pParseCtx, pWrapper);
100✔
3149
          if (retryCode == TSDB_CODE_SUCCESS) {
100✔
3150
            pNewReq->syncQuery = false;
100✔
3151
            // Same as first exec: asyncQueryCb invokes user asyncExecFn once with userdata (not raw pStmt as fp's 1st arg).
3152
            pNewReq->body.queryFp = asyncQueryCb;
100✔
3153
            ((SSyncQueryParam*)(pNewReq)->body.interParam)->userParam = pStmt;
100✔
3154
            launchAsyncQuery(pNewReq, pStmt->sql.pQuery, NULL, pWrapper);
100✔
3155
            // Retry asyncQueryCb will call fp, stmtCleanExecInfo, and tsem_post(asyncExecSem).
3156
            return;
100✔
3157
          }
3158
          // Do not taosMemoryFree(pWrapper): destroyRequest frees it via destorySqlCallbackWrapper.
3159
          resetRequest(pStmt);
×
3160
        }
3161
      }
3162
    }
3163
    // Retry setup failed (did not return above): notify user once with the original error, then cleanup + post sem.
3164
    if (fp) {
100✔
3165
      fp(pStmt->options.userdata, res, code);
100✔
3166
    }
3167
  } else {
3168
    if (code == TSDB_CODE_SUCCESS) {
2,600✔
3169
      pStmt->exec.affectedRows = taos_affected_rows(res);
2,600✔
3170
      pStmt->affectedRows += pStmt->exec.affectedRows;
2,600✔
3171
    }
3172

3173
    if (fp) {
2,600✔
3174
      fp(pStmt->options.userdata, res, code);
2,600✔
3175
    }
3176
  }
3177

3178
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
2,700✔
UNCOV
3179
    taosUsleep(1);
×
3180
  }
3181
  (void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
2,700✔
3182
  ++pStmt->sql.runTimes;
2,700✔
3183
  if (pStmt->exec.pRequest != NULL) {
2,700✔
3184
    pStmt->exec.pRequest->inCallback = false;
1,700✔
3185
  }
3186

3187
  if (tsem_post(&pStmt->asyncExecSem) != 0) {
2,700✔
3188
    STMT2_ELOG_E("fail to post asyncExecSem");
×
3189
  }
3190
}
3191

3192
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
527,092✔
3193
  STscStmt2* pStmt = (STscStmt2*)stmt;
527,092✔
3194
  int32_t    code = 0;
527,092✔
3195
  int64_t    startUs = taosGetTimestampUs();
527,357✔
3196

3197
  STMT2_DLOG_E("start to exec");
527,357✔
3198

3199
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
527,417✔
3200
    return pStmt->errCode;
100✔
3201
  }
3202

3203
  STMT_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
527,512✔
3204
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
527,571✔
3205
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
3206
  }
3207
  STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
527,482✔
3208

3209
  if (pStmt->sql.stbInterlaceMode) {
527,352✔
3210
    STMT_ERR_RET(stmtAddBatch2(pStmt));
405,553✔
3211
  }
3212

3213
  code = stmtSwitchStatus(pStmt, STMT_EXECUTE);
527,301✔
3214
  if (code != TSDB_CODE_SUCCESS) goto _return;
527,218✔
3215

3216
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
526,806✔
3217
    if (pStmt->sql.stbInterlaceMode) {
520,122✔
3218
      int64_t startTs = taosGetTimestampUs();
405,588✔
3219
      // wait for stmt bind thread to finish
3220
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
1,561,011✔
3221
        taosUsleep(1);
1,155,235✔
3222
      }
3223

3224
      if (pStmt->errCode != TSDB_CODE_SUCCESS) {
405,577✔
3225
        return pStmt->errCode;
100✔
3226
      }
3227

3228
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
405,440✔
3229
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
405,440✔
3230
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
405,343✔
3231
      pStmt->sql.siInfo.pVgroupHash = NULL;
405,390✔
3232
      pStmt->sql.siInfo.pVgroupList = NULL;
405,453✔
3233
    } else {
3234
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
114,638✔
3235
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);
114,706✔
3236

3237
      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
114,706✔
3238

3239
      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
114,606✔
3240
    }
3241
    // Save serialized data blocks for potential NEED_CLIENT_HANDLE_ERROR retry before the planner
3242
    // takes ownership of pDataBlocks during createQueryPlan.
3243
    STMT_ERR_RET(stmtSaveVgDataBlocksForRetry(pStmt));
520,061✔
3244
  }
3245

3246
  pStmt->asyncResultAvailable = false;
526,902✔
3247
  SRequestObj*      pRequest = pStmt->exec.pRequest;
526,655✔
3248
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
526,632✔
3249
  STMT2_DLOG("EXEC INFO :req:0x%" PRIx64 ", QID:0x%" PRIx64 ", exec sql:%s,  conn:%" PRId64, pRequest->self,
526,632✔
3250
             pRequest->requestId, pStmt->sql.sqlStr, pRequest->pTscObj->id);
3251

3252
  if (!fp) {
526,633✔
3253
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
523,933✔
3254

3255
    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
524,081✔
3256
      int32_t origExecCode = pStmt->exec.pRequest->code;
200✔
3257
      STMT2_WLOG_E("exec failed errorcode:NEED_CLIENT_HANDLE_ERROR, refresh meta and retry internally");
200✔
3258
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
200✔
3259
      if (code == TSDB_CODE_SUCCESS) {
200✔
3260
        stmtInvalidateStbInterlaceTableUidCache(pStmt);
200✔
3261
      }
3262
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
200✔
3263
        if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
200✔
3264
          code = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
100✔
3265
        } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
100✔
3266
          code = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
100✔
3267
        }
3268
      }
3269
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
200✔
3270
        // Restore saved serialized data blocks and re-execute with refreshed meta.
3271
        STMT_ERR_JRET(stmtRestoreVgDataBlocksForRetry(pStmt));
200✔
3272
        resetRequest(pStmt);
200✔
3273
        STMT_ERR_JRET(stmtCreateRequest(pStmt));
200✔
3274
        launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
200✔
3275
        code = pStmt->exec.pRequest->code;
200✔
UNCOV
3276
      } else if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
3277
        code = pStmt->exec.pRequest->code;
×
3278
      } else {
3279
        pStmt->exec.pRequest->code = code;
×
3280
      }
3281
    }
3282

3283
    STMT_ERR_JRET(pStmt->exec.pRequest->code);
524,222✔
3284

3285
    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
523,979✔
3286
    if (affected_rows) {
524,001✔
3287
      *affected_rows = pStmt->exec.affectedRows;
518,680✔
3288
    }
3289
    pStmt->affectedRows += pStmt->exec.affectedRows;
523,964✔
3290

3291
    // wait for stmt bind thread to finish
3292
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
524,092✔
3293
      taosUsleep(1);
22✔
3294
    }
3295

3296
    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
523,825✔
3297

3298
    ++pStmt->sql.runTimes;
523,821✔
3299
  } else {
3300
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
2,700✔
3301
    if (pWrapper == NULL) {
2,700✔
3302
      code = terrno;
×
3303
    } else {
3304
      pWrapper->pRequest = pRequest;
2,700✔
3305
      pRequest->pWrapper = pWrapper;
2,700✔
3306
    }
3307
    if (TSDB_CODE_SUCCESS == code) {
2,700✔
3308
      code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
2,700✔
3309
    }
3310
    pRequest->syncQuery = false;
2,700✔
3311
    pRequest->body.queryFp = asyncQueryCb;
2,700✔
3312
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
2,700✔
3313

3314
    pStmt->execSemWaited = false;
2,700✔
3315
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
2,700✔
3316
  }
3317

3318
_return:
527,119✔
3319
  if (code) {
527,119✔
3320
    STMT2_ELOG("exec failed, error:%s", tstrerror(code));
612✔
3321
  }
3322
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;
527,239✔
3323

3324
  STMT_RET(code);
527,262✔
3325
}
3326

3327
int stmtClose2(TAOS_STMT2* stmt) {
127,704✔
3328
  STscStmt2* pStmt = (STscStmt2*)stmt;
127,704✔
3329

3330
  STMT2_DLOG_E("start to close stmt");
127,704✔
3331
  taosMemoryFreeClear(pStmt->db);
127,704✔
3332

3333
  if (pStmt->bindThreadInUse) {
127,666✔
3334
    // wait for stmt bind thread to finish
3335
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
116,521✔
3336
      taosUsleep(1);
100✔
3337
    }
3338

3339
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
116,421✔
3340
    pStmt->queue.stopQueue = true;
116,421✔
3341
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
116,421✔
3342
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
116,369✔
3343

3344
    (void)taosThreadJoin(pStmt->bindThread, NULL);
116,421✔
3345
    pStmt->bindThreadInUse = false;
116,421✔
3346

3347
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
116,421✔
3348
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
116,421✔
3349
  }
3350

3351
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
127,695✔
3352
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
127,704✔
3353
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
3354
  }
3355
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
127,704✔
3356

3357
  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
127,704✔
3358
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
127,704✔
3359

3360
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
127,704✔
3361
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
1,300✔
3362
      STMT2_ELOG_E("fail to wait asyncExecSem");
×
3363
    }
3364
  }
3365

3366
  /* On macOS dispatch_semaphore_dispose requires value >= orig (1). After tsem_wait above value is 0; post once before
3367
   * destroy. */
3368
  if (pStmt->options.asyncExecFn) {
127,704✔
3369
    if (tsem_post(&pStmt->asyncExecSem) != 0) {
1,300✔
3370
      STMT2_ELOG_E("fail to post asyncExecSem");
×
3371
    }
3372
  }
3373

3374
  STMT2_DLOG("stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
127,704✔
3375
             ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
3376
             ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
3377
             ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
3378
             ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
3379
             pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
3380
             pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
3381
             pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
3382
             pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
3383
             pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
3384
  if (pStmt->sql.stbInterlaceMode) {
127,704✔
3385
    pStmt->bInfo.tagsCached = false;
6,283✔
3386
  }
3387
  pStmt->bInfo.boundColsCached = false;
127,704✔
3388

3389
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
127,704✔
3390

3391
  if (pStmt->options.asyncExecFn) {
127,704✔
3392
    if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
1,300✔
3393
      STMT2_ELOG_E("fail to destroy asyncExecSem");
×
3394
    }
3395
  }
3396
  taosMemoryFree(stmt);
127,704✔
3397

3398
  return TSDB_CODE_SUCCESS;
127,704✔
3399
}
3400

3401
const char* stmt2Errstr(TAOS_STMT2* stmt) {
5,884✔
3402
  STscStmt2* pStmt = (STscStmt2*)stmt;
5,884✔
3403

3404
  if (stmt == NULL || NULL == pStmt->exec.pRequest) {
5,884✔
3405
    return (char*)tstrerror(terrno);
400✔
3406
  }
3407

3408
  // if stmt async exec ,error code is pStmt->exec.pRequest->code
3409
  if (!(pStmt->sql.status >= STMT_EXECUTE && pStmt->options.asyncExecFn != NULL && pStmt->asyncResultAvailable)) {
5,484✔
3410
    pStmt->exec.pRequest->code = terrno;
5,484✔
3411
  }
3412

3413
  SRequestObj* pRequest = pStmt->exec.pRequest;
5,484✔
3414
  if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
5,484✔
3415
    return pRequest->msgBuf;
3,512✔
3416
  }
3417
  return (const char*)tstrerror(pRequest->code);
1,972✔
3418
}
3419

3420
// Alias kept for compatibility with object files compiled against older headers.
3421
const char* stmtErrstr2(TAOS_STMT2* stmt) { return stmt2Errstr(stmt); }
×
3422
/*
3423
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
3424

3425
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
3426
*/
3427

3428
int stmtParseColFields2(TAOS_STMT2* stmt) {
7,277✔
3429
  int32_t    code = 0;
7,277✔
3430
  STscStmt2* pStmt = (STscStmt2*)stmt;
7,277✔
3431
  int32_t    preCode = pStmt->errCode;
7,277✔
3432

3433
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
7,277✔
3434
    return pStmt->errCode;
×
3435
  }
3436

3437
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
7,277✔
3438
    STMT2_ELOG_E("stmtParseColFields2 only for insert");
×
3439
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
3440
  }
3441

3442
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
7,277✔
3443

3444
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
7,277✔
3445
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
300✔
3446
    pStmt->bInfo.needParse = false;
×
3447
  }
3448
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
7,277✔
3449
    pStmt->bInfo.needParse = false;
600✔
3450
  }
3451

3452
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
7,277✔
3453

3454
  if (pStmt->bInfo.needParse) {
7,277✔
3455
    STMT_ERRI_JRET(stmtParseSql(pStmt));
6,577✔
3456
  }
3457

3458
_return:
5,417✔
3459
  // compatible with previous versions
3460
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE) == 0x0) {
7,277✔
3461
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
400✔
3462
  }
3463

3464
  pStmt->errCode = preCode;
7,277✔
3465

3466
  return code;
7,277✔
3467
}
3468

3469
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
7,277✔
3470
  int32_t code = stmtParseColFields2(stmt);
7,277✔
3471
  if (code != TSDB_CODE_SUCCESS) {
7,277✔
3472
    return code;
1,860✔
3473
  }
3474

3475
  return stmtFetchStbColFields2(stmt, nums, fields);
5,417✔
3476
}
3477

3478
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
1,900✔
3479
  int32_t    code = 0;
1,900✔
3480
  STscStmt2* pStmt = (STscStmt2*)stmt;
1,900✔
3481
  int32_t    preCode = pStmt->errCode;
1,900✔
3482

3483
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
1,900✔
3484
    return pStmt->errCode;
×
3485
  }
3486

3487
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
1,900✔
3488

3489
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
1,900✔
3490
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
3491
    pStmt->bInfo.needParse = false;
×
3492
  }
3493

3494
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
1,900✔
3495
    resetRequest(pStmt);
×
3496
  }
3497

3498
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
1,900✔
3499

3500
  if (pStmt->bInfo.needParse) {
1,900✔
3501
    STMT_ERRI_JRET(stmtParseSql(pStmt));
×
3502
  }
3503

3504
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
1,900✔
3505
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
1,900✔
3506
  } else {
3507
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
×
3508
  }
3509

3510
  STMT2_DLOG("get param num success, nums:%d", *nums);
1,900✔
3511

3512
_return:
1,900✔
3513

3514
  pStmt->errCode = preCode;
1,900✔
3515

3516
  return code;
1,900✔
3517
}
3518

3519
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
6,291✔
3520
  STscStmt2* pStmt = (STscStmt2*)stmt;
6,291✔
3521

3522
  STMT2_TLOG_E("start to use result");
6,291✔
3523

3524
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
6,291✔
3525
    STMT2_ELOG_E("useResult only for query statement");
×
3526
    return NULL;
×
3527
  }
3528

3529
  if (pStmt->options.asyncExecFn != NULL && !pStmt->asyncResultAvailable) {
6,291✔
3530
    STMT2_ELOG_E("use result after callBackFn return");
100✔
3531
    return NULL;
100✔
3532
  }
3533

3534
  if (tsUseAdapter) {
6,191✔
3535
    TAOS_RES* res = (TAOS_RES*)pStmt->exec.pRequest;
2,300✔
3536
    pStmt->exec.pRequest = NULL;
2,300✔
3537
    return res;
2,300✔
3538
  }
3539

3540
  return pStmt->exec.pRequest;
3,891✔
3541
}
3542

3543
int32_t stmtAsyncBindThreadFunc(void* args) {
×
3544
  qInfo("async stmt bind thread started");
×
3545

3546
  ThreadArgs* targs = (ThreadArgs*)args;
×
3547
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
3548

3549
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
3550
  targs->fp(targs->param, NULL, code);
×
3551
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
3552
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
3553
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
3554
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
3555
  taosMemoryFree(args);
×
3556

3557
  qInfo("async stmt bind thread stopped");
×
3558

3559
  return code;
×
3560
}
3561

3562
void stmtBuildErrorMsg(STscStmt2* pStmt, const char* msg) {
300✔
3563
  if (pStmt == NULL || msg == NULL) {
300✔
3564
    return;
×
3565
  }
3566

3567
  if (pStmt->exec.pRequest == NULL) {
300✔
3568
    return;
×
3569
  }
3570

3571
  if (pStmt->exec.pRequest->msgBuf == NULL) {
300✔
3572
    return;
×
3573
  }
3574

3575
  size_t msgLen = strlen(msg);
300✔
3576
  size_t bufLen = pStmt->exec.pRequest->msgBufLen;
300✔
3577

3578
  if (msgLen >= bufLen) {
300✔
3579
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen - 1);
×
3580
    pStmt->exec.pRequest->msgBuf[bufLen - 1] = '\0';
×
3581
    pStmt->exec.pRequest->msgBufLen = bufLen - 1;
×
3582
  } else {
3583
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen);
300✔
3584
    pStmt->exec.pRequest->msgBufLen = msgLen;
300✔
3585
  }
3586

3587
  return;
300✔
3588
}
3589

3590
int32_t stmtBuildErrorMsgWithCode(STscStmt2* pStmt, const char* msg, int32_t errorCode) {
300✔
3591
  stmtBuildErrorMsg(pStmt, msg);
300✔
3592
  pStmt->errCode = errorCode;
300✔
3593

3594
  return errorCode;
300✔
3595
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc