• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5043

29 Apr 2026 11:44AM UTC coverage: 73.107% (-0.06%) from 73.17%
#5043

push

travis-ci

web-flow
feat(statewindow): support multi columns (#35136)

1563 of 1828 new or added lines in 18 files covered. (85.5%)

7490 existing lines in 148 files now uncovered.

277321 of 379338 relevant lines covered (73.11%)

131116908.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.63
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4
#include "tglobal.h"
5

6
#include "clientStmt.h"
7
#include "clientStmt2.h"
8
#include "tencode.h"
9
#include "tmsg.h"
10
#include "tname.h"
11
#include "trow.h"
12

13
char* gStmt2StatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
14
                           "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
15

16

17
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
18
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
1,719,609✔
19
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
1,719,974✔
20
    pTblBuf->buffOffset += pTblBuf->buffUnit;
1,719,888✔
21
  } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
×
22
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
×
23
    if (NULL == pTblBuf->pCurBuff) {
×
24
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
25
    }
26
    *pBuf = pTblBuf->pCurBuff;
×
27
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
28
  } else {
29
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
×
30
    if (NULL == buff) {
×
31
      return terrno;
×
32
    }
33

34
    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
×
35
      return terrno;
×
36
    }
37

38
    pTblBuf->buffIdx++;
×
39
    pTblBuf->pCurBuff = buff;
×
40
    *pBuf = buff;
×
41
    pTblBuf->buffOffset = pTblBuf->buffUnit;
×
42
  }
43

44
  return TSDB_CODE_SUCCESS;
1,719,502✔
45
}
46

47
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
1,719,864✔
48
  int i = 0;
1,719,864✔
49
  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
8,079,649✔
50
    if (pStmt->queue.stopQueue) {
6,504,588✔
51
      return false;
143,232✔
52
    }
53
    if (i < 10) {
6,361,314✔
54
      taosUsleep(1);
5,946,013✔
55
      i++;
5,944,409✔
56
    } else {
57
      (void)taosThreadMutexLock(&pStmt->queue.mutex);
415,301✔
58
      if (pStmt->queue.stopQueue) {
415,659✔
59
        (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
×
60
        return false;
×
61
      }
62
      if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
415,617✔
63
        (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
415,492✔
64
      }
65
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
415,363✔
66
    }
67
  }
68

69
  if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
1,575,868✔
70
    return false;
×
71
  }
72

73
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
1,575,912✔
74
  if (pStmt->queue.head == pStmt->queue.tail) {
1,576,513✔
75
    pStmt->queue.qRemainNum = 0;
×
76
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
×
77
    STMT2_ELOG_E("interlace queue is empty, cannot dequeue");
×
78
    return false;
×
79
  }
80

81
  SStmtQNode* node = pStmt->queue.head->next;
1,576,476✔
82
  pStmt->queue.head->next = node->next;
1,576,513✔
83
  if (pStmt->queue.tail == node) {
1,576,513✔
84
    pStmt->queue.tail = pStmt->queue.head;
864,208✔
85
  }
86
  node->next = NULL;
1,576,550✔
87
  *param = node;
1,576,550✔
88

89
  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
1,576,550✔
90
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
1,576,500✔
91

92
  STMT2_TLOG("dequeue success, node:%p, remainNum:%" PRId64, node, pStmt->queue.qRemainNum);
1,576,471✔
93

94
  return true;
1,576,427✔
95
}
96

97
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
1,576,256✔
98
  if (param == NULL) {
1,576,256✔
99
    STMT2_ELOG_E("enqueue param is NULL");
×
100
    return;
×
101
  }
102

103
  param->next = NULL;
1,576,256✔
104

105
  (void)taosThreadMutexLock(&pStmt->queue.mutex);
1,576,330✔
106

107
  pStmt->queue.tail->next = param;
1,576,498✔
108
  pStmt->queue.tail = param;
1,576,498✔
109
  pStmt->stat.bindDataNum++;
1,576,498✔
110

111
  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
1,576,387✔
112
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
1,576,539✔
113

114
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
1,576,336✔
115

116
  STMT2_TLOG("enqueue param:%p, remainNum:%" PRId64 ", restoreTbCols:%d", param, pStmt->queue.qRemainNum,
1,576,506✔
117
             param->restoreTbCols);
118
}
119

120
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
87,067,244✔
121
  int32_t code = 0;
87,067,244✔
122

123
  if (pStmt->exec.pRequest == NULL) {
87,067,244✔
124
    code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
177,340✔
125
                        pStmt->reqid);
126
    if (pStmt->reqid != 0) {
177,265✔
127
      pStmt->reqid++;
10✔
128
    }
129
    pStmt->exec.pRequest->type = RES_TYPE__QUERY;
177,320✔
130
    if (pStmt->db != NULL) {
177,300✔
131
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
166,928✔
132
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
166,926✔
133
    }
134
    if (TSDB_CODE_SUCCESS == code) {
177,222✔
135
      pStmt->exec.pRequest->syncQuery = true;
177,354✔
136
      pStmt->exec.pRequest->stmtBindVersion = 2;
177,259✔
137
    }
138
    STMT2_DLOG("create request:0x%" PRIx64 ", QID:0x%" PRIx64, pStmt->exec.pRequest->self,
177,127✔
139
               pStmt->exec.pRequest->requestId);
140
  }
141

142
  return code;
88,302,810✔
143
}
144

145
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
172,521,012✔
146
  int32_t code = 0;
172,521,012✔
147

148
  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
172,521,012✔
149
    STMT2_LOG_SEQ(newStatus);
175,441,134✔
150
  }
151

152
  if (pStmt->errCode && newStatus != STMT_PREPARE) {
174,722,887✔
153
    STMT2_ELOG("stmt already failed with err:%s, please use stmt prepare", tstrerror(pStmt->errCode));
×
154
    return pStmt->errCode;
×
155
  }
156

157
  switch (newStatus) {
173,766,787✔
158
    case STMT_PREPARE:
164,182✔
159
      pStmt->errCode = 0;
164,182✔
160
      break;
98,782✔
161
    case STMT_SETTBNAME:
1,099,932✔
162
      if (STMT_STATUS_EQ(INIT)) {
1,099,932✔
163
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
164
      }
165
      if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
1,100,059✔
166
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
167
      }
168
      break;
1,099,787✔
169
    case STMT_SETTAGS:
677,159✔
170
      if (STMT_STATUS_EQ(INIT)) {
677,159✔
171
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
172
      }
173
      break;
677,159✔
174
    case STMT_FETCH_FIELDS:
18,018✔
175
      if (STMT_STATUS_EQ(INIT)) {
18,018✔
176
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
177
      }
178
      break;
18,018✔
179
    case STMT_BIND:
85,499,047✔
180
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
85,499,047✔
181
        code = TSDB_CODE_TSC_STMT_API_ERROR;
198✔
182
      }
183
      /*
184
            if ((pStmt->sql.type == STMT_TYPE_MULTI_INSERT) && ()) {
185
              code = TSDB_CODE_TSC_STMT_API_ERROR;
186
            }
187
      */
188
      break;
87,400,565✔
189
    case STMT_BIND_COL:
×
190
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
×
191
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
192
      }
193
      break;
×
194
    case STMT_ADD_BATCH:
85,653,501✔
195
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
85,653,501✔
196
        code = TSDB_CODE_TSC_STMT_API_ERROR;
×
197
      }
198
      break;
86,274,330✔
199
    case STMT_EXECUTE:
654,948✔
200
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
654,948✔
201
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
10,950✔
202
            STMT_STATUS_NE(BIND_COL)) {
427✔
203
          code = TSDB_CODE_TSC_STMT_API_ERROR;
427✔
204
        }
205
      } else {
206
        if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
643,919✔
207
          code = TSDB_CODE_TSC_STMT_API_ERROR;
×
208
        }
209
      }
210
      break;
654,990✔
211
    default:
×
212
      code = TSDB_CODE_APP_ERROR;
×
213
      break;
×
214
  }
215

216
  STMT_ERR_RET(code);
174,655,547✔
217

218
  pStmt->sql.status = newStatus;
174,654,922✔
219

220
  return TSDB_CODE_SUCCESS;
175,795,594✔
221
}
222

223
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
29,608✔
224
  STscStmt2* pStmt = (STscStmt2*)stmt;
29,608✔
225

226
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
29,608✔
227

228
  if ('\0' == pStmt->bInfo.tbName[0]) {
29,663✔
229
    tscWarn("no table name set, OK if it is a stmt get fields");
8,739✔
230
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
8,739✔
231
  }
232

233
  *tbName = pStmt->bInfo.tbName;
20,869✔
234

235
  return TSDB_CODE_SUCCESS;
20,924✔
236
}
237

238
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
151,814✔
239
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
240
  STscStmt2* pStmt = (STscStmt2*)stmt;
151,814✔
241
  char       tbFName[TSDB_TABLE_FNAME_LEN];
112,436✔
242
  int32_t    code = tNameExtractFullName(tbName, tbFName);
151,814✔
243
  if (code != 0) {
152,028✔
244
    return code;
×
245
  }
246

247
  if ((tags != NULL && ((SBoundColInfo*)tags)->numOfCols == 0) || !autoCreateTbl) {
152,028✔
248
    pStmt->sql.autoCreateTbl = false;
129,967✔
249
  }
250

251
  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
151,949✔
252
  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));
151,947✔
253
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
151,975✔
254

255
  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
151,975✔
256
  pStmt->bInfo.tbSuid = pTableMeta->suid;
151,896✔
257
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
151,810✔
258
  pStmt->bInfo.tbType = pTableMeta->tableType;
151,890✔
259

260
  if (!pStmt->bInfo.tagsCached) {
151,569✔
261
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
150,666✔
262
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
150,622✔
263
  }
264

265
  // transfer ownership of cols to stmt
266
  if (cols) {
151,811✔
267
    pStmt->bInfo.fixedValueCols = *cols;
151,852✔
268
    *cols = NULL;
151,586✔
269
  }
270

271
  pStmt->bInfo.boundTags = tags;
151,769✔
272
  pStmt->bInfo.tagsCached = false;
151,855✔
273
  pStmt->bInfo.tbNameFlag = tbNameFlag;
151,651✔
274
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
151,570✔
275

276
  if (pTableMeta->tableType != TSDB_CHILD_TABLE && pTableMeta->tableType != TSDB_SUPER_TABLE) {
151,613✔
277
    pStmt->sql.stbInterlaceMode = false;
5,061✔
278
  }
279

280
  return TSDB_CODE_SUCCESS;
151,849✔
281
}
282

283
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
151,889✔
284
  STscStmt2* pStmt = (STscStmt2*)stmt;
151,889✔
285

286
  pStmt->sql.pVgHash = pVgHash;
151,889✔
287
  pStmt->exec.pBlockHash = pBlockHash;
151,908✔
288

289
  return TSDB_CODE_SUCCESS;
151,893✔
290
}
291

292
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
151,775✔
293
                              bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName,
294
                              uint8_t tbNameFlag) {
295
  STscStmt2* pStmt = (STscStmt2*)stmt;
151,775✔
296

297
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, cols, tbName, sTableName, autoCreateTbl, tbNameFlag));
151,775✔
298
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
151,566✔
299

300
  pStmt->sql.autoCreateTbl = autoCreateTbl;
151,893✔
301

302
  return TSDB_CODE_SUCCESS;
151,910✔
303
}
304

305
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
3,591✔
306
  STscStmt2* pStmt = (STscStmt2*)stmt;
3,591✔
307

308
  *pVgHash = pStmt->sql.pVgHash;
3,591✔
309
  pStmt->sql.pVgHash = NULL;
3,591✔
310

311
  *pBlockHash = pStmt->exec.pBlockHash;
3,591✔
312
  pStmt->exec.pBlockHash = NULL;
3,591✔
313

314
  return TSDB_CODE_SUCCESS;
3,591✔
315
}
316

317
static int32_t stmtParseSql(STscStmt2* pStmt) {
164,716✔
318
  pStmt->exec.pCurrBlock = NULL;
164,716✔
319

320
  SStmtCallback stmtCb = {
164,777✔
321
      .pStmt = pStmt,
322
      .getTbNameFn = stmtGetTbName,
323
      .setInfoFn = stmtUpdateInfo,
324
      .getExecInfoFn = stmtGetExecInfo,
325
  };
326

327
  STMT_ERR_RET(stmtCreateRequest(pStmt));
164,816✔
328
  pStmt->exec.pRequest->stmtBindVersion = 2;
164,735✔
329

330
  pStmt->stat.parseSqlNum++;
164,813✔
331

332
  STMT2_DLOG("start to parse, QID:0x%" PRIx64, pStmt->exec.pRequest->requestId);
164,933✔
333
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
164,933✔
334

335
  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
161,061✔
336

337
  pStmt->bInfo.needParse = false;
161,061✔
338

339
  if (pStmt->sql.type == 0) {
161,021✔
340
    if (pStmt->sql.pQuery->pRoot && LEGAL_INSERT(nodeType(pStmt->sql.pQuery->pRoot))) {
132,640✔
341
      pStmt->sql.type = STMT_TYPE_INSERT;
123,470✔
342
      pStmt->sql.stbInterlaceMode = false;
123,468✔
343
    } else if (pStmt->sql.pQuery->pPrepareRoot && LEGAL_SELECT(nodeType(pStmt->sql.pQuery->pPrepareRoot))) {
9,130✔
344
      pStmt->sql.type = STMT_TYPE_QUERY;
8,741✔
345
      pStmt->sql.stbInterlaceMode = false;
8,741✔
346

347
      return TSDB_CODE_SUCCESS;
8,741✔
348
    } else {
349
      STMT2_ELOG_E("only support select or insert sql");
468✔
350
      if (pStmt->exec.pRequest->msgBuf) {
427✔
351
        tstrncpy(pStmt->exec.pRequest->msgBuf, "stmt only support select or insert", pStmt->exec.pRequest->msgBufLen);
427✔
352
      }
353
      return TSDB_CODE_PAR_SYNTAX_ERROR;
427✔
354
    }
355
  } else if (pStmt->sql.type == STMT_TYPE_QUERY) {
28,420✔
356
    pStmt->sql.stbInterlaceMode = false;
×
357
    return TSDB_CODE_SUCCESS;
×
358
  } else if (pStmt->sql.type == STMT_TYPE_INSERT) {
28,420✔
359
    pStmt->sql.stbInterlaceMode = false;
×
360
  }
361

362
  STableDataCxt** pSrc =
363
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
151,847✔
364
  if (NULL == pSrc || NULL == *pSrc) {
152,030✔
365
    STMT2_ELOG("fail to get exec.pBlockHash, maybe parse failed, tbFName:%s", pStmt->bInfo.tbFName);
×
366
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
367
  }
368

369
  STableDataCxt* pTableCtx = *pSrc;
151,975✔
370
  if (pStmt->sql.stbInterlaceMode && pTableCtx->pData->pCreateTbReq && (pStmt->bInfo.tbNameFlag & USING_CLAUSE) == 0) {
151,973✔
371
    STMT2_TLOG("destroy pCreateTbReq for no-using insert, tbFName:%s", pStmt->bInfo.tbFName);
4,158✔
372
    tdDestroySVCreateTbReq(pTableCtx->pData->pCreateTbReq);
4,158✔
373
    taosMemoryFreeClear(pTableCtx->pData->pCreateTbReq);
4,158✔
374
    pTableCtx->pData->pCreateTbReq = NULL;
4,158✔
375
  }
376
  // if (pStmt->sql.stbInterlaceMode) {
377
  //   int16_t lastIdx = -1;
378

379
  //   for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
380
  //     if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
381
  //       pStmt->sql.stbInterlaceMode = false;
382
  //       break;
383
  //     }
384

385
  //     lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
386
  //   }
387
  // }
388

389
  if (NULL == pStmt->sql.pBindInfo) {
152,028✔
390
    pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
148,218✔
391
    if (NULL == pStmt->sql.pBindInfo) {
148,263✔
392
      STMT2_ELOG_E("fail to malloc pBindInfo");
×
393
      return terrno;
×
394
    }
395
  }
396

397
  return TSDB_CODE_SUCCESS;
151,973✔
398
}
399

400
static int32_t stmtPrintBindv(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bindv, int32_t col_idx, bool isTags) {
×
401
  STscStmt2* pStmt = (STscStmt2*)stmt;
×
402
  int32_t    count = 0;
×
403
  int32_t    code = 0;
×
404

405
  if (bindv == NULL) {
×
406
    STMT2_TLOG("bindv is NULL, col_idx:%d, isTags:%d", col_idx, isTags);
×
407
    return TSDB_CODE_SUCCESS;
×
408
  }
409

410
  if (col_idx >= 0) {
×
411
    count = 1;
×
412
    STMT2_TLOG("single col bind, col_idx:%d", col_idx);
×
413
  } else {
414
    if (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type ||
×
415
        (pStmt->sql.type == 0 && stmt2IsInsert(stmt))) {
×
416
      if (pStmt->sql.placeholderOfTags == 0 && pStmt->sql.placeholderOfCols == 0) {
×
417
        code = stmtGetStbColFields2(pStmt, NULL, NULL);
×
418
        if (code != TSDB_CODE_SUCCESS) {
×
419
          return code;
×
420
        }
421
      }
422
      if (isTags) {
×
423
        count = pStmt->sql.placeholderOfTags;
×
424
        STMT2_TLOG("print tags bindv, cols:%d", count);
×
425
      } else {
426
        count = pStmt->sql.placeholderOfCols;
×
427
        STMT2_TLOG("print cols bindv, cols:%d", count);
×
428
      }
429
    } else if (STMT_TYPE_QUERY == pStmt->sql.type || (pStmt->sql.type == 0 && stmt2IsSelect(stmt))) {
×
430
      count = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
×
431
      STMT2_TLOG("print query bindv, cols:%d", count);
×
432
    }
433
  }
434

435
  if (code != TSDB_CODE_SUCCESS) {
×
436
    STMT2_ELOG("failed to get param count, code:%d", code);
×
437
    return code;
×
438
  }
439

440
  for (int i = 0; i < count; i++) {
×
441
    int32_t type = bindv[i].buffer_type;
×
442
    int32_t num = bindv[i].num;
×
443
    char*   current_buf = (char*)bindv[i].buffer;
×
444

445
    for (int j = 0; j < num; j++) {
×
446
      char    buf[256] = {0};
×
447
      int32_t len = 0;
×
448
      bool    isNull = (bindv[i].is_null && bindv[i].is_null[j]);
×
449

450
      if (IS_VAR_DATA_TYPE(type) && bindv[i].length) {
×
451
        len = bindv[i].length[j];
×
452
      } else {
453
        len = tDataTypes[type].bytes;
×
454
      }
455

456
      if (isNull) {
×
457
        snprintf(buf, sizeof(buf), "NULL");
×
458
      } else {
459
        if (current_buf == NULL) {
×
460
          snprintf(buf, sizeof(buf), "NULL(Buf)");
×
461
        } else {
462
          switch (type) {
×
463
            case TSDB_DATA_TYPE_BOOL:
×
464
              snprintf(buf, sizeof(buf), "%d", *(int8_t*)current_buf);
×
465
              break;
×
466
            case TSDB_DATA_TYPE_TINYINT:
×
467
              snprintf(buf, sizeof(buf), "%d", *(int8_t*)current_buf);
×
468
              break;
×
469
            case TSDB_DATA_TYPE_SMALLINT:
×
470
              snprintf(buf, sizeof(buf), "%d", *(int16_t*)current_buf);
×
471
              break;
×
472
            case TSDB_DATA_TYPE_INT:
×
473
              snprintf(buf, sizeof(buf), "%d", *(int32_t*)current_buf);
×
474
              break;
×
475
            case TSDB_DATA_TYPE_BIGINT:
×
476
              snprintf(buf, sizeof(buf), "%" PRId64, *(int64_t*)current_buf);
×
477
              break;
×
478
            case TSDB_DATA_TYPE_FLOAT:
×
479
              snprintf(buf, sizeof(buf), "%f", *(float*)current_buf);
×
480
              break;
×
481
            case TSDB_DATA_TYPE_DOUBLE:
×
482
              snprintf(buf, sizeof(buf), "%f", *(double*)current_buf);
×
483
              break;
×
484
            case TSDB_DATA_TYPE_BINARY:
×
485
            case TSDB_DATA_TYPE_NCHAR:
486
            case TSDB_DATA_TYPE_GEOMETRY:
487
            case TSDB_DATA_TYPE_VARBINARY:
488
              snprintf(buf, sizeof(buf), "len:%d, val:%.*s", len, len, current_buf);
×
489
              break;
×
490
            case TSDB_DATA_TYPE_TIMESTAMP:
×
491
              snprintf(buf, sizeof(buf), "%" PRId64, *(int64_t*)current_buf);
×
492
              break;
×
493
            case TSDB_DATA_TYPE_UTINYINT:
×
494
              snprintf(buf, sizeof(buf), "%u", *(uint8_t*)current_buf);
×
495
              break;
×
496
            case TSDB_DATA_TYPE_USMALLINT:
×
497
              snprintf(buf, sizeof(buf), "%u", *(uint16_t*)current_buf);
×
498
              break;
×
499
            case TSDB_DATA_TYPE_UINT:
×
500
              snprintf(buf, sizeof(buf), "%u", *(uint32_t*)current_buf);
×
501
              break;
×
502
            case TSDB_DATA_TYPE_UBIGINT:
×
503
              snprintf(buf, sizeof(buf), "%" PRIu64, *(uint64_t*)current_buf);
×
504
              break;
×
505
            default:
×
506
              snprintf(buf, sizeof(buf), "UnknownType:%d", type);
×
507
              break;
×
508
          }
509
        }
510
      }
511

512
      STMT2_TLOG("bindv[%d] row[%d]: type:%s, val:%s", i, j, tDataTypes[type].name, buf);
×
513

514
      if (!isNull && current_buf) {
×
515
        current_buf += len;
×
516
      }
517
    }
518
  }
519

520
  return TSDB_CODE_SUCCESS;
×
521
}
522

523
static void resetRequest(STscStmt2* pStmt) {
303,038✔
524
  if (pStmt->exec.pRequest) {
303,038✔
525
    taos_free_result(pStmt->exec.pRequest);
172,405✔
526
    pStmt->exec.pRequest = NULL;
172,445✔
527
  }
528
  pStmt->asyncResultAvailable = false;
303,059✔
529
}
303,059✔
530

531
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
843,243✔
532
  pStmt->bInfo.tbUid = 0;
843,243✔
533
  pStmt->bInfo.tbVgId = -1;
843,243✔
534
  pStmt->bInfo.tbType = 0;
843,201✔
535
  pStmt->bInfo.needParse = true;
843,243✔
536
  pStmt->bInfo.inExecCache = false;
843,243✔
537

538
  pStmt->bInfo.tbName[0] = 0;
843,204✔
539
  pStmt->bInfo.tbFName[0] = 0;
843,164✔
540
  if (!pStmt->bInfo.tagsCached) {
843,243✔
541
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
611,546✔
542
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
611,329✔
543
    pStmt->bInfo.boundTags = NULL;
611,381✔
544
  }
545

546
  if (!pStmt->bInfo.boundColsCached) {
843,152✔
547
    tSimpleHashCleanup(pStmt->bInfo.fixedValueCols);
336,313✔
548
    pStmt->bInfo.fixedValueCols = NULL;
336,192✔
549
  }
550

551
  if (!pStmt->sql.autoCreateTbl) {
843,070✔
552
    pStmt->bInfo.stbFName[0] = 0;
593,588✔
553
    pStmt->bInfo.tbSuid = 0;
593,585✔
554
  }
555

556
  STMT2_TLOG("finish clean bind info, tagsCached:%d, autoCreateTbl:%d", pStmt->bInfo.tagsCached,
842,922✔
557
             pStmt->sql.autoCreateTbl);
558

559
  return TSDB_CODE_SUCCESS;
842,845✔
560
}
561

562
static void stmtFreeTableBlkList(STableColsData* pTb) {
×
563
  (void)qResetStmtColumns(pTb->aCol, true);
×
564
  taosArrayDestroy(pTb->aCol);
×
565
}
×
566

567
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
506,516✔
568
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
506,516✔
569
  if (NULL == pTblBuf->pCurBuff) {
506,841✔
570
    tscError("QInfo:%p, fail to get buffer from list", pTblBuf);
52✔
571
    return;
×
572
  }
573
  pTblBuf->buffIdx = 1;
506,826✔
574
  pTblBuf->buffOffset = sizeof(*pQueue->head);
506,826✔
575

576
  pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
506,789✔
577
  pQueue->qRemainNum = 0;
506,789✔
578
  pQueue->head->next = NULL;
506,826✔
579
}
580

581
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
816,919✔
582
  if (pStmt->sql.stbInterlaceMode) {
816,919✔
583
    if (deepClean) {
522,118✔
584
      taosHashCleanup(pStmt->exec.pBlockHash);
15,419✔
585
      pStmt->exec.pBlockHash = NULL;
15,419✔
586

587
      if (NULL != pStmt->exec.pCurrBlock) {
15,419✔
588
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->boundColsInfo.pColIndex);
12,975✔
589
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
12,975✔
590
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
12,975✔
591
        pStmt->exec.pCurrBlock = NULL;
12,975✔
592
      }
593
      if (STMT_TYPE_QUERY != pStmt->sql.type) {
15,419✔
594
        resetRequest(pStmt);
15,419✔
595
      }
596
    } else {
597
      pStmt->sql.siInfo.pTableColsIdx = 0;
506,699✔
598
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
506,736✔
599
      tSimpleHashClear(pStmt->sql.siInfo.pTableRowDataHash);
506,786✔
600
    }
601
    if (NULL != pStmt->exec.pRequest) {
522,362✔
602
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
506,943✔
603
    }
604
  } else {
605
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
294,880✔
606
      resetRequest(pStmt);
284,620✔
607
    }
608

609
    size_t keyLen = 0;
295,160✔
610
    void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
295,199✔
611
    while (pIter) {
576,391✔
612
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
281,133✔
613
      char*          key = taosHashGetKey(pIter, &keyLen);
281,133✔
614
      STableMeta*    pMeta = qGetTableMetaInDataBlock(pBlocks);
281,133✔
615

616
      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
281,133✔
617
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
136,370✔
618
        STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
166,668✔
619

620
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
136,290✔
621
        continue;
136,366✔
622
      }
623

624
      qDestroyStmtDataBlock(pBlocks);
144,763✔
625
      STMT_ERR_RET(taosHashRemove(pStmt->exec.pBlockHash, key, keyLen));
144,763✔
626

627
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
144,763✔
628
    }
629

630
    if (keepTable) {
295,258✔
631
      STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
146,791✔
632
                 keepTable, deepClean);
633
      return TSDB_CODE_SUCCESS;
146,791✔
634
    }
635

636
    taosHashCleanup(pStmt->exec.pBlockHash);
148,467✔
637
    pStmt->exec.pBlockHash = NULL;
148,467✔
638

639
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
148,467✔
640
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
148,467✔
641
  }
642

643
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
670,829✔
644
  STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
670,509✔
645
             keepTable, deepClean);
646

647
  return TSDB_CODE_SUCCESS;
670,546✔
648
}
649

650
static void stmtFreeSingleVgDataBlock(void* p) {
701,987✔
651
  SVgDataBlocks* pVg = *(SVgDataBlocks**)p;
701,987✔
652
  if (pVg) {
702,024✔
653
    taosMemoryFree(pVg->pData);
702,168✔
654
    taosMemoryFree(pVg);
702,231✔
655
  }
656
}
701,947✔
657

658
static void stmtFreeVgDataBlocksForRetry(STscStmt2* pStmt) {
807,347✔
659
  if (pStmt->pVgDataBlocksForRetry) {
807,347✔
660
    taosArrayDestroyEx(pStmt->pVgDataBlocksForRetry, stmtFreeSingleVgDataBlock);
642,482✔
661
    pStmt->pVgDataBlocksForRetry = NULL;
642,418✔
662
  }
663
}
807,115✔
664

665
static int32_t stmtSaveVgDataBlocksForRetry(STscStmt2* pStmt) {
643,432✔
666
  stmtFreeVgDataBlocksForRetry(pStmt);
643,432✔
667

668
  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;
643,565✔
669
  if (!pModif || !pModif->pDataBlocks || taosArrayGetSize(pModif->pDataBlocks) == 0) {
643,565✔
670
    return TSDB_CODE_SUCCESS;
×
671
  }
672

673
  int32_t num = taosArrayGetSize(pModif->pDataBlocks);
643,598✔
674
  pStmt->pVgDataBlocksForRetry = taosArrayInit(num, POINTER_BYTES);
643,400✔
675
  if (!pStmt->pVgDataBlocksForRetry) {
643,361✔
676
    return terrno;
×
677
  }
678

679
  for (int32_t i = 0; i < num; i++) {
1,346,828✔
680
    SVgDataBlocks* pSrc = taosArrayGetP(pModif->pDataBlocks, i);
703,478✔
681
    SVgDataBlocks* pDst = taosMemoryMalloc(sizeof(SVgDataBlocks));
703,630✔
682
    if (!pDst) {
703,311✔
683
      stmtFreeVgDataBlocksForRetry(pStmt);
×
684
      return terrno;
×
685
    }
686
    *pDst = *pSrc;
703,311✔
687
    pDst->pData = taosMemoryMalloc(pSrc->size);
703,311✔
688
    if (!pDst->pData) {
703,495✔
689
      taosMemoryFree(pDst);
×
690
      stmtFreeVgDataBlocksForRetry(pStmt);
×
691
      return terrno;
×
692
    }
693
    (void)memcpy(pDst->pData, pSrc->pData, pSrc->size);
703,606✔
694
    if (NULL == taosArrayPush(pStmt->pVgDataBlocksForRetry, &pDst)) {
1,407,108✔
695
      taosMemoryFree(pDst->pData);
×
696
      taosMemoryFree(pDst);
×
697
      stmtFreeVgDataBlocksForRetry(pStmt);
×
698
      return terrno;
×
699
    }
700
  }
701
  return TSDB_CODE_SUCCESS;
643,350✔
702
}
703

704
static int32_t stmtRestoreVgDataBlocksForRetry(STscStmt2* pStmt) {
594✔
705
  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;
594✔
706
  if (!pModif || !pStmt->pVgDataBlocksForRetry) {
594✔
707
    return TSDB_CODE_SUCCESS;
×
708
  }
709
  // The planner owns pDataBlocks after createQueryPlan (via TSWAP); it has already freed
710
  // the old array. We simply restore a new clone here.
711
  pModif->pDataBlocks = pStmt->pVgDataBlocksForRetry;
594✔
712
  pStmt->pVgDataBlocksForRetry = NULL;
594✔
713
  return TSDB_CODE_SUCCESS;
594✔
714
}
715

716
static STableMeta* stmtCloneTableMetaForRetry(const STableMeta* pSrc) {
396✔
717
  int32_t sz = (int32_t)TABLE_META_FULL_SIZE(pSrc);
396✔
718
  if (sz <= 0) {
396✔
719
    return NULL;
×
720
  }
721
  STableMeta* p = taosMemoryMalloc(sz);
396✔
722
  if (p == NULL) {
396✔
723
    return NULL;
×
724
  }
725
  (void)memcpy(p, pSrc, sz);
396✔
726
  tableMetaResetPointers(p);
396✔
727
  return p;
396✔
728
}
729

730
static void stmtFreeUidTableMetaHash(SHashObj* pHash) {
198✔
731
  if (pHash == NULL) {
198✔
732
    return;
×
733
  }
734
  void* pIter = NULL;
198✔
735
  while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
594✔
736
    STableMeta* pMeta = *(STableMeta**)pIter;
396✔
737
    taosMemoryFree(pMeta);
396✔
738
  }
739
  taosHashCleanup(pHash);
198✔
740
}
741

742
// tRowGet may succeed with a wrong prefix schema but leave VAR column pointers outside the SRow allocation;
743
// tRowBuild would then memcpy OOB. Require all VAR payloads to lie within [pRow, pRow + pRow->len).
744
static bool stmtScolValVarPayloadInRow(const SRow* pRow, const SColVal* pCv, int8_t colType) {
1,584✔
745
  if (!COL_VAL_IS_VALUE(pCv) || !IS_VAR_DATA_TYPE(colType)) {
1,584✔
746
    return true;
792✔
747
  }
748
  if (pCv->value.nData == 0) {
792✔
749
    return true;
×
750
  }
751
  if (pCv->value.pData == NULL) {
792✔
752
    return false;
×
753
  }
754
  const uint8_t* rbeg = (const uint8_t*)pRow;
792✔
755
  const uint8_t* rend = rbeg + pRow->len;
792✔
756
  const uint8_t* p = (const uint8_t*)pCv->value.pData;
792✔
757
  return (p >= rbeg) && (p + pCv->value.nData <= rend);
792✔
758
}
759

760
// Infer decode STSchema: full column count when row sver matches catalog; else try prefix column counts (ADD COLUMN).
761
static int32_t stmtFindDecodeSchemaForRow(SRow* pRow, const STableMeta* pMeta, STSchema** ppOld, int32_t* pnOldCols) {
396✔
762
  uint16_t oldSver = pRow->sver;
396✔
763
  int32_t  nMax = pMeta->tableInfo.numOfColumns;
396✔
764
  SSchema* base = (SSchema*)&pMeta->schema[0];
396✔
765

766
  if ((int32_t)oldSver == pMeta->sversion) {
396✔
767
    STSchema* p = tBuildTSchema(base, nMax, (int32_t)oldSver);
×
768
    if (p == NULL) {
×
769
      return terrno;
×
770
    }
771
    bool ok = true;
×
772
    for (int32_t i = 0; i < nMax; ++i) {
×
773
      SColVal cv = {0};
×
774
      if (tRowGet(pRow, p, i, &cv) != 0) {
×
775
        ok = false;
×
776
        break;
×
777
      }
778
      if (!stmtScolValVarPayloadInRow(pRow, &cv, p->columns[i].type)) {
×
779
        ok = false;
×
780
        break;
×
781
      }
782
    }
783
    if (ok) {
×
784
      *ppOld = p;
×
785
      *pnOldCols = nMax;
×
786
      return TSDB_CODE_SUCCESS;
×
787
    }
788
    tDestroyTSchema(p);
×
789
    return TSDB_CODE_INVALID_PARA;
×
790
  }
791

792
  for (int32_t n = nMax; n >= 1; --n) {
792✔
793
    STSchema* pTry = tBuildTSchema(base, n, (int32_t)oldSver);
792✔
794
    if (pTry == NULL) {
792✔
795
      return terrno;
×
796
    }
797
    bool ok = true;
792✔
798
    for (int32_t i = 0; i < n; ++i) {
1,980✔
799
      SColVal cv = {0};
1,584✔
800
      if (tRowGet(pRow, pTry, i, &cv) != 0) {
1,584✔
801
        ok = false;
×
802
        break;
396✔
803
      }
804
      if (!stmtScolValVarPayloadInRow(pRow, &cv, pTry->columns[i].type)) {
1,584✔
805
        ok = false;
396✔
806
        break;
396✔
807
      }
808
    }
809
    if (ok) {
792✔
810
      *ppOld = pTry;
396✔
811
      *pnOldCols = n;
396✔
812
      return TSDB_CODE_SUCCESS;
396✔
813
    }
814
    tDestroyTSchema(pTry);
396✔
815
  }
816
  return TSDB_CODE_INVALID_PARA;
×
817
}
818

819
static int32_t stmtRebuildOneRowToLatestSchema(SRow* pOldRow, const STableMeta* pMeta, SRow** ppNewRow) {
396✔
820
  STSchema* pOldSch = NULL;
396✔
821
  int32_t   nOldCols = 0;
396✔
822
  int32_t   code = stmtFindDecodeSchemaForRow(pOldRow, pMeta, &pOldSch, &nOldCols);
396✔
823
  if (code != TSDB_CODE_SUCCESS) {
396✔
824
    return code;
×
825
  }
826

827
  int32_t   nNewCols = pMeta->tableInfo.numOfColumns;
396✔
828
  STSchema* pNewSch = tBuildTSchema((SSchema*)&pMeta->schema[0], nNewCols, pMeta->sversion);
396✔
829
  if (pNewSch == NULL) {
396✔
830
    tDestroyTSchema(pOldSch);
×
831
    return terrno;
×
832
  }
833

834
  for (int32_t j = 0; j < nOldCols && j < nNewCols; ++j) {
1,188✔
835
    if (pOldSch->columns[j].colId != pNewSch->columns[j].colId) {
792✔
836
      tDestroyTSchema(pOldSch);
×
837
      tDestroyTSchema(pNewSch);
×
838
      return TSDB_CODE_INVALID_PARA;
×
839
    }
840
  }
841

842
  SArray* aColVal = taosArrayInit(pNewSch->numOfCols, sizeof(SColVal));
396✔
843
  if (aColVal == NULL) {
396✔
844
    tDestroyTSchema(pOldSch);
×
845
    tDestroyTSchema(pNewSch);
×
846
    return terrno;
×
847
  }
848

849
  for (int32_t j = 0; j < pNewSch->numOfCols; ++j) {
1,584✔
850
    SColVal cv = {0};
1,188✔
851
    if (j < nOldCols) {
1,188✔
852
      code = tRowGet(pOldRow, pOldSch, j, &cv);
792✔
853
      if (code != TSDB_CODE_SUCCESS) {
792✔
854
        taosArrayDestroy(aColVal);
×
855
        tDestroyTSchema(pOldSch);
×
856
        tDestroyTSchema(pNewSch);
×
857
        return code;
×
858
      }
859
    } else {
860
      STColumn* pc = &pNewSch->columns[j];
396✔
861
      cv = COL_VAL_NONE(pc->colId, pc->type);
396✔
862
    }
863
    if (taosArrayPush(aColVal, &cv) == NULL) {
1,188✔
864
      code = terrno;
×
865
      taosArrayDestroy(aColVal);
×
866
      tDestroyTSchema(pOldSch);
×
867
      tDestroyTSchema(pNewSch);
×
868
      return code;
×
869
    }
870
  }
871

872
  SRowBuildScanInfo sinfo = {0};
396✔
873
  code = tRowBuild(aColVal, pNewSch, ppNewRow, &sinfo);
396✔
874
  taosArrayDestroy(aColVal);
396✔
875
  tDestroyTSchema(pOldSch);
396✔
876
  tDestroyTSchema(pNewSch);
396✔
877
  return code;
396✔
878
}
879

880
static void stmtFreeHeapPatchRowsArray(SArray* aHeapRows) {
198✔
881
  if (aHeapRows == NULL) {
198✔
882
    return;
×
883
  }
884
  int32_t n = (int32_t)taosArrayGetSize(aHeapRows);
198✔
885
  for (int32_t i = 0; i < n; ++i) {
594✔
886
    SRow* p = taosArrayGetP(aHeapRows, i);
396✔
887
    tRowDestroy(p);
396✔
888
  }
889
  taosArrayDestroy(aHeapRows);
198✔
890
}
891

892
// After refreshMeta: set sver from catalog; decode each row with inferred old schema and tRowBuild with latest schema.
893
// aHeapRows: receives pointers from tRowBuild so they can be freed before tDestroySubmitReq (decode path does not free rows).
894
static void stmtPatchOneSubmitTbDataSchemaVer(SSubmitTbData* pTb, SHashObj* pUidMetaHash, SArray* aHeapRows) {
198✔
895
  if (pTb->uid == 0) {
198✔
896
    return;
×
897
  }
898
  void* pMv = taosHashGet(pUidMetaHash, &pTb->uid, sizeof(uint64_t));
198✔
899
  if (pMv == NULL) {
198✔
900
    return;
×
901
  }
902
  STableMeta* pMeta = *(STableMeta**)pMv;
198✔
903
  pTb->sver = pMeta->sversion;
198✔
904
  if (pTb->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
198✔
905
    return;
×
906
  }
907
  if (pTb->aRowP == NULL) {
198✔
908
    return;
×
909
  }
910
  if (pTb->pBlobSet != NULL) {
198✔
911
    int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
×
912
    SRow**  rows = (SRow**)TARRAY_DATA(pTb->aRowP);
×
913
    for (int32_t i = 0; i < nRow; ++i) {
×
914
      if (rows[i] != NULL) {
×
915
        rows[i]->sver = (uint16_t)pMeta->sversion;
×
916
      }
917
    }
918
    return;
×
919
  }
920

921
  int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
198✔
922
  for (int32_t i = 0; i < nRow; ++i) {
594✔
923
    SRow* pRow = taosArrayGetP(pTb->aRowP, i);
396✔
924
    if (pRow == NULL) {
396✔
925
      continue;
×
926
    }
927
    if ((uint16_t)pMeta->sversion == pRow->sver) {
396✔
928
      continue;
×
929
    }
930
    if (pRow->flag & HAS_BLOB) {
396✔
931
      pRow->sver = (uint16_t)pMeta->sversion;
×
932
      continue;
×
933
    }
934
    SRow* pNew = NULL;
396✔
935
    if (stmtRebuildOneRowToLatestSchema(pRow, pMeta, &pNew) == TSDB_CODE_SUCCESS && pNew != NULL) {
396✔
936
      // pRow points into the decoded submit payload (tDecodeBinaryWithSize); do not tRowDestroy it.
937
      if (aHeapRows != NULL && taosArrayPush(aHeapRows, &pNew) == NULL) {
792✔
938
        tRowDestroy(pNew);
×
939
        // Cannot record heap row for destroy before tDestroySubmitReq; keep embedded row, bump sver only.
940
        pRow->sver = (uint16_t)pMeta->sversion;
×
941
      } else {
942
        (void)taosArraySet(pTb->aRowP, i, &pNew);
396✔
943
      }
944
    } else {
945
      pRow->sver = (uint16_t)pMeta->sversion;
×
946
    }
947
  }
948
}
949

950
static int32_t stmtBuildUidToTableMetaHash(STscStmt2* pStmt, SRequestObj* pRequest, SHashObj** ppHash) {
198✔
951
  int32_t code = TSDB_CODE_SUCCESS;
198✔
952
  *ppHash = NULL;
198✔
953

954
  if (NULL == pStmt->pCatalog) {
198✔
955
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
956
    if (code != TSDB_CODE_SUCCESS) {
×
957
      return code;
×
958
    }
959
  }
960

961
  SHashObj* pHash =
962
      taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
198✔
963
  if (pHash == NULL) {
198✔
964
    return terrno;
×
965
  }
966

967
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
198✔
968
                           .requestId = pRequest->requestId,
198✔
969
                           .requestObjRefId = pRequest->self,
198✔
970
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
198✔
971

972
  int32_t tblNum = pRequest->tableList ? (int32_t)taosArrayGetSize(pRequest->tableList) : 0;
198✔
973
  for (int32_t i = 0; i < tblNum; ++i) {
594✔
974
    SName*      pName = taosArrayGet(pRequest->tableList, i);
396✔
975
    STableMeta* pMeta = NULL;
396✔
976
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
396✔
977
    if (c != TSDB_CODE_SUCCESS) {
396✔
978
      if (pMeta != NULL) {
×
979
        taosMemoryFree(pMeta);
×
980
      }
981
      taosHashCleanup(pHash);
×
982
      return c;
×
983
    }
984
    if (pMeta != NULL) {
396✔
985
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
396✔
986
      taosMemoryFree(pMeta);
396✔
987
      pMeta = NULL;
396✔
988
      if (pDup != NULL) {
396✔
989
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
396✔
990
        if (putCode != TSDB_CODE_SUCCESS) {
396✔
991
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
992
                     tstrerror(putCode));
993
          taosMemoryFree(pDup);
×
994
          taosHashCleanup(pHash);
×
995
          return putCode;
×
996
        }
997
      }
998
    }
999
  }
1000

1001
  if (taosHashGetSize(pHash) == 0 && pStmt->bInfo.sname.type != 0) {
198✔
1002
    STableMeta* pMeta = NULL;
×
1003
    code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
1004
    if (code == TSDB_CODE_SUCCESS && pMeta != NULL) {
×
1005
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
×
1006
      taosMemoryFree(pMeta);
×
1007
      pMeta = NULL;
×
1008
      if (pDup != NULL) {
×
1009
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
×
1010
        if (putCode != TSDB_CODE_SUCCESS) {
×
1011
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
1012
                     tstrerror(putCode));
1013
          taosMemoryFree(pDup);
×
1014
          taosHashCleanup(pHash);
×
1015
          return putCode;
×
1016
        }
1017
      }
1018
    } else if (pMeta != NULL) {
×
1019
      taosMemoryFree(pMeta);
×
1020
    }
1021
  }
1022

1023
  *ppHash = pHash;
198✔
1024
  return TSDB_CODE_SUCCESS;
198✔
1025
}
1026

1027
static int32_t stmtUpdateVgDataBlocksSchemaVer(STscStmt2* pStmt, SRequestObj* pRequest) {
198✔
1028
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
198✔
1029
    return TSDB_CODE_SUCCESS;
×
1030
  }
1031

1032
  SHashObj* pUidMetaHash = NULL;
198✔
1033
  int32_t   code = stmtBuildUidToTableMetaHash(pStmt, pRequest, &pUidMetaHash);
198✔
1034
  if (code != TSDB_CODE_SUCCESS) {
198✔
1035
    return code;
×
1036
  }
1037
  if (pUidMetaHash == NULL || taosHashGetSize(pUidMetaHash) == 0) {
198✔
1038
    if (pUidMetaHash != NULL) {
×
1039
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1040
    }
1041
    return TSDB_CODE_SUCCESS;
×
1042
  }
1043

1044
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
198✔
1045
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
198✔
1046

1047
  for (int32_t b = 0; b < nBlk; ++b) {
396✔
1048
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
198✔
1049
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
198✔
1050
      continue;
×
1051
    }
1052

1053
    SDecoder     decoder = {0};
198✔
1054
    int32_t      bodyLen = pVg->size - headSz;
198✔
1055
    SSubmitReq2  req = {0};
198✔
1056

1057
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
198✔
1058
    code = tDecodeSubmitReq(&decoder, &req, NULL);
198✔
1059
    tDecoderClear(&decoder);
198✔
1060
    if (code != TSDB_CODE_SUCCESS) {
198✔
1061
      STMT2_ELOG("tDecodeSubmitReq failed when patching schema ver for retry, code:%s", tstrerror(code));
×
1062
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1063
      return code;
×
1064
    }
1065
    if (req.raw) {
198✔
1066
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1067
      continue;
×
1068
    }
1069

1070
    SArray* aHeapRows = taosArrayInit(8, POINTER_BYTES);
198✔
1071
    if (aHeapRows == NULL) {
198✔
1072
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1073
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1074
      return terrno;
×
1075
    }
1076

1077
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
198✔
1078
    for (int32_t t = 0; t < nTb; ++t) {
396✔
1079
      stmtPatchOneSubmitTbDataSchemaVer(taosArrayGet(req.aSubmitTbData, t), pUidMetaHash, aHeapRows);
198✔
1080
    }
1081

1082
    int32_t encCap = 0;
198✔
1083
    int32_t szRet = 0;
198✔
1084
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
198✔
1085
    if (szRet != 0) {
198✔
1086
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
1087
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1088
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1089
      return TSDB_CODE_INVALID_PARA;
×
1090
    }
1091

1092
    int32_t allocLen = headSz + encCap;
198✔
1093
    void*   pNew = taosMemoryMalloc(allocLen);
198✔
1094
    if (pNew == NULL) {
198✔
1095
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
1096
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1097
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1098
      return terrno;
×
1099
    }
1100

1101
    (void)memcpy(pNew, pVg->pData, headSz);
198✔
1102
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
198✔
1103
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
198✔
1104

1105
    SEncoder encoder = {0};
198✔
1106
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
198✔
1107
    code = tEncodeSubmitReq(&encoder, &req);
198✔
1108
    int32_t bodyWritten = (int32_t)encoder.pos;
198✔
1109
    tEncoderClear(&encoder);
198✔
1110
    stmtFreeHeapPatchRowsArray(aHeapRows);
198✔
1111
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
198✔
1112

1113
    if (code != TSDB_CODE_SUCCESS) {
198✔
1114
      taosMemoryFree(pNew);
×
1115
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1116
      return code;
×
1117
    }
1118

1119
    int32_t totalLen = headSz + bodyWritten;
198✔
1120
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
198✔
1121

1122
    taosMemoryFree(pVg->pData);
198✔
1123
    pVg->pData = pNew;
198✔
1124
    pVg->size = totalLen;
198✔
1125
  }
1126

1127
  stmtFreeUidTableMetaHash(pUidMetaHash);
198✔
1128
  return TSDB_CODE_SUCCESS;
198✔
1129
}
1130

1131
typedef struct SStmtRetryTbPatch {
1132
  uint64_t uid;
1133
  uint64_t suid;
1134
  int32_t  sver;
1135
} SStmtRetryTbPatch;
1136

1137
// After refreshMeta, drop cached tbName->uid from stmt2 interlace bind so insGetStmtTableVgUid refetches from catalog.
1138
static void stmtInvalidateStbInterlaceTableUidCache(STscStmt2* pStmt) {
692✔
1139
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pTableHash != NULL) {
692✔
1140
    tSimpleHashClear(pStmt->sql.siInfo.pTableHash);
594✔
1141
  }
1142
}
692✔
1143

1144
// Super-table catalog meta uses uid == suid (see queryCreateTableMetaFromMsg); that must not be written onto
1145
// child-table SSubmitTbData. Only use child/normal/virtual-child meta here.
1146
static bool stmtRetryTbMetaIsSuperTable(const STableMeta* pMeta) {
792✔
1147
  return (pMeta != NULL && pMeta->tableType == TSDB_SUPER_TABLE);
792✔
1148
}
1149

1150
// Resolve uid/suid/sver for one SSubmitTbData after catalog refresh. tbIdx is the index within this submit req.
1151
static int32_t stmtFetchOneRetryTbMetaPatch(STscStmt2* pStmt, SRequestObj* pRequest, SSubmitTbData* pTb, int32_t tbIdx,
396✔
1152
                                            int32_t nSubmitTb, SStmtRetryTbPatch* pPatch) {
1153
  if (NULL == pStmt->pCatalog) {
396✔
1154
    int32_t c = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
1155
    if (c != TSDB_CODE_SUCCESS) {
×
1156
      return c;
×
1157
    }
1158
  }
1159

1160
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
396✔
1161
                           .requestId = pRequest->requestId,
396✔
1162
                           .requestObjRefId = pRequest->self,
396✔
1163
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
396✔
1164

1165
  // 1) Auto-create child: look up by child table name (never use STB-only name without child name).
1166
  if (pTb->pCreateTbReq != NULL && pTb->pCreateTbReq->name != NULL) {
396✔
1167
    SName         nm = {0};
×
1168
    int32_t       nc = TSDB_CODE_SUCCESS;
×
1169
    STableMeta*   pMeta = NULL;
×
1170
    if (pStmt->bInfo.sname.type != 0) {
×
1171
      tNameAssign(&nm, &pStmt->bInfo.sname);
×
1172
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
1173
    } else if (pRequest->tableList != NULL && taosArrayGetSize(pRequest->tableList) > 0) {
×
1174
      SName* p0 = taosArrayGet(pRequest->tableList, 0);
×
1175
      tNameAssign(&nm, p0);
×
1176
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
1177
    } else {
1178
      STMT2_ELOG_E("retry patch: no db/sname context for createTbReq name");
×
1179
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1180
    }
1181
    if (nc != TSDB_CODE_SUCCESS) {
×
1182
      return nc;
×
1183
    }
1184
    nc = catalogGetTableMeta(pStmt->pCatalog, &conn, &nm, &pMeta);
×
1185
    if (nc != TSDB_CODE_SUCCESS) {
×
1186
      taosMemoryFreeClear(pMeta);
×
1187
      return nc;
×
1188
    }
1189
    if (pMeta == NULL) {
×
1190
      return TSDB_CODE_INTERNAL_ERROR;
×
1191
    }
1192
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
1193
      taosMemoryFree(pMeta);
×
1194
      STMT2_ELOG_E("retry patch: createTbReq resolved to super table meta (unexpected)");
×
1195
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1196
    }
1197
    pPatch->uid = pMeta->uid;
×
1198
    pPatch->suid = pMeta->suid;
×
1199
    pPatch->sver = pMeta->sversion;
×
1200
    taosMemoryFree(pMeta);
×
1201
    return TSDB_CODE_SUCCESS;
×
1202
  }
1203

1204
  // 2) request->tableList: align tbIdx with the tbIdx-th non-super-table entry (skip super table names).
1205
  if (pRequest->tableList != NULL) {
396✔
1206
    int32_t          nList = (int32_t)taosArrayGetSize(pRequest->tableList);
396✔
1207
    int32_t          nonStbOrd = 0;
396✔
1208
    for (int32_t li = 0; li < nList; ++li) {
792✔
1209
      SName*      pName = taosArrayGet(pRequest->tableList, li);
792✔
1210
      STableMeta* pMeta = NULL;
792✔
1211
      int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
792✔
1212
      if (c != TSDB_CODE_SUCCESS) {
792✔
1213
        taosMemoryFreeClear(pMeta);
×
1214
        return c;
396✔
1215
      }
1216
      if (pMeta == NULL) {
792✔
1217
        return TSDB_CODE_INTERNAL_ERROR;
×
1218
      }
1219
      if (stmtRetryTbMetaIsSuperTable(pMeta)) {
792✔
1220
        taosMemoryFree(pMeta);
396✔
1221
        continue;
396✔
1222
      }
1223
      if (nonStbOrd == tbIdx) {
396✔
1224
        pPatch->uid = pMeta->uid;
396✔
1225
        pPatch->suid = pMeta->suid;
396✔
1226
        pPatch->sver = pMeta->sversion;
396✔
1227
        taosMemoryFree(pMeta);
396✔
1228
        return TSDB_CODE_SUCCESS;
396✔
1229
      }
1230
      taosMemoryFree(pMeta);
×
1231
      nonStbOrd++;
×
1232
    }
1233
  }
1234

1235
  // 3) Single-table statement: bInfo.sname
1236
  if (nSubmitTb == 1 && pStmt->bInfo.sname.type != 0) {
×
1237
    STableMeta* pMeta = NULL;
×
1238
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
1239
    if (c != TSDB_CODE_SUCCESS) {
×
1240
      taosMemoryFreeClear(pMeta);
×
1241
      return c;
×
1242
    }
1243
    if (pMeta == NULL) {
×
1244
      return TSDB_CODE_INTERNAL_ERROR;
×
1245
    }
1246
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
1247
      taosMemoryFree(pMeta);
×
1248
      STMT2_ELOG_E("retry patch: bInfo.sname resolved to super table meta; need child table name");
×
1249
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1250
    }
1251
    pPatch->uid = pMeta->uid;
×
1252
    pPatch->suid = pMeta->suid;
×
1253
    pPatch->sver = pMeta->sversion;
×
1254
    taosMemoryFree(pMeta);
×
1255
    return TSDB_CODE_SUCCESS;
×
1256
  }
1257

1258
  STMT2_ELOG("retry patch: cannot resolve catalog meta for submit block (tb idx %d, uid %" PRId64 ")", tbIdx,
×
1259
             (int64_t)pTb->uid);
1260
  return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1261
}
1262

1263
// TSDB_CODE_TDB_TABLE_NOT_EXIST: refresh child table uid/suid/sver in serialized submit from catalog.
1264
static int32_t stmtUpdateVgDataBlocksTbMetaFromCatalog(STscStmt2* pStmt, SRequestObj* pRequest) {
396✔
1265
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
396✔
1266
    return TSDB_CODE_SUCCESS;
×
1267
  }
1268

1269
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
396✔
1270
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
396✔
1271

1272
  for (int32_t b = 0; b < nBlk; ++b) {
792✔
1273
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
396✔
1274
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
396✔
1275
      continue;
×
1276
    }
1277

1278
    SDecoder     decoder = {0};
396✔
1279
    int32_t      bodyLen = pVg->size - headSz;
396✔
1280
    SSubmitReq2  req = {0};
396✔
1281
    int32_t      code = 0;
396✔
1282

1283
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
396✔
1284
    code = tDecodeSubmitReq(&decoder, &req, NULL);
396✔
1285
    tDecoderClear(&decoder);
396✔
1286
    if (code != TSDB_CODE_SUCCESS) {
396✔
1287
      STMT2_ELOG("tDecodeSubmitReq failed when patching table meta for retry, code:%s", tstrerror(code));
×
1288
      return code;
×
1289
    }
1290
    if (req.raw) {
396✔
1291
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1292
      continue;
×
1293
    }
1294

1295
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
396✔
1296
    for (int32_t t = 0; t < nTb; ++t) {
792✔
1297
      SStmtRetryTbPatch patch = {0};
396✔
1298
      code = stmtFetchOneRetryTbMetaPatch(pStmt, pRequest, taosArrayGet(req.aSubmitTbData, t), t, nTb, &patch);
396✔
1299
      if (code != TSDB_CODE_SUCCESS) {
396✔
1300
        tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1301
        return code;
×
1302
      }
1303
      SSubmitTbData* pRow = taosArrayGet(req.aSubmitTbData, t);
396✔
1304
      pRow->uid = (int64_t)patch.uid;
396✔
1305
      pRow->suid = (int64_t)patch.suid;
396✔
1306
      pRow->sver = patch.sver;
396✔
1307
    }
1308

1309
    int32_t encCap = 0;
396✔
1310
    int32_t szRet = 0;
396✔
1311
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
396✔
1312
    if (szRet != 0) {
396✔
1313
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1314
      return TSDB_CODE_INVALID_PARA;
×
1315
    }
1316

1317
    int32_t allocLen = headSz + encCap;
396✔
1318
    void*   pNew = taosMemoryMalloc(allocLen);
396✔
1319
    if (pNew == NULL) {
396✔
1320
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
1321
      return terrno;
×
1322
    }
1323

1324
    (void)memcpy(pNew, pVg->pData, headSz);
396✔
1325
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
396✔
1326
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
396✔
1327

1328
    SEncoder encoder = {0};
396✔
1329
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
396✔
1330
    code = tEncodeSubmitReq(&encoder, &req);
396✔
1331
    int32_t bodyWritten = (int32_t)encoder.pos;
396✔
1332
    tEncoderClear(&encoder);
396✔
1333
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
396✔
1334

1335
    if (code != TSDB_CODE_SUCCESS) {
396✔
1336
      taosMemoryFree(pNew);
×
1337
      return code;
×
1338
    }
1339

1340
    int32_t totalLen = headSz + bodyWritten;
396✔
1341
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
396✔
1342

1343
    taosMemoryFree(pVg->pData);
396✔
1344
    pVg->pData = pNew;
396✔
1345
    pVg->size = totalLen;
396✔
1346
  }
1347

1348
  return TSDB_CODE_SUCCESS;
396✔
1349
}
1350

1351
static bool stmtIsSchemaVersionRetryError(int32_t err) {
198✔
1352
  return (bool)(NEED_CLIENT_REFRESH_TBLMETA_ERROR(err) || err == TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION);
198✔
1353
}
1354

1355
static void stmtFreeTbBuf(void* buf) {
143,232✔
1356
  void* pBuf = *(void**)buf;
143,232✔
1357
  taosMemoryFree(pBuf);
143,192✔
1358
}
143,232✔
1359

1360
static void stmtFreeTbCols(void* buf) {
12,975,000✔
1361
  SArray* pCols = *(SArray**)buf;
12,975,000✔
1362
  taosArrayDestroy(pCols);
12,975,000✔
1363
}
12,975,000✔
1364

1365
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
163,846✔
1366
  STMT2_TLOG_E("start to free SQL info");
163,846✔
1367

1368
  taosMemoryFree(pStmt->sql.pBindInfo);
163,846✔
1369
  taosMemoryFree(pStmt->sql.queryRes.fields);
163,883✔
1370
  taosMemoryFree(pStmt->sql.queryRes.userFields);
163,886✔
1371
  taosMemoryFree(pStmt->sql.sqlStr);
163,886✔
1372
  qDestroyQuery(pStmt->sql.pQuery);
163,886✔
1373
  taosArrayDestroy(pStmt->sql.nodeList);
163,886✔
1374
  taosHashCleanup(pStmt->sql.pVgHash);
163,886✔
1375
  pStmt->sql.pVgHash = NULL;
163,886✔
1376
  if (pStmt->sql.fixValueTags) {
163,886✔
1377
    pStmt->sql.fixValueTags = false;
3,346✔
1378
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
3,346✔
1379
    taosMemoryFreeClear(pStmt->sql.fixValueTbReq);
3,346✔
1380
    pStmt->sql.fixValueTbReq = NULL;
3,346✔
1381
  }
1382

1383
  void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
163,846✔
1384
  while (pIter) {
170,449✔
1385
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;
6,563✔
1386

1387
    qDestroyStmtDataBlock(pCache->pDataCtx);
6,563✔
1388
    qDestroyBoundColInfo(pCache->boundTags);
6,563✔
1389
    taosMemoryFreeClear(pCache->boundTags);
6,563✔
1390

1391
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
6,563✔
1392
  }
1393
  taosHashCleanup(pStmt->sql.pTableCache);
163,886✔
1394
  pStmt->sql.pTableCache = NULL;
163,886✔
1395

1396
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
163,886✔
1397
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
163,886✔
1398
  stmtFreeVgDataBlocksForRetry(pStmt);
163,886✔
1399

1400
  taos_free_result(pStmt->sql.siInfo.pRequest);
163,886✔
1401
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
163,886✔
1402
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
163,846✔
1403
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableRowDataHash);
163,886✔
1404
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
163,886✔
1405
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
163,886✔
1406
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
163,886✔
1407
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
163,886✔
1408
  pStmt->sql.siInfo.pTableCols = NULL;
163,886✔
1409

1410
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
163,886✔
1411
  pStmt->sql.siInfo.tableColsReady = true;
163,886✔
1412

1413
  STMT2_TLOG_E("end to free SQL info");
163,886✔
1414

1415
  return TSDB_CODE_SUCCESS;
163,886✔
1416
}
1417

1418
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
664,517✔
1419
  if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
664,517✔
1420
    return TSDB_CODE_SUCCESS;
1,980✔
1421
  }
1422

1423
  SVgroupInfo      vgInfo = {0};
662,537✔
1424
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
662,537✔
1425
                           .requestId = pStmt->exec.pRequest->requestId,
662,537✔
1426
                           .requestObjRefId = pStmt->exec.pRequest->self,
662,537✔
1427
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
662,537✔
1428

1429
  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
662,693✔
1430
  if (TSDB_CODE_SUCCESS != code) {
662,810✔
1431
    STMT2_ELOG("fail to get vgroup info from catalog, code:%d", code);
×
1432
    return code;
×
1433
  }
1434

1435
  code =
1436
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
662,810✔
1437
  if (TSDB_CODE_SUCCESS != code) {
662,797✔
1438
    STMT2_ELOG("fail to put vgroup info, code:%d", code);
117✔
1439
    return code;
×
1440
  }
1441

1442
  *vgId = vgInfo.vgId;
662,680✔
1443

1444
  return TSDB_CODE_SUCCESS;
662,680✔
1445
}
1446

1447
int32_t stmtGetTableMetaAndValidate(STscStmt2* pStmt, uint64_t* uid, uint64_t* suid, int32_t* vgId, int8_t* tableType) {
10,333✔
1448
  STableMeta*      pTableMeta = NULL;
10,333✔
1449
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
10,333✔
1450
                           .requestId = pStmt->exec.pRequest->requestId,
10,372✔
1451
                           .requestObjRefId = pStmt->exec.pRequest->self,
10,333✔
1452
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
10,372✔
1453
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
10,372✔
1454

1455
  pStmt->stat.ctgGetTbMetaNum++;
10,372✔
1456

1457
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
10,317✔
1458
    STMT2_ELOG("tb %s not exist", pStmt->bInfo.tbFName);
990✔
1459
    (void)stmtCleanBindInfo(pStmt);
990✔
1460

1461
    if (!pStmt->sql.autoCreateTbl) {
990✔
1462
      STMT2_ELOG("table %s does not exist and autoCreateTbl is disabled", pStmt->bInfo.tbFName);
990✔
1463
      STMT_ERR_RET(TSDB_CODE_PAR_TABLE_NOT_EXIST);
990✔
1464
    }
1465

1466
    STMT_ERR_RET(code);
×
1467
  }
1468

1469
  STMT_ERR_RET(code);
9,327✔
1470

1471
  *uid = pTableMeta->uid;
9,327✔
1472
  *suid = pTableMeta->suid;
9,327✔
1473
  *tableType = pTableMeta->tableType;
9,327✔
1474
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
9,327✔
1475
  *vgId = pTableMeta->vgId;
9,327✔
1476

1477
  taosMemoryFree(pTableMeta);
9,327✔
1478

1479
  return TSDB_CODE_SUCCESS;
9,382✔
1480
}
1481

1482
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
14,447✔
1483
                                    uint64_t suid, int32_t vgId) {
1484
  STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
14,447✔
1485
  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
14,447✔
1486

1487
  STMT2_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);
14,447✔
1488

1489
  return TSDB_CODE_SUCCESS;
14,447✔
1490
}
1491

1492
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
42,444✔
1493
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
42,444✔
1494
    pStmt->bInfo.needParse = false;
×
1495
    pStmt->bInfo.inExecCache = false;
×
1496
    return TSDB_CODE_SUCCESS;
×
1497
  }
1498

1499
  pStmt->bInfo.needParse = true;
42,389✔
1500
  pStmt->bInfo.inExecCache = false;
42,444✔
1501

1502
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
42,402✔
1503
  if (pCxtInExec) {
42,499✔
1504
    pStmt->bInfo.needParse = false;
6,930✔
1505
    pStmt->bInfo.inExecCache = true;
6,930✔
1506

1507
    pStmt->exec.pCurrBlock = *pCxtInExec;
6,930✔
1508

1509
    if (pStmt->sql.autoCreateTbl) {
6,930✔
1510
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
5,940✔
1511
      return TSDB_CODE_SUCCESS;
5,940✔
1512
    }
1513
  }
1514

1515
  if (NULL == pStmt->pCatalog) {
36,559✔
1516
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
16,032✔
1517
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
15,977✔
1518
  }
1519

1520
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
36,517✔
1521
    if (pStmt->bInfo.inExecCache) {
20,924✔
1522
      pStmt->bInfo.needParse = false;
×
1523
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
×
1524
      return TSDB_CODE_SUCCESS;
×
1525
    }
1526

1527
    STMT2_DLOG("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
20,924✔
1528

1529
    return TSDB_CODE_SUCCESS;
20,924✔
1530
  }
1531

1532
  if (pStmt->sql.autoCreateTbl) {
15,635✔
1533
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
12,071✔
1534
    if (pCache) {
12,071✔
1535
      pStmt->bInfo.needParse = false;
12,071✔
1536
      pStmt->bInfo.tbUid = 0;
12,071✔
1537

1538
      STableDataCxt* pNewBlock = NULL;
12,071✔
1539
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
12,071✔
1540

1541
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
12,071✔
1542
                      POINTER_BYTES)) {
1543
        STMT_ERR_RET(terrno);
×
1544
      }
1545

1546
      pStmt->exec.pCurrBlock = pNewBlock;
12,071✔
1547

1548
      STMT2_DLOG("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
12,071✔
1549

1550
      return TSDB_CODE_SUCCESS;
12,071✔
1551
    }
1552

1553
    STMT_RET(stmtCleanBindInfo(pStmt));
×
1554
  }
1555

1556
  uint64_t uid, suid;
×
1557
  int32_t  vgId;
×
1558
  int8_t   tableType;
×
1559

1560
  STMT_ERR_RET(stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType));
3,564✔
1561

1562
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
3,366✔
1563

1564
  if (uid == pStmt->bInfo.tbUid) {
3,366✔
1565
    pStmt->bInfo.needParse = false;
×
1566

1567
    STMT2_DLOG("tb %s is current table", pStmt->bInfo.tbFName);
×
1568

1569
    return TSDB_CODE_SUCCESS;
×
1570
  }
1571

1572
  if (pStmt->bInfo.inExecCache) {
3,366✔
1573
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
990✔
1574
    if (NULL == pCache) {
990✔
1575
      STMT2_ELOG("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
×
1576
                 pStmt->bInfo.tbFName, uid, cacheUid);
1577

1578
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
1579
    }
1580

1581
    pStmt->bInfo.needParse = false;
990✔
1582

1583
    pStmt->bInfo.tbUid = uid;
990✔
1584
    pStmt->bInfo.tbSuid = suid;
990✔
1585
    pStmt->bInfo.tbType = tableType;
990✔
1586
    pStmt->bInfo.boundTags = pCache->boundTags;
990✔
1587
    pStmt->bInfo.tagsCached = true;
990✔
1588

1589
    STMT2_DLOG("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);
990✔
1590

1591
    return TSDB_CODE_SUCCESS;
990✔
1592
  }
1593

1594
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
2,376✔
1595
  if (pCache) {
2,376✔
1596
    pStmt->bInfo.needParse = false;
2,376✔
1597

1598
    pStmt->bInfo.tbUid = uid;
2,376✔
1599
    pStmt->bInfo.tbSuid = suid;
2,376✔
1600
    pStmt->bInfo.tbType = tableType;
2,376✔
1601
    pStmt->bInfo.boundTags = pCache->boundTags;
2,376✔
1602
    pStmt->bInfo.tagsCached = true;
2,376✔
1603

1604
    STableDataCxt* pNewBlock = NULL;
2,376✔
1605
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
2,376✔
1606

1607
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
2,376✔
1608
                    POINTER_BYTES)) {
1609
      STMT_ERR_RET(terrno);
×
1610
    }
1611

1612
    pStmt->exec.pCurrBlock = pNewBlock;
2,376✔
1613

1614
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
2,376✔
1615

1616
    return TSDB_CODE_SUCCESS;
2,376✔
1617
  }
1618

1619
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
×
1620

1621
  return TSDB_CODE_SUCCESS;
×
1622
}
1623

1624
static int32_t stmtResetStmt(STscStmt2* pStmt) {
×
1625
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
×
1626

1627
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1628
  if (NULL == pStmt->sql.pTableCache) {
×
1629
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtResetStmt:%s", tstrerror(terrno));
×
1630
    STMT_ERR_RET(terrno);
×
1631
  }
1632

1633
  pStmt->sql.status = STMT_INIT;
×
1634

1635
  return TSDB_CODE_SUCCESS;
×
1636
}
1637

1638
static void stmtAsyncOutput(STscStmt2* pStmt, void* param) {
1,576,391✔
1639
  SStmtQNode* pParam = (SStmtQNode*)param;
1,576,391✔
1640

1641
  if (pParam->restoreTbCols) {
1,576,391✔
1642
    for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) {
1,575,823✔
1643
      SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i);
1,068,789✔
1644
      *p = taosArrayInit(20, POINTER_BYTES);
1,068,789✔
1645
      if (*p == NULL) {
1,068,734✔
1646
        pStmt->errCode = terrno;
52✔
1647
      }
1648
    }
1649
    atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
507,034✔
1650
    STMT2_TLOG_E("restore pTableCols finished");
507,154✔
1651
  } else {
1652
    int code = qAppendStmt2TableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
1,069,250✔
1653
                                       &pStmt->sql.siInfo, pParam->pCreateTbReq);
1654
    // taosMemoryFree(pParam->pTbData);
1655
    if (code != TSDB_CODE_SUCCESS) {
1,068,944✔
1656
      STMT2_ELOG("async append stmt output failed, tbname:%s, err:%s", pParam->tblData.tbName, tstrerror(code));
198✔
1657
      pStmt->errCode = code;
198✔
1658
    }
1659
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
1,068,944✔
1660
  }
1661
}
1,576,487✔
1662

1663
static void* stmtBindThreadFunc(void* param) {
143,528✔
1664
  setThreadName("stmt2Bind");
143,528✔
1665

1666
  STscStmt2* pStmt = (STscStmt2*)param;
143,528✔
1667
  STMT2_DLOG_E("stmt2 bind thread started");
143,528✔
1668

1669
  while (true) {
1,576,487✔
1670
    SStmtQNode* asyncParam = NULL;
1,720,015✔
1671

1672
    if (!stmtDequeue(pStmt, &asyncParam)) {
1,719,973✔
1673
      if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
143,232✔
1674
        STMT2_DLOG_E("queue is empty and stopQueue is set, thread will exit");
143,232✔
1675
        break;
143,232✔
1676
      }
1677
      continue;
×
1678
    }
1679

1680
    stmtAsyncOutput(pStmt, asyncParam);
1,576,443✔
1681
  }
1682

1683
  STMT2_DLOG_E("stmt2 bind thread stopped");
143,232✔
1684
  return NULL;
143,232✔
1685
}
1686

1687
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
143,435✔
1688
  TdThreadAttr thAttr;
114,378✔
1689
  if (taosThreadAttrInit(&thAttr) != 0) {
143,477✔
1690
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1691
  }
1692
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
143,435✔
1693
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1694
  }
1695

1696
  if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
143,476✔
1697
    terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1698
    STMT_ERR_RET(terrno);
×
1699
  }
1700

1701
  pStmt->bindThreadInUse = true;
143,528✔
1702

1703
  (void)taosThreadAttrDestroy(&thAttr);
143,528✔
1704
  return TSDB_CODE_SUCCESS;
143,528✔
1705
}
1706

1707
static int32_t stmtInitQueue(STscStmt2* pStmt) {
143,445✔
1708
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
143,445✔
1709
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);
143,477✔
1710
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
286,932✔
1711
  pStmt->queue.tail = pStmt->queue.head;
143,445✔
1712

1713
  return TSDB_CODE_SUCCESS;
143,445✔
1714
}
1715

1716
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
163,192✔
1717
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
163,192✔
1718
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);
163,192✔
1719
  pStmt->asyncBindParam.asyncBindNum = 0;
163,153✔
1720

1721
  return TSDB_CODE_SUCCESS;
163,192✔
1722
}
1723

1724
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
143,276✔
1725
  pTblBuf->buffUnit = sizeof(SStmtQNode);
143,276✔
1726
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
143,276✔
1727
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
143,276✔
1728
  if (NULL == pTblBuf->pBufList) {
143,276✔
1729
    return terrno;
×
1730
  }
1731
  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
143,276✔
1732
  if (NULL == buff) {
143,528✔
1733
    return terrno;
×
1734
  }
1735

1736
  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
287,056✔
1737
    return terrno;
×
1738
  }
1739

1740
  pTblBuf->pCurBuff = buff;
143,528✔
1741
  pTblBuf->buffIdx = 1;
143,528✔
1742
  pTblBuf->buffOffset = 0;
143,528✔
1743

1744
  return TSDB_CODE_SUCCESS;
143,528✔
1745
}
1746

1747
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
157,327✔
1748
  STscObj*   pObj = (STscObj*)taos;
157,327✔
1749
  STscStmt2* pStmt = NULL;
157,327✔
1750
  int32_t    code = 0;
157,327✔
1751

1752
  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
157,327✔
1753
  if (NULL == pStmt) {
157,904✔
1754
    STMT2_ELOG_E("fail to allocate memory for pStmt");
×
1755
    return NULL;
×
1756
  }
1757

1758
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
157,904✔
1759
  if (NULL == pStmt->sql.pTableCache) {
157,662✔
1760
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtInit2:%s", tstrerror(terrno));
×
1761
    taosMemoryFree(pStmt);
×
1762
    return NULL;
×
1763
  }
1764

1765
  pStmt->taos = pObj;
157,662✔
1766
  if (taos->db[0] != '\0') {
157,662✔
1767
    pStmt->db = taosStrdup(taos->db);
146,843✔
1768
  }
1769
  pStmt->bInfo.needParse = true;
157,379✔
1770
  pStmt->sql.status = STMT_INIT;
157,621✔
1771
  pStmt->errCode = TSDB_CODE_SUCCESS;
157,618✔
1772

1773
  if (NULL != pOptions) {
157,860✔
1774
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
154,045✔
1775
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
154,048✔
1776
      pStmt->stbInterlaceMode = true;
137,957✔
1777
    }
1778

1779
    pStmt->reqid = pOptions->reqid;
154,290✔
1780
  }
1781

1782
  if (pStmt->stbInterlaceMode) {
158,105✔
1783
    pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
138,240✔
1784
    pStmt->sql.siInfo.acctId = taos->acctId;
138,240✔
1785
    pStmt->sql.siInfo.dbname = taos->db;
137,998✔
1786
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
138,240✔
1787

1788
    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
137,998✔
1789
    if (NULL == pStmt->sql.siInfo.pTableHash) {
138,144✔
1790
      STMT2_ELOG("fail to allocate memory for pTableHash:%s", tstrerror(terrno));
×
1791
      (void)stmtClose2(pStmt);
×
1792
      return NULL;
×
1793
    }
1794

1795
    pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
137,902✔
1796
    if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
137,957✔
1797
      STMT2_ELOG("fail to allocate memory for pTableRowDataHash:%s", tstrerror(terrno));
×
1798
      (void)stmtClose2(pStmt);
×
1799
      return NULL;
×
1800
    }
1801

1802
    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
137,957✔
1803
    if (NULL == pStmt->sql.siInfo.pTableCols) {
138,240✔
1804
      STMT2_ELOG("fail to allocate memory for pTableCols:%s", tstrerror(terrno));
×
1805
      (void)stmtClose2(pStmt);
×
1806
      return NULL;
×
1807
    }
1808

1809
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
138,240✔
1810
    if (TSDB_CODE_SUCCESS == code) {
138,157✔
1811
      code = stmtInitQueue(pStmt);
138,157✔
1812
    }
1813
    if (TSDB_CODE_SUCCESS == code) {
138,147✔
1814
      code = stmtStartBindThread(pStmt);
138,147✔
1815
    }
1816
    if (TSDB_CODE_SUCCESS != code) {
138,240✔
1817
      terrno = code;
×
1818
      STMT2_ELOG("fail to init stmt2 bind thread:%s", tstrerror(code));
×
1819
      (void)stmtClose2(pStmt);
×
1820
      return NULL;
×
1821
    }
1822
  }
1823

1824
  pStmt->sql.siInfo.tableColsReady = true;
157,863✔
1825
  if (pStmt->options.asyncExecFn) {
157,863✔
1826
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
2,574✔
1827
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
1828
      STMT2_ELOG("fail to init asyncExecSem:%s", tstrerror(terrno));
×
1829
      (void)stmtClose2(pStmt);
×
1830
      return NULL;
×
1831
    }
1832
  }
1833
  code = stmtIniAsyncBind(pStmt);
157,863✔
1834
  if (TSDB_CODE_SUCCESS != code) {
157,865✔
1835
    terrno = code;
×
1836
    STMT2_ELOG("fail to start init asyncExecSem:%s", tstrerror(code));
×
1837

1838
    (void)stmtClose2(pStmt);
×
1839
    return NULL;
×
1840
  }
1841

1842
  pStmt->execSemWaited = false;
157,865✔
1843

1844
  // STMT_LOG_SEQ(STMT_INIT);
1845

1846
  STMT2_DLOG("stmt2 initialize finished, seqId:%d, db:%s, interlaceMode:%d, asyncExec:%d", pStmt->seqId, pStmt->db,
157,865✔
1847
             pStmt->stbInterlaceMode, pStmt->options.asyncExecFn != NULL);
1848

1849
  return pStmt;
157,865✔
1850
}
1851

1852
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
142,564✔
1853
  STscStmt2* pStmt = (STscStmt2*)stmt;
142,564✔
1854
  if (dbName == NULL || dbName[0] == '\0') {
142,564✔
1855
    STMT2_ELOG_E("dbname in sql is illegal");
42✔
1856
    return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
×
1857
  }
1858

1859
  STMT2_DLOG("dbname is specified in sql:%s", dbName);
142,522✔
1860
  if (pStmt->db == NULL || pStmt->db[0] == '\0') {
142,522✔
1861
    taosMemoryFreeClear(pStmt->db);
9,833✔
1862
    STMT2_DLOG("dbname:%s is by sql, not by taosconnect", dbName);
9,873✔
1863
    pStmt->db = taosStrdup(dbName);
9,873✔
1864
    (void)strdequote(pStmt->db);
9,873✔
1865
  }
1866
  STMT_ERR_RET(stmtCreateRequest(pStmt));
142,604✔
1867

1868
  // The SQL statement specifies a database name, overriding the previously specified database
1869
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
142,564✔
1870
  pStmt->exec.pRequest->pDb = taosStrdup(dbName);
142,481✔
1871
  (void)strdequote(pStmt->exec.pRequest->pDb);
142,523✔
1872
  if (pStmt->exec.pRequest->pDb == NULL) {
142,564✔
UNCOV
1873
    return terrno;
×
1874
  }
1875
  if (pStmt->sql.stbInterlaceMode) {
142,522✔
1876
    pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
130,907✔
1877
  }
1878
  return TSDB_CODE_SUCCESS;
142,487✔
1879
}
1880
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
5,288✔
1881
  int32_t code = TSDB_CODE_SUCCESS;
5,288✔
1882

1883
  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
5,288✔
1884
  if (NULL == pStmt->sql.siInfo.pTableHash) {
5,288✔
1885
    return terrno;
×
1886
  }
1887

1888
  pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
5,288✔
1889
  if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
5,288✔
1890
    return terrno;
×
1891
  }
1892

1893
  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
5,288✔
1894
  if (NULL == pStmt->sql.siInfo.pTableCols) {
5,288✔
1895
    return terrno;
×
1896
  }
1897

1898
  code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
5,288✔
1899

1900
  if (TSDB_CODE_SUCCESS == code) {
5,288✔
1901
    code = stmtInitQueue(pStmt);
5,288✔
1902
    pStmt->queue.stopQueue = false;
5,288✔
1903
  }
1904
  if (TSDB_CODE_SUCCESS == code) {
5,288✔
1905
    code = stmtStartBindThread(pStmt);
5,288✔
1906
  }
1907
  if (TSDB_CODE_SUCCESS != code) {
5,288✔
1908
    return code;
×
1909
  }
1910

1911
  return TSDB_CODE_SUCCESS;
5,288✔
1912
}
1913

1914
static int32_t stmtDeepReset(STscStmt2* pStmt) {
6,872✔
1915
  // Save state that needs to be preserved
1916
  char*             db = pStmt->db;
6,872✔
1917
  TAOS_STMT2_OPTION options = pStmt->options;
6,872✔
1918
  uint32_t          reqid = pStmt->reqid;
6,872✔
1919
  bool              stbInterlaceMode = pStmt->stbInterlaceMode;
6,872✔
1920

1921
  pStmt->errCode = 0;
6,872✔
1922

1923
  // Wait for async execution to complete
1924
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
6,872✔
1925
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
396✔
1926
      STMT2_ELOG_E("bind param wait asyncExecSem failed");
×
1927
    }
1928
    pStmt->execSemWaited = true;
396✔
1929
  }
1930

1931
  // Stop bind thread if in use (similar to stmtClose2)
1932
  if (stbInterlaceMode && pStmt->bindThreadInUse) {
6,872✔
1933
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
5,288✔
1934
      taosUsleep(1);
×
1935
    }
1936
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
5,288✔
1937
    pStmt->queue.stopQueue = true;
5,288✔
1938
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
5,288✔
1939
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
5,288✔
1940

1941
    (void)taosThreadJoin(pStmt->bindThread, NULL);
5,288✔
1942
    pStmt->bindThreadInUse = false;
5,288✔
1943
    pStmt->queue.head = NULL;
5,288✔
1944
    pStmt->queue.tail = NULL;
5,288✔
1945
    pStmt->queue.qRemainNum = 0;
5,288✔
1946

1947
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
5,288✔
1948
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
5,288✔
1949
  }
1950

1951
  // Clean all SQL and execution info (stmtCleanSQLInfo already handles most cleanup)
1952
  pStmt->bInfo.boundColsCached = false;
6,872✔
1953
  if (stbInterlaceMode) {
6,872✔
1954
    pStmt->bInfo.tagsCached = false;
5,288✔
1955
  }
1956
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
6,872✔
1957

1958
  // Reinitialize resources (similar to stmtInit2)
1959
  if (stbInterlaceMode) {
6,872✔
1960
    pStmt->sql.siInfo.transport = pStmt->taos->pAppInfo->pTransporter;
5,288✔
1961
    pStmt->sql.siInfo.acctId = pStmt->taos->acctId;
5,288✔
1962
    pStmt->sql.siInfo.dbname = pStmt->taos->db;
5,288✔
1963
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
5,288✔
1964

1965
    if (NULL == pStmt->pCatalog) {
5,288✔
1966
      STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
594✔
1967
    }
1968
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
5,288✔
1969

1970
    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));
5,288✔
1971

1972
    int32_t code = stmtIniAsyncBind(pStmt);
5,288✔
1973
    if (TSDB_CODE_SUCCESS != code) {
5,288✔
1974
      STMT2_ELOG("fail to reinit async bind in stmtDeepReset:%s", tstrerror(code));
×
1975
      return code;
×
1976
    }
1977
  }
1978

1979
  // Restore preserved state
1980
  pStmt->db = db;
6,872✔
1981
  pStmt->options = options;
6,872✔
1982
  pStmt->reqid = reqid;
6,872✔
1983
  pStmt->stbInterlaceMode = stbInterlaceMode;
6,872✔
1984

1985
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
6,872✔
1986
  if (NULL == pStmt->sql.pTableCache) {
6,872✔
1987
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtDeepReset:%s", tstrerror(terrno));
×
1988
    return terrno;
×
1989
  }
1990

1991
  pStmt->bInfo.needParse = true;
6,872✔
1992
  pStmt->sql.status = STMT_INIT;
6,872✔
1993
  pStmt->sql.siInfo.tableColsReady = true;
6,872✔
1994

1995
  return TSDB_CODE_SUCCESS;
6,872✔
1996
}
1997

1998
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
164,340✔
1999
  STscStmt2* pStmt = (STscStmt2*)stmt;
164,340✔
2000
  int32_t    code = 0;
164,340✔
2001

2002
  STMT2_DLOG("start to prepare with sql:%s", sql);
164,340✔
2003

2004
  if (stmt == NULL || sql == NULL) {
164,340✔
2005
    STMT2_ELOG_E("stmt or sql is NULL");
41✔
2006
    return TSDB_CODE_INVALID_PARA;
×
2007
  }
2008

2009
  if (pStmt->sql.status >= STMT_PREPARE) {
164,299✔
2010
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
6,872✔
2011
    STMT_ERR_RET(stmtDeepReset(pStmt));
6,872✔
2012
  }
2013

2014
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
164,340✔
2015
    STMT2_ELOG("errCode is not success before, ErrCode: 0x%x, errorsyt: %s\n. ", pStmt->errCode,
198✔
2016
               tstrerror(pStmt->errCode));
2017
    return pStmt->errCode;
198✔
2018
  }
2019

2020
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));
164,142✔
2021

2022
  if (length <= 0) {
164,050✔
2023
    length = strlen(sql);
157,453✔
2024
  }
2025
  pStmt->sql.sqlStr = taosStrndup(sql, length);
164,050✔
2026
  if (!pStmt->sql.sqlStr) {
164,182✔
2027
    STMT2_ELOG("fail to allocate memory for sqlStr:%s", tstrerror(terrno));
×
2028
    STMT_ERR_RET(terrno);
×
2029
  }
2030
  pStmt->sql.sqlLen = length;
164,142✔
2031
  STMT_ERR_RET(stmtCreateRequest(pStmt));
164,127✔
2032

2033
  if (stmt2IsInsert(pStmt)) {
164,066✔
2034
    pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
154,127✔
2035
    char* dbName = NULL;
154,182✔
2036
    if (qParseDbName(sql, length, &dbName)) {
154,167✔
2037
      STMT_ERR_RET(stmtSetDbName2(stmt, dbName));
142,522✔
2038
      taosMemoryFreeClear(dbName);
142,447✔
2039
    } else if (pStmt->db != NULL && pStmt->db[0] != '\0') {
11,563✔
2040
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
11,328✔
2041
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
11,335✔
2042
      if (pStmt->exec.pRequest->pDb == NULL) {
11,408✔
2043
        STMT_ERR_RET(terrno);
×
2044
      }
2045
      (void)strdequote(pStmt->exec.pRequest->pDb);
11,353✔
2046

2047
      if (pStmt->sql.stbInterlaceMode) {
11,316✔
2048
        pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
6,429✔
2049
      }
2050
    }
2051

2052
  } else if (stmt2IsSelect(pStmt)) {
9,960✔
2053
    pStmt->sql.stbInterlaceMode = false;
9,366✔
2054
    STMT_ERR_RET(stmtParseSql(pStmt));
9,366✔
2055
  } else {
2056
    return stmtBuildErrorMsgWithCode(pStmt, "stmt only support 'SELECT' or 'INSERT'", TSDB_CODE_PAR_SYNTAX_ERROR);
594✔
2057
  }
2058
  return TSDB_CODE_SUCCESS;
162,856✔
2059
}
2060

2061
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
12,881✔
2062
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
12,881✔
2063
  if (!pSrc) {
12,975✔
2064
    return terrno;
×
2065
  }
2066
  STableDataCxt* pDst = NULL;
12,975✔
2067

2068
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
12,975✔
2069
  pStmt->sql.siInfo.pDataCtx = pDst;
12,975✔
2070

2071
  SArray* pTblCols = NULL;
12,975✔
2072
  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
12,964,866✔
2073
    pTblCols = taosArrayInit(20, POINTER_BYTES);
12,951,911✔
2074
    if (NULL == pTblCols) {
12,943,555✔
2075
      return terrno;
×
2076
    }
2077

2078
    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
25,894,237✔
2079
      return terrno;
×
2080
    }
2081
  }
2082

2083
  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
12,955✔
2084

2085
  STMT2_TLOG("init stb interlace table info, tbName:%s, pDataCtx:%p, boundTags:%p", pStmt->bInfo.tbFName,
12,955✔
2086
             pStmt->sql.siInfo.pDataCtx, pStmt->sql.siInfo.boundTags);
2087

2088
  return TSDB_CODE_SUCCESS;
12,975✔
2089
}
2090

2091
bool stmt2IsInsert(TAOS_STMT2* stmt) {
1,280,180✔
2092
  STscStmt2* pStmt = (STscStmt2*)stmt;
1,280,180✔
2093
  if (pStmt->sql.type) {
1,280,180✔
2094
    return (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
1,085,695✔
2095
  }
2096

2097
  return qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
194,489✔
2098
}
2099

2100
bool stmt2IsSelect(TAOS_STMT2* stmt) {
149,621✔
2101
  STscStmt2* pStmt = (STscStmt2*)stmt;
149,621✔
2102

2103
  if (pStmt->sql.type) {
149,621✔
2104
    return STMT_TYPE_QUERY == pStmt->sql.type;
×
2105
  }
2106
  return qIsSelectFromSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
149,566✔
2107
}
2108

2109
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
1,099,988✔
2110
  STscStmt2* pStmt = (STscStmt2*)stmt;
1,099,988✔
2111

2112
  int64_t startUs = taosGetTimestampUs();
1,100,080✔
2113

2114
  STMT2_TLOG("start to set tbName:%s", tbName);
1,100,080✔
2115

2116
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
1,100,262✔
2117
    return pStmt->errCode;
×
2118
  }
2119

2120
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
1,100,225✔
2121

2122
  int32_t insert = 0;
1,099,846✔
2123
  if (!stmt2IsInsert(stmt)) {
1,099,846✔
2124
    STMT2_ELOG_E("set tb name not available for no-insert statement");
×
2125
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2126
  }
2127
  // process tbname
2128
  STMT_ERR_RET(stmtCreateRequest(pStmt));
1,099,940✔
2129

2130
  STMT_ERR_RET(qCreateSName2(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
1,099,849✔
2131
                             pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
2132
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
1,098,578✔
2133
  tstrncpy(pStmt->bInfo.tbName, (char*)tNameGetTableName(&pStmt->bInfo.sname), TSDB_TABLE_NAME_LEN);
1,099,014✔
2134

2135
  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
1,098,834✔
2136
    STMT_ERR_RET(stmtGetFromCache(pStmt));
43,019✔
2137

2138
    if (pStmt->bInfo.needParse) {
42,301✔
2139
      STMT_ERR_RET(stmtParseSql(pStmt));
20,869✔
2140
      if (!pStmt->sql.autoCreateTbl) {
20,869✔
2141
        uint64_t uid, suid;
2,487✔
2142
        int32_t  vgId;
2,487✔
2143
        int8_t   tableType;
2,542✔
2144

2145
        int32_t code = stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType);
6,753✔
2146
        if (code != TSDB_CODE_SUCCESS) {
6,808✔
2147
          return code;
792✔
2148
        }
2149
      }
2150
    }
2151

2152
  } else {
2153
    pStmt->exec.pRequest->requestId++;
1,055,967✔
2154
    pStmt->bInfo.needParse = false;
1,056,256✔
2155
  }
2156

2157
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
1,097,672✔
2158
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
12,881✔
2159
  }
2160

2161
  int64_t startUs2 = taosGetTimestampUs();
1,097,932✔
2162
  pStmt->stat.setTbNameUs += startUs2 - startUs;
1,097,932✔
2163

2164
  return TSDB_CODE_SUCCESS;
1,097,888✔
2165
}
2166

2167
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
567,184✔
2168
  STscStmt2* pStmt = (STscStmt2*)stmt;
567,184✔
2169

2170
  STMT2_TLOG_E("start to set tbTags");
567,184✔
2171
  if (qDebugFlag & DEBUG_TRACE) {
567,184✔
2172
    (void)stmtPrintBindv(stmt, tags, -1, true);
×
2173
  }
2174

2175
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
567,223✔
2176
    return pStmt->errCode;
×
2177
  }
2178

2179
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
567,223✔
2180

2181
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
567,275✔
2182
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
2183
    pStmt->bInfo.needParse = false;
×
2184
  }
2185
  STMT_ERR_RET(stmtCreateRequest(pStmt));
567,275✔
2186

2187
  if (pStmt->bInfo.needParse) {
567,262✔
2188
    STMT_ERR_RET(stmtParseSql(pStmt));
396✔
2189
  }
2190
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
567,262✔
2191
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2192
  }
2193

2194
  SBoundColInfo* tags_info = (SBoundColInfo*)pStmt->bInfo.boundTags;
567,236✔
2195

2196
  STableDataCxt** pDataBlock = NULL;
567,236✔
2197
  if (pStmt->exec.pCurrBlock) {
567,236✔
2198
    pDataBlock = &pStmt->exec.pCurrBlock;
556,070✔
2199
  } else {
2200
    pDataBlock =
2201
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
11,166✔
2202
    if (NULL == pDataBlock) {
11,166✔
2203
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2204
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2205
    }
2206
    if (pStmt->sql.stbInterlaceMode && (*pDataBlock)->pData->pCreateTbReq) {
11,166✔
2207
      tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
396✔
2208
      taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
396✔
2209
      (*pDataBlock)->pData->pCreateTbReq = NULL;
396✔
2210
    }
2211
  }
2212
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
567,236✔
2213
    return TSDB_CODE_SUCCESS;
×
2214
  }
2215

2216
  STMT2_TLOG_E("start to bind stmt tag values");
567,236✔
2217

2218
  void* boundTags = NULL;
567,145✔
2219
  if (pStmt->sql.stbInterlaceMode) {
567,145✔
2220
    boundTags = pStmt->sql.siInfo.boundTags;
543,363✔
2221
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
543,363✔
2222
    if (NULL == pCreateTbReq) {
543,454✔
2223
      return terrno;
×
2224
    }
2225
    int32_t vgId = -1;
543,454✔
2226
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
543,454✔
2227
    (*pCreateTbReq)->uid = vgId;
543,649✔
2228
  } else {
2229
    boundTags = pStmt->bInfo.boundTags;
23,782✔
2230
  }
2231

2232
  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
567,431✔
2233
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
2234
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));
2235

2236
  return TSDB_CODE_SUCCESS;
567,142✔
2237
}
2238

2239
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
109,897✔
2240
  STscStmt2* pStmt = (STscStmt2*)stmt;
109,897✔
2241

2242
  STMT2_TLOG_E("start to clone createTbRequest for fixed tags");
109,897✔
2243

2244
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
109,962✔
2245
    return pStmt->errCode;
×
2246
  }
2247

2248
  if (!pStmt->sql.stbInterlaceMode) {
109,962✔
2249
    return TSDB_CODE_SUCCESS;
×
2250
  }
2251

2252
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
109,962✔
2253

2254
  if (pStmt->sql.fixValueTags) {
109,936✔
2255
    STMT2_TLOG_E("tags are fixed, use one createTbReq");
106,590✔
2256
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
106,590✔
2257
    if ((*pCreateTbReq)->name) {
106,512✔
2258
      taosMemoryFree((*pCreateTbReq)->name);
106,525✔
2259
    }
2260
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
106,538✔
2261
    int32_t vgId = -1;
106,590✔
2262
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
106,590✔
2263
    (*pCreateTbReq)->uid = vgId;
106,551✔
2264
    return TSDB_CODE_SUCCESS;
106,551✔
2265
  }
2266

2267
  STMT_ERR_RET(stmtCreateRequest(pStmt));
3,346✔
2268
  if (pStmt->bInfo.needParse) {
3,346✔
2269
    STMT_ERR_RET(stmtParseSql(pStmt));
×
2270
    if (!pStmt->sql.autoCreateTbl) {
×
2271
      STMT2_WLOG_E("don't need to create table, will not check tags");
×
2272
      return TSDB_CODE_SUCCESS;
×
2273
    }
2274
  }
2275

2276
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
3,346✔
2277
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2278
  }
2279

2280
  STableDataCxt** pDataBlock = NULL;
3,346✔
2281
  if (pStmt->exec.pCurrBlock) {
3,346✔
2282
    pDataBlock = &pStmt->exec.pCurrBlock;
×
2283
  } else {
2284
    pDataBlock =
2285
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
3,346✔
2286
    if (NULL == pDataBlock) {
3,346✔
2287
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2288
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2289
    }
2290
  }
2291

2292
  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
3,346✔
2293
    STMT2_DLOG_E("don't need to create, will not check tags");
×
2294
    return TSDB_CODE_SUCCESS;
×
2295
  }
2296

2297

2298
  if ((*pDataBlock)->pData->pCreateTbReq) {
3,346✔
2299
    STMT2_TLOG_E("tags are fixed, set createTbReq first time");
3,346✔
2300
    pStmt->sql.fixValueTags = true;
3,346✔
2301
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
3,346✔
2302
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
3,346✔
2303
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
3,346✔
2304

2305
    // destroy the createTbReq in the data block
2306
    tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
3,346✔
2307
    taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
3,346✔
2308
    (*pDataBlock)->pData->pCreateTbReq = NULL;
3,346✔
2309
  }
2310

2311
  return TSDB_CODE_SUCCESS;
3,346✔
2312
}
2313

2314
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
2315
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
×
2316
    return pStmt->errCode;
×
2317
  }
2318

2319
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
×
2320
    tscError("invalid operation to get query column fileds");
×
2321
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2322
  }
2323

2324
  STableDataCxt** pDataBlock = NULL;
×
2325

2326
  if (pStmt->sql.stbInterlaceMode) {
×
2327
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
×
2328
  } else {
2329
    pDataBlock =
2330
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
×
2331
    if (NULL == pDataBlock) {
×
2332
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2333
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
×
2334
    }
2335
  }
2336

2337
  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
×
2338

2339
  return TSDB_CODE_SUCCESS;
×
2340
}
2341

2342
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
10,719✔
2343
  int32_t code = 0;
10,719✔
2344
  int32_t preCode = pStmt->errCode;
10,719✔
2345

2346
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
10,719✔
2347
    return pStmt->errCode;
×
2348
  }
2349

2350
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
10,719✔
2351
    STMT2_ELOG_E("stmtFetchStbColFields2 only for insert statement");
×
2352
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2353
  }
2354

2355
  STableDataCxt** pDataBlock = NULL;
10,719✔
2356
  bool            cleanStb = false;
10,719✔
2357

2358
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
10,719✔
2359
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
1,188✔
2360
  } else {
2361
    cleanStb = true;
9,531✔
2362
    pDataBlock =
2363
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
9,531✔
2364
  }
2365

2366
  if (NULL == pDataBlock || NULL == *pDataBlock) {
10,719✔
2367
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2368
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
2369
  }
2370

2371
  pStmt->sql.placeholderOfTags = 0;
10,719✔
2372
  pStmt->sql.placeholderOfCols = 0;
10,719✔
2373
  int32_t totalNum = 0;
10,719✔
2374
  STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.fixedValueCols,
10,719✔
2375
                                        pStmt->bInfo.tbNameFlag, &totalNum, fields, &pStmt->sql.placeholderOfTags,
2376
                                        &pStmt->sql.placeholderOfCols));
2377

2378
  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
10,719✔
2379
    taosMemoryFreeClear((*pDataBlock)->boundColsInfo.pColIndex);
7,551✔
2380
    qDestroyStmtDataBlock(*pDataBlock);
7,551✔
2381
    *pDataBlock = NULL;
7,551✔
2382
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
7,551✔
2383
      STMT2_ELOG("fail to remove remove stb:%s exec blockHash", pStmt->bInfo.tbFName);
×
2384
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
×
2385
    }
2386
    pStmt->sql.autoCreateTbl = false;
7,551✔
2387
    pStmt->bInfo.tagsCached = false;
7,551✔
2388
    pStmt->bInfo.sname = (SName){0};
7,551✔
2389
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
7,551✔
2390
  }
2391

2392
  if (fieldNum != NULL) {
10,719✔
2393
    *fieldNum = totalNum;
10,719✔
2394
  }
2395

2396
  STMT2_DLOG("get insert fields totalNum:%d, tagsNum:%d, colsNum:%d", totalNum, pStmt->sql.placeholderOfTags,
10,719✔
2397
             pStmt->sql.placeholderOfCols);
2398

2399
_return:
10,719✔
2400

2401
  pStmt->errCode = preCode;
10,719✔
2402

2403
  return code;
10,719✔
2404
}
2405
/*
2406
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
2407
  while (true) {
2408
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
2409
      pStmt->exec.smInfo.pColIdx = 0;
2410
    }
2411

2412
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
2413
      taosUsleep(1);
2414
      continue;
2415
    }
2416

2417
    *idx = pStmt->exec.smInfo.pColIdx;
2418
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
2419
  }
2420
}
2421
*/
2422
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
1,069,065✔
2423
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
1,069,065✔
2424
    pStmt->sql.siInfo.pVgroupHash =
507,619✔
2425
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
507,624✔
2426
  }
2427
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
1,069,215✔
2428
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
507,656✔
2429
  }
2430

2431
  if (NULL == pStmt->sql.siInfo.pRequest) {
1,069,255✔
2432
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
12,579✔
2433
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));
2434

2435
    if (pStmt->reqid != 0) {
12,579✔
2436
      pStmt->reqid++;
10✔
2437
    }
2438
    pStmt->exec.pRequest->syncQuery = true;
12,579✔
2439

2440
    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
12,579✔
2441
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
12,579✔
2442
  }
2443

2444
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
1,069,255✔
2445
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
28,786✔
2446
    pStmt->sql.siInfo.tbFromHash = true;
4,593✔
2447
  }
2448

2449
  if (0 == pStmt->sql.siInfo.firstName[0]) {
1,069,292✔
2450
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
12,579✔
2451
  }
2452

2453
  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
1,069,255✔
2454
  param->next = NULL;
1,069,255✔
2455

2456
  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
1,069,292✔
2457

2458
  if (pStmt->queue.stopQueue) {
1,069,448✔
2459
    STMT2_ELOG_E("bind thread already stopped, cannot enqueue");
×
2460
    return TSDB_CODE_TSC_STMT_API_ERROR;
×
2461
  }
2462
  stmtEnqueue(pStmt, param);
1,069,448✔
2463

2464
  return TSDB_CODE_SUCCESS;
1,069,352✔
2465
}
2466

2467
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
2468
  while (true) {
2469
    if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
1,069,080✔
2470
      *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
1,069,301✔
2471
      break;
1,069,177✔
2472
    } else {
2473
      SArray* pTblCols = NULL;
×
2474
      for (int32_t i = 0; i < 100; i++) {
×
2475
        pTblCols = taosArrayInit(20, POINTER_BYTES);
×
2476
        if (NULL == pTblCols) {
×
2477
          return terrno;
×
2478
        }
2479

2480
        if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
×
2481
          return terrno;
×
2482
        }
2483
      }
2484
    }
2485
  }
2486

2487
  return TSDB_CODE_SUCCESS;
1,069,177✔
2488
}
2489

2490
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
84,346,106✔
2491
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
84,346,106✔
2492
    return TSDB_CODE_SUCCESS;
85,523,480✔
2493
  }
2494

2495
  uint64_t uid = pStmt->bInfo.tbUid;
27,720✔
2496
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
27,720✔
2497

2498
  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
27,720✔
2499
    STMT2_TLOG("table %s already cached, no need to cache again", pStmt->bInfo.tbFName);
21,179✔
2500
    return TSDB_CODE_SUCCESS;
21,179✔
2501
  }
2502

2503
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
6,761✔
2504
  if (!pSrc) {
6,761✔
2505
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2506
    return terrno;
×
2507
  }
2508
  STableDataCxt* pDst = NULL;
6,761✔
2509

2510
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
6,761✔
2511

2512
  SStmtTableCache cache = {
6,761✔
2513
      .pDataCtx = pDst,
2514
      .boundTags = pStmt->bInfo.boundTags,
6,761✔
2515
  };
2516

2517
  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
6,761✔
2518
    STMT2_ELOG("fail to put table cache:%s", tstrerror(terrno));
×
2519
    return terrno;
×
2520
  }
2521

2522
  if (pStmt->sql.autoCreateTbl) {
6,761✔
2523
    pStmt->bInfo.tagsCached = true;
5,177✔
2524
  } else {
2525
    pStmt->bInfo.boundTags = NULL;
1,584✔
2526
  }
2527

2528
  return TSDB_CODE_SUCCESS;
6,761✔
2529
}
2530

2531
static int stmtAddBatch2(TAOS_STMT2* stmt) {
85,056,040✔
2532
  STscStmt2* pStmt = (STscStmt2*)stmt;
85,056,040✔
2533

2534
  int64_t startUs = taosGetTimestampUs();
86,683,060✔
2535

2536
  // STMT2_TLOG_E("start to add batch");
2537

2538
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
86,683,060✔
2539
    return pStmt->errCode;
×
2540
  }
2541

2542
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
86,880,558✔
2543

2544
  if (pStmt->sql.stbInterlaceMode) {
86,576,046✔
2545
    int64_t startUs2 = taosGetTimestampUs();
507,104✔
2546
    pStmt->stat.addBatchUs += startUs2 - startUs;
507,104✔
2547

2548
    pStmt->sql.siInfo.tableColsReady = false;
507,104✔
2549

2550
    SStmtQNode* param = NULL;
507,104✔
2551
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
1,014,206✔
2552
    param->restoreTbCols = true;
507,065✔
2553
    param->next = NULL;
507,065✔
2554

2555
    if (pStmt->sql.autoCreateTbl) {
507,065✔
2556
      pStmt->bInfo.tagsCached = true;
220,409✔
2557
    }
2558
    pStmt->bInfo.boundColsCached = true;
507,065✔
2559

2560
    if (pStmt->queue.stopQueue) {
507,102✔
2561
      STMT2_ELOG_E("stmt bind thread is stopped,cannot enqueue bind request");
×
2562
      return TSDB_CODE_TSC_STMT_API_ERROR;
×
2563
    }
2564

2565
    stmtEnqueue(pStmt, param);
507,102✔
2566

2567
    return TSDB_CODE_SUCCESS;
507,112✔
2568
  }
2569

2570
  STMT_ERR_RET(stmtCacheBlock(pStmt));
85,558,717✔
2571

2572
  return TSDB_CODE_SUCCESS;
84,909,805✔
2573
}
2574
/*
2575
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
2576
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2577
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
2578
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
2579

2580
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
2581
  pRes->fields = taosMemoryMalloc(size);
2582
  pRes->userFields = taosMemoryMalloc(size);
2583
  if (NULL == pRes->fields || NULL == pRes->userFields) {
2584
    STMT_ERR_RET(terrno);
2585
  }
2586
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
2587
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
2588

2589
  return TSDB_CODE_SUCCESS;
2590
}
2591

2592
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
2593
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2594
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
2595

2596
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
2597
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
2598

2599
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2600
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
2601
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2602
      STMT_ERR_RET(terrno);
2603
    }
2604
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
2605
  }
2606

2607
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2608
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
2609
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2610
      STMT_ERR_RET(terrno);
2611
    }
2612
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
2613
  }
2614

2615
  return TSDB_CODE_SUCCESS;
2616
}
2617
*/
2618

2619
/**
2620
 * Fetch metadata for query statement after parameter binding.
2621
 * This function collects metadata requirements from the query (after binding),
2622
 * fetches metadata synchronously from catalog, and returns it for parsing.
2623
 *
2624
 * Note: We fetch metadata on every bind because:
2625
 * 1. Parameter values in WHERE conditions (e.g., dataname IN (?,?)) may change
2626
 * 2. Different parameter values may require different vgroup lists for virtual tables
2627
 * 3. Metadata requirements can only be determined after parameters are bound
2628
 *
2629
 * @param pStmt Statement handle
2630
 * @param pCxt Parse context (must have catalog handle initialized)
2631
 * @param pMetaData Output: Fetched metadata (caller responsible for cleanup)
2632
 * @return TSDB_CODE_SUCCESS on success, error code otherwise
2633
 */
2634
// Callback parameter structure for synchronous catalog metadata fetch
2635
typedef struct {
2636
  SMetaData* pRsp;
2637
  int32_t    code;
2638
  tsem_t     sem;
2639
} SCatalogSyncCbParam;
2640

2641
// Callback function for catalogAsyncGetAllMeta to make it synchronous
2642
static void stmtCatalogSyncGetAllMetaCb(SMetaData* pResultMeta, void* param, int32_t code) {
10,721✔
2643
  SCatalogSyncCbParam* pCbParam = (SCatalogSyncCbParam*)param;
10,721✔
2644
  if (TSDB_CODE_SUCCESS == code && pResultMeta) {
10,721✔
2645
    *pCbParam->pRsp = *pResultMeta;
10,721✔
2646
    TAOS_MEMSET(pResultMeta, 0, sizeof(SMetaData));  // Clear to avoid double free
10,721✔
2647
  }
2648
  pCbParam->code = code;
10,721✔
2649
  if (tsem_post(&pCbParam->sem) != 0) {
10,721✔
2650
    tscError("failed to post semaphore");
×
2651
  }
2652
}
10,721✔
2653

2654
static int32_t stmtFetchMetadataForQuery(STscStmt2* pStmt, SParseContext* pCxt, SMetaData* pMetaData) {
10,721✔
2655
  int32_t          code = 0;
10,721✔
2656
  SParseMetaCache  metaCache = {0};
10,721✔
2657
  SCatalogReq      catalogReq = {0};
10,721✔
2658
  SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
13,710✔
2659
                           .requestId = pCxt->requestId,
10,721✔
2660
                           .requestObjRefId = pCxt->requestRid,
10,721✔
2661
                           .mgmtEps = pCxt->mgmtEpSet};
2662

2663
  TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
10,721✔
2664

2665
  code = collectMetaKey(pCxt, pStmt->sql.pQuery, &metaCache);
10,721✔
2666
  if (TSDB_CODE_SUCCESS == code) {
10,721✔
2667
    code = buildCatalogReq(&metaCache, &catalogReq);
10,721✔
2668
  }
2669
  if (TSDB_CODE_SUCCESS == code) {
10,721✔
2670
    SCatalogSyncCbParam cbParam = {.pRsp = pMetaData, .code = TSDB_CODE_SUCCESS};
10,721✔
2671
    if (tsem_init(&cbParam.sem, 0, 0) != 0) {
10,721✔
2672
      code = TSDB_CODE_CTG_INTERNAL_ERROR;
×
2673
    } else {
2674
      code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, stmtCatalogSyncGetAllMetaCb, &cbParam, NULL);
10,721✔
2675
      if (TSDB_CODE_SUCCESS == code) {
10,721✔
2676
        code = tsem_wait(&cbParam.sem);
10,721✔
2677
        if (code != TSDB_CODE_SUCCESS) {
10,721✔
2678
          catalogFreeMetaData(pMetaData);
×
2679
          TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
×
2680
        } else {
2681
          code = cbParam.code;
10,721✔
2682
        }
2683
      }
2684

2685
      if (tsem_destroy(&cbParam.sem) != 0) {
10,721✔
2686
        tscError("failed to destroy semaphore");
×
2687
        code = TSDB_CODE_CTG_INTERNAL_ERROR;
×
2688
        catalogFreeMetaData(pMetaData);
×
2689
        TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
×
2690
      }
2691
    }
2692
  }
2693

2694
  // metaCache currently holds "reserved/request" structures built by collectMetaKey/buildCatalogReq.
2695
  // It must be destroyed with request=true to release nested table-request hashes.
2696
  destoryParseMetaCache(&metaCache, true);
10,721✔
2697
  destoryCatalogReq(&catalogReq);
10,721✔
2698

2699
  if (TSDB_CODE_SUCCESS != code) {
10,721✔
2700
    catalogFreeMetaData(pMetaData);
×
2701
  }
2702

2703
  return code;
10,721✔
2704
}
2705

2706
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
85,807,114✔
2707
  STscStmt2* pStmt = (STscStmt2*)stmt;
85,807,114✔
2708
  int32_t    code = 0;
85,807,114✔
2709

2710
  int64_t startUs = taosGetTimestampUs();
85,879,705✔
2711

2712
  if (qDebugFlag & DEBUG_TRACE) {
85,879,705✔
2713
    (void)stmtPrintBindv(stmt, bind, colIdx, false);
×
2714
  }
2715

2716
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
85,826,165✔
2717
    return pStmt->errCode;
198✔
2718
  }
2719

2720
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
87,059,587✔
2721

2722
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
87,128,302✔
2723
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
2724
    pStmt->bInfo.needParse = false;
×
2725
  }
2726

2727
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
86,210,593✔
2728
    resetRequest(pStmt);
2,209✔
2729
  }
2730

2731
  STMT_ERR_RET(stmtCreateRequest(pStmt));
86,932,426✔
2732
  if (pStmt->bInfo.needParse) {
86,099,360✔
2733
    code = stmtParseSql(pStmt);
121,375✔
2734
    if (code != TSDB_CODE_SUCCESS) {
121,336✔
2735
      goto cleanup_root;
×
2736
    }
2737
  }
2738

2739
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
85,949,604✔
2740
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
10,721✔
2741
    if (code != TSDB_CODE_SUCCESS) {
10,721✔
2742
      goto cleanup_root;
×
2743
    }
2744
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
10,721✔
2745
                         .acctId = pStmt->taos->acctId,
10,721✔
2746
                         .minSecLevel = pStmt->taos->minSecLevel,
10,721✔
2747
                         .maxSecLevel = pStmt->taos->maxSecLevel,
10,721✔
2748
                         .db = pStmt->exec.pRequest->pDb,
10,721✔
2749
                         .topicQuery = false,
2750
                         .pSql = pStmt->sql.sqlStr,
10,721✔
2751
                         .sqlLen = pStmt->sql.sqlLen,
10,721✔
2752
                         .pMsg = pStmt->exec.pRequest->msgBuf,
10,721✔
2753
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
2754
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
10,721✔
2755
                         .pStmtCb = NULL,
2756
                         .pUser = pStmt->taos->user,
10,721✔
2757
                         .stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
10,721✔
2758
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
10,721✔
2759
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
10,721✔
2760
    if (code != TSDB_CODE_SUCCESS) {
10,721✔
2761
      goto cleanup_root;
×
2762
    }
2763

2764
    // Fetch metadata for query(vtable need)
2765
    SMetaData metaData = {0};
10,721✔
2766
    code = stmtFetchMetadataForQuery(pStmt, &ctx, &metaData);
10,721✔
2767
    if (TSDB_CODE_SUCCESS != code) {
10,721✔
2768
      goto cleanup_root;
×
2769
    }
2770

2771
    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, &metaData);
10,721✔
2772
    if (TSDB_CODE_SUCCESS == code) {
10,721✔
2773
      // Copy metaData to pRequest->parseMeta for potential future use
2774
      // Similar to doAsyncQueryFromAnalyse when parseOnly is true
2775
      (void)memcpy(&pStmt->exec.pRequest->parseMeta, &metaData, sizeof(SMetaData));
10,523✔
2776
      (void)memset(&metaData, 0, sizeof(SMetaData));  // Clear to avoid double free
10,523✔
2777
    } else {
2778
      // Clean up metaData on failure - free all arrays
2779
      if (metaData.pVStbRefDbs) {
198✔
2780
        taosArrayDestroy(metaData.pVStbRefDbs);
198✔
2781
        metaData.pVStbRefDbs = NULL;
198✔
2782
      }
2783
      // Note: Other fields in metaData are managed by catalog module if ctgFree is true
2784
      goto cleanup_root;
198✔
2785
    }
2786

2787
    if (pStmt->sql.pQuery->haveResultSet) {
10,523✔
2788
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
18,255✔
2789
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
2790
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
10,523✔
2791
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
10,523✔
2792
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
10,523✔
2793
    }
2794

2795
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
10,523✔
2796
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
10,523✔
2797
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);
10,523✔
2798

2799
    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
2800
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
2801
    // }
2802

2803
    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));
2804

2805
    return TSDB_CODE_SUCCESS;
10,523✔
2806

2807
  cleanup_root:
198✔
2808
    STMT2_ELOG("parse query statment unexpected failed code:%d, need to clean node", code);
198✔
2809
    if (pStmt->sql.pQuery && pStmt->sql.pQuery->pRoot) {
198✔
2810
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
198✔
2811
      pStmt->sql.pQuery->pRoot = NULL;
198✔
2812
    }
2813
    STMT_ERR_RET(code);
198✔
2814
  }
2815

2816
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
86,497,754✔
2817
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
×
2818
  }
2819

2820
  STableDataCxt** pDataBlock = NULL;
85,821,681✔
2821

2822
  if (pStmt->exec.pCurrBlock) {
85,821,681✔
2823
    pDataBlock = &pStmt->exec.pCurrBlock;
85,444,365✔
2824
  } else {
2825
    pDataBlock =
2826
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
141,625✔
2827
    if (NULL == pDataBlock) {
141,707✔
2828
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
×
2829
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
×
2830
    }
2831
    pStmt->exec.pCurrBlock = *pDataBlock;
141,707✔
2832
    if (pStmt->sql.stbInterlaceMode) {
141,707✔
2833
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
12,975✔
2834
      (*pDataBlock)->pData->aCol = NULL;
12,975✔
2835
    }
2836
    if (colIdx < -1) {
141,707✔
2837
      pStmt->sql.bindRowFormat = true;
198✔
2838
      taosArrayDestroy((*pDataBlock)->pData->aCol);
198✔
2839
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
198✔
2840
    }
2841
  }
2842

2843
  int64_t startUs2 = taosGetTimestampUs();
86,656,775✔
2844
  pStmt->stat.bindDataUs1 += startUs2 - startUs;
86,656,775✔
2845

2846
  SStmtQNode* param = NULL;
86,565,544✔
2847
  if (pStmt->sql.stbInterlaceMode) {
87,221,835✔
2848
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
2,138,197✔
2849
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
2,138,169✔
2850
    taosArrayClear(param->tblData.aCol);
1,069,177✔
2851

2852
    // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
2853

2854
    param->restoreTbCols = false;
1,068,771✔
2855
    param->tblData.isOrdered = true;
1,068,852✔
2856
    param->tblData.isDuplicateTs = false;
1,068,970✔
2857
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
1,068,970✔
2858

2859
    param->pCreateTbReq = pCreateTbReq;
1,068,977✔
2860
  }
2861

2862
  int64_t startUs3 = taosGetTimestampUs();
86,624,624✔
2863
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;
86,624,624✔
2864

2865
  SArray*   pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
86,537,246✔
2866
  SBlobSet* pBlob = NULL;
87,675,644✔
2867
  if (colIdx < 0) {
87,647,815✔
2868
    if (pStmt->sql.stbInterlaceMode) {
87,884,632✔
2869
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
1,069,383✔
2870
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
1,358,314✔
2871
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
1,069,605✔
2872
                                    pStmt->taos->optionInfo.charsetCxt, &pBlob);
1,069,442✔
2873
      param->tblData.isOrdered = (*pDataBlock)->ordered;
1,069,559✔
2874
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
1,069,714✔
2875
    } else {
2876
      if (colIdx == -1) {
85,597,905✔
2877
        if (pStmt->sql.bindRowFormat) {
84,809,594✔
2878
          STMT2_ELOG_E("can't mix bind row format and bind column format");
198✔
2879
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
198✔
2880
        }
2881
        code = qBindStmtColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
166,817,951✔
2882
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
165,859,544✔
2883
      } else {
2884
        code =
2885
            qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, pStmt->bInfo.fixedValueCols, bind,
396✔
2886
                               pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen,
396✔
2887
                               &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
813,224✔
2888
      }
2889
    }
2890

2891
    if (code) {
85,333,680✔
2892
      STMT2_ELOG("bind cols or rows failed, error:%s", tstrerror(code));
396✔
2893
      STMT_ERR_RET(code);
396✔
2894
    }
2895
  } else {
2896
    if (pStmt->sql.stbInterlaceMode) {
1,188✔
2897
      STMT2_ELOG_E("bind single column not allowed in stb insert mode");
×
2898
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2899
    }
2900

2901
    if (pStmt->sql.bindRowFormat) {
1,188✔
2902
      STMT2_ELOG_E("can't mix bind row format and bind column format");
×
2903
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2904
    }
2905

2906
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
1,188✔
2907
      STMT2_ELOG_E("bind column index not in sequence");
×
2908
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
×
2909
    }
2910

2911
    pStmt->bInfo.sBindLastIdx = colIdx;
1,188✔
2912

2913
    if (0 == colIdx) {
1,188✔
2914
      pStmt->bInfo.sBindRowNum = bind->num;
594✔
2915
    }
2916

2917
    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
1,188✔
2918
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
1,188✔
2919
                                    pStmt->taos->optionInfo.charsetCxt);
1,188✔
2920
    if (code) {
1,188✔
2921
      STMT2_ELOG("bind single col failed, error:%s", tstrerror(code));
×
2922
      STMT_ERR_RET(code);
×
2923
    }
2924
  }
2925

2926
  int64_t startUs4 = taosGetTimestampUs();
85,817,551✔
2927
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;
85,817,551✔
2928

2929
  if (pStmt->stbInterlaceMode) {
87,146,305✔
2930
    if (param) param->tblData.pBlobSet = pBlob;
87,571,917✔
2931
  }
2932

2933
  if (pStmt->sql.stbInterlaceMode) {
87,358,726✔
2934
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
1,069,333✔
2935
  } else {
2936
    STMT_ERR_RET(stmtAddBatch2(pStmt));
85,233,343✔
2937
  }
2938

2939
  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
86,387,879✔
2940
  return TSDB_CODE_SUCCESS;
87,461,432✔
2941
}
2942

2943
/*
2944
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
2945
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
2946

2947
  int32_t code = 0;
2948
  int32_t finalCode = 0;
2949
  size_t  keyLen = 0;
2950
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
2951
  while (pIter) {
2952
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
2953
    char*          key = taosHashGetKey(pIter, &keyLen);
2954

2955
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
2956
    if (pMeta->uid) {
2957
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2958
      continue;
2959
    }
2960

2961
    SSubmitBlkRsp* blkRsp = NULL;
2962
    int32_t        i = 0;
2963
    for (; i < pRsp->nBlocks; ++i) {
2964
      blkRsp = pRsp->pBlocks + i;
2965
      if (strlen(blkRsp->tblFName) != keyLen) {
2966
        continue;
2967
      }
2968

2969
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
2970
        continue;
2971
      }
2972

2973
      break;
2974
    }
2975

2976
    if (i < pRsp->nBlocks) {
2977
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
2978
               blkRsp->uid);
2979

2980
      pMeta->uid = blkRsp->uid;
2981
      pStmt->bInfo.tbUid = blkRsp->uid;
2982
    } else {
2983
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
2984
      if (NULL == pStmt->pCatalog) {
2985
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
2986
        if (code) {
2987
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2988
          finalCode = code;
2989
          continue;
2990
        }
2991
      }
2992

2993
      code = stmtCreateRequest(pStmt);
2994
      if (code) {
2995
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2996
        finalCode = code;
2997
        continue;
2998
      }
2999

3000
      STableMeta*      pTableMeta = NULL;
3001
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
3002
                               .requestId = pStmt->exec.pRequest->requestId,
3003
                               .requestObjRefId = pStmt->exec.pRequest->self,
3004
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
3005
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
3006

3007
      pStmt->stat.ctgGetTbMetaNum++;
3008

3009
      taos_free_result(pStmt->exec.pRequest);
3010
      pStmt->exec.pRequest = NULL;
3011

3012
      if (code || NULL == pTableMeta) {
3013
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3014
        finalCode = code;
3015
        taosMemoryFree(pTableMeta);
3016
        continue;
3017
      }
3018

3019
      pMeta->uid = pTableMeta->uid;
3020
      pStmt->bInfo.tbUid = pTableMeta->uid;
3021
      taosMemoryFree(pTableMeta);
3022
    }
3023

3024
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3025
  }
3026

3027
  return finalCode;
3028
}
3029
*/
3030
/*
3031
int stmtStaticModeExec(TAOS_STMT* stmt) {
3032
  STscStmt2*   pStmt = (STscStmt2*)stmt;
3033
  int32_t     code = 0;
3034
  SSubmitRsp* pRsp = NULL;
3035
  if (pStmt->sql.staticMode) {
3036
    return TSDB_CODE_TSC_STMT_API_ERROR;
3037
  }
3038

3039
  STMT_DLOG_E("start to exec");
3040

3041
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
3042

3043
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
3044
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
3045

3046
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
3047

3048
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
3049
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
3050
    if (code) {
3051
      pStmt->exec.pRequest->code = code;
3052
    } else {
3053
      tFreeSSubmitRsp(pRsp);
3054
      STMT_ERR_RET(stmtResetStmt(pStmt));
3055
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
3056
    }
3057
  }
3058

3059
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
3060

3061
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
3062
  pStmt->affectedRows += pStmt->exec.affectedRows;
3063

3064
_return:
3065

3066
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
3067

3068
  tFreeSSubmitRsp(pRsp);
3069

3070
  ++pStmt->sql.runTimes;
3071

3072
  STMT_RET(code);
3073
}
3074
*/
3075

3076
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
5,544✔
3077
  const STscObj* pTscObj = pRequest->pTscObj;
5,544✔
3078

3079
  *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
5,544✔
3080
  if (*pCxt == NULL) {
5,544✔
3081
    return terrno;
×
3082
  }
3083

3084
  **pCxt = (SParseContext){.requestId = pRequest->requestId,
×
3085
                           .requestRid = pRequest->self,
5,544✔
3086
                           .acctId = pTscObj->acctId,
5,544✔
3087
                           .db = pRequest->pDb,
5,544✔
3088
                           .topicQuery = false,
3089
                           .pSql = pRequest->sqlstr,
5,544✔
3090
                           .sqlLen = pRequest->sqlLen,
5,544✔
3091
                           .pMsg = pRequest->msgBuf,
5,544✔
3092
                           .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
3093
                           .pTransporter = pTscObj->pAppInfo->pTransporter,
5,544✔
3094
                           .pStmtCb = NULL,
3095
                           .pUser = pTscObj->user,
5,544✔
3096
                           .userId = pTscObj->userId,
5,544✔
3097
                           .pEffectiveUser = pRequest->effectiveUser,
5,544✔
3098
                           .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
5,544✔
3099
                           .enableSysInfo = pTscObj->sysInfo,
5,544✔
3100
                           .sodInitial = pTscObj->pAppInfo->serverCfg.sodInitial,
5,544✔
3101
                           .privInfo = pWrapper->pParseCtx ? pWrapper->pParseCtx->privInfo : 0,
5,544✔
3102
                           .async = true,
3103
                           .svrVer = pTscObj->sVer,
5,544✔
3104
                           .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
5,544✔
3105
                           .allocatorId = pRequest->allocatorRefId,
5,544✔
3106
                           .parseSqlFp = clientParseSql,
3107
                           .parseSqlParam = pWrapper};
3108
  int8_t biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
5,544✔
3109
  (*pCxt)->biMode = biMode;
5,544✔
3110
  return TSDB_CODE_SUCCESS;
5,544✔
3111
}
3112

3113
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
5,544✔
3114
  STscStmt2*        pStmt = userdata;
5,544✔
3115
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
5,544✔
3116
  pStmt->asyncResultAvailable = true;
5,544✔
3117
  pStmt->exec.pRequest->inCallback = true;
5,544✔
3118

3119
  // NEED_CLIENT_HANDLE_ERROR: retry internally without notifying user; retry completion uses this same cb + fp once.
3120
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pStmt->pVgDataBlocksForRetry != NULL) {
5,742✔
3121
    int32_t origExecCode = code;
396✔
3122
    STMT2_ELOG("async exec got NEED_CLIENT_HANDLE_ERROR (code:%s), retrying internally", tstrerror(code));
396✔
3123

3124
    // Try to retry internally; completion uses asyncQueryCb so user fp runs once with the final result.
3125
    int32_t retryCode = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
396✔
3126
    if (retryCode == TSDB_CODE_SUCCESS) {
396✔
3127
      stmtInvalidateStbInterlaceTableUidCache(pStmt);
198✔
3128
      if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
198✔
3129
        retryCode = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
198✔
3130
      } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
×
3131
        retryCode = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
×
3132
      }
3133
    }
3134
    if (retryCode == TSDB_CODE_SUCCESS) {
396✔
3135
      (void)stmtRestoreVgDataBlocksForRetry(pStmt);
198✔
3136
      resetRequest(pStmt);
198✔
3137
      pStmt->asyncResultAvailable = false;
198✔
3138
      retryCode = stmtCreateRequest(pStmt);
198✔
3139
      if (retryCode == TSDB_CODE_SUCCESS) {
198✔
3140
        SRequestObj*         pNewReq = pStmt->exec.pRequest;
198✔
3141
        SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
198✔
3142
        if (pWrapper == NULL) {
198✔
3143
          retryCode = terrno;
×
3144
          resetRequest(pStmt);
×
3145
        } else {
3146
          pWrapper->pRequest = pNewReq;
198✔
3147
          pNewReq->pWrapper = pWrapper;
198✔
3148
          retryCode = createParseContext(pNewReq, &pWrapper->pParseCtx, pWrapper);
198✔
3149
          if (retryCode == TSDB_CODE_SUCCESS) {
198✔
3150
            pNewReq->syncQuery = false;
198✔
3151
            // Same as first exec: asyncQueryCb invokes user asyncExecFn once with userdata (not raw pStmt as fp's 1st arg).
3152
            pNewReq->body.queryFp = asyncQueryCb;
198✔
3153
            ((SSyncQueryParam*)(pNewReq)->body.interParam)->userParam = pStmt;
198✔
3154
            launchAsyncQuery(pNewReq, pStmt->sql.pQuery, NULL, pWrapper);
198✔
3155
            // Retry asyncQueryCb will call fp, stmtCleanExecInfo, and tsem_post(asyncExecSem).
3156
            return;
198✔
3157
          }
3158
          // Do not taosMemoryFree(pWrapper): destroyRequest frees it via destorySqlCallbackWrapper.
3159
          resetRequest(pStmt);
×
3160
        }
3161
      }
3162
    }
3163
    // Retry setup failed (did not return above): notify user once with the original error, then cleanup + post sem.
3164
    if (fp) {
198✔
3165
      fp(pStmt->options.userdata, res, code);
198✔
3166
    }
3167
  } else {
3168
    if (code == TSDB_CODE_SUCCESS) {
5,148✔
3169
      pStmt->exec.affectedRows = taos_affected_rows(res);
5,148✔
3170
      pStmt->affectedRows += pStmt->exec.affectedRows;
5,148✔
3171
    }
3172

3173
    if (fp) {
5,148✔
3174
      fp(pStmt->options.userdata, res, code);
5,148✔
3175
    }
3176
  }
3177

3178
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
6,620✔
3179
    taosUsleep(1);
1,274✔
3180
  }
3181
  (void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
5,346✔
3182
  ++pStmt->sql.runTimes;
5,346✔
3183
  if (pStmt->exec.pRequest != NULL) {
5,346✔
3184
    pStmt->exec.pRequest->inCallback = false;
3,366✔
3185
  }
3186

3187
  if (tsem_post(&pStmt->asyncExecSem) != 0) {
5,346✔
3188
    STMT2_ELOG_E("fail to post asyncExecSem");
×
3189
  }
3190
}
3191

3192
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
655,086✔
3193
  STscStmt2* pStmt = (STscStmt2*)stmt;
655,086✔
3194
  int32_t    code = 0;
655,086✔
3195
  int64_t    startUs = taosGetTimestampUs();
655,212✔
3196

3197
  STMT2_DLOG_E("start to exec");
655,212✔
3198

3199
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
655,186✔
3200
    return pStmt->errCode;
198✔
3201
  }
3202

3203
  STMT_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
654,946✔
3204
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
655,055✔
3205
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
3206
  }
3207
  STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
654,922✔
3208

3209
  if (pStmt->sql.stbInterlaceMode) {
654,989✔
3210
    STMT_ERR_RET(stmtAddBatch2(pStmt));
507,154✔
3211
  }
3212

3213
  code = stmtSwitchStatus(pStmt, STMT_EXECUTE);
654,989✔
3214
  if (code != TSDB_CODE_SUCCESS) goto _return;
654,901✔
3215

3216
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
654,474✔
3217
    if (pStmt->sql.stbInterlaceMode) {
643,917✔
3218
      int64_t startTs = taosGetTimestampUs();
507,115✔
3219
      // wait for stmt bind thread to finish
3220
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
3,730,557✔
3221
        taosUsleep(1);
3,223,436✔
3222
      }
3223

3224
      if (pStmt->errCode != TSDB_CODE_SUCCESS) {
507,099✔
3225
        return pStmt->errCode;
198✔
3226
      }
3227

3228
      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
506,746✔
3229
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
506,825✔
3230
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
506,943✔
3231
      pStmt->sql.siInfo.pVgroupHash = NULL;
506,919✔
3232
      pStmt->sql.siInfo.pVgroupList = NULL;
506,919✔
3233
    } else {
3234
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
136,934✔
3235
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);
136,923✔
3236

3237
      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
136,923✔
3238

3239
      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
136,766✔
3240
    }
3241
    // Save serialized data blocks for potential NEED_CLIENT_HANDLE_ERROR retry before the planner
3242
    // takes ownership of pDataBlocks during createQueryPlan.
3243
    STMT_ERR_RET(stmtSaveVgDataBlocksForRetry(pStmt));
643,402✔
3244
  }
3245

3246
  pStmt->asyncResultAvailable = false;
654,051✔
3247
  SRequestObj*      pRequest = pStmt->exec.pRequest;
654,041✔
3248
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
654,041✔
3249
  STMT2_DLOG("EXEC INFO :req:0x%" PRIx64 ", QID:0x%" PRIx64 ", exec sql:%s,  conn:%" PRId64, pRequest->self,
653,836✔
3250
             pRequest->requestId, pStmt->sql.sqlStr, pRequest->pTscObj->id);
3251

3252
  if (!fp) {
654,028✔
3253
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
648,682✔
3254

3255
    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
648,619✔
3256
      int32_t origExecCode = pStmt->exec.pRequest->code;
494✔
3257
      STMT2_WLOG_E("exec failed errorcode:NEED_CLIENT_HANDLE_ERROR, refresh meta and retry internally");
494✔
3258
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
494✔
3259
      if (code == TSDB_CODE_SUCCESS) {
494✔
3260
        stmtInvalidateStbInterlaceTableUidCache(pStmt);
494✔
3261
      }
3262
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
494✔
3263
        if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
396✔
3264
          code = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
198✔
3265
        } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
198✔
3266
          code = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
198✔
3267
        }
3268
      }
3269
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
494✔
3270
        // Restore saved serialized data blocks and re-execute with refreshed meta.
3271
        STMT_ERR_JRET(stmtRestoreVgDataBlocksForRetry(pStmt));
396✔
3272
        resetRequest(pStmt);
396✔
3273
        STMT_ERR_JRET(stmtCreateRequest(pStmt));
396✔
3274
        launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
396✔
3275
        code = pStmt->exec.pRequest->code;
396✔
3276
      } else if (code == TSDB_CODE_SUCCESS) {
98✔
3277
        code = pStmt->exec.pRequest->code;
98✔
3278
      } else {
3279
        pStmt->exec.pRequest->code = code;
×
3280
      }
3281
    }
3282

3283
    STMT_ERR_JRET(pStmt->exec.pRequest->code);
648,672✔
3284

3285
    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
647,983✔
3286
    if (affected_rows) {
648,153✔
3287
      *affected_rows = pStmt->exec.affectedRows;
637,153✔
3288
    }
3289
    pStmt->affectedRows += pStmt->exec.affectedRows;
648,114✔
3290

3291
    // wait for stmt bind thread to finish
3292
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
648,469✔
3293
      taosUsleep(1);
221✔
3294
    }
3295

3296
    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));
648,210✔
3297

3298
    ++pStmt->sql.runTimes;
648,134✔
3299
  } else {
3300
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
5,346✔
3301
    if (pWrapper == NULL) {
5,346✔
3302
      code = terrno;
×
3303
    } else {
3304
      pWrapper->pRequest = pRequest;
5,346✔
3305
      pRequest->pWrapper = pWrapper;
5,346✔
3306
    }
3307
    if (TSDB_CODE_SUCCESS == code) {
5,346✔
3308
      code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
5,346✔
3309
    }
3310
    pRequest->syncQuery = false;
5,346✔
3311
    pRequest->body.queryFp = asyncQueryCb;
5,346✔
3312
    ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;
5,346✔
3313

3314
    pStmt->execSemWaited = false;
5,346✔
3315
    launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
5,346✔
3316
  }
3317

3318
_return:
654,290✔
3319
  if (code) {
654,290✔
3320
    STMT2_ELOG("exec failed, error:%s", tstrerror(code));
921✔
3321
  }
3322
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;
654,442✔
3323

3324
  STMT_RET(code);
654,474✔
3325
}
3326

3327
int stmtClose2(TAOS_STMT2* stmt) {
156,890✔
3328
  STscStmt2* pStmt = (STscStmt2*)stmt;
156,890✔
3329

3330
  STMT2_DLOG_E("start to close stmt");
156,890✔
3331
  taosMemoryFreeClear(pStmt->db);
156,890✔
3332

3333
  if (pStmt->bindThreadInUse) {
157,051✔
3334
    // wait for stmt bind thread to finish
3335
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
138,399✔
3336
      taosUsleep(1);
492✔
3337
    }
3338

3339
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
137,865✔
3340
    pStmt->queue.stopQueue = true;
137,944✔
3341
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
137,944✔
3342
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
137,944✔
3343

3344
    (void)taosThreadJoin(pStmt->bindThread, NULL);
137,944✔
3345
    pStmt->bindThreadInUse = false;
137,944✔
3346

3347
    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
137,944✔
3348
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
137,944✔
3349
  }
3350

3351
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
157,051✔
3352
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
157,014✔
3353
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
×
3354
  }
3355
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));
157,014✔
3356

3357
  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
157,014✔
3358
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);
157,014✔
3359

3360
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
157,014✔
3361
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
2,574✔
3362
      STMT2_ELOG_E("fail to wait asyncExecSem");
×
3363
    }
3364
  }
3365

3366
  /* On macOS dispatch_semaphore_dispose requires value >= orig (1). After tsem_wait above value is 0; post once before
3367
   * destroy. */
3368
  if (pStmt->options.asyncExecFn) {
157,014✔
3369
    if (tsem_post(&pStmt->asyncExecSem) != 0) {
2,574✔
3370
      STMT2_ELOG_E("fail to post asyncExecSem");
×
3371
    }
3372
  }
3373

3374
  STMT2_DLOG("stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
157,014✔
3375
             ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
3376
             ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
3377
             ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
3378
             ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
3379
             pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
3380
             pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
3381
             pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
3382
             pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
3383
             pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
3384
  if (pStmt->sql.stbInterlaceMode) {
157,014✔
3385
    pStmt->bInfo.tagsCached = false;
11,319✔
3386
  }
3387
  pStmt->bInfo.boundColsCached = false;
157,014✔
3388

3389
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
157,014✔
3390

3391
  if (pStmt->options.asyncExecFn) {
157,014✔
3392
    if (tsem_destroy(&pStmt->asyncExecSem) != 0) {
2,574✔
3393
      STMT2_ELOG_E("fail to destroy asyncExecSem");
×
3394
    }
3395
  }
3396
  taosMemoryFree(stmt);
156,974✔
3397

3398
  return TSDB_CODE_SUCCESS;
157,014✔
3399
}
3400

3401
const char* stmt2Errstr(TAOS_STMT2* stmt) {
10,825✔
3402
  STscStmt2* pStmt = (STscStmt2*)stmt;
10,825✔
3403

3404
  if (stmt == NULL || NULL == pStmt->exec.pRequest) {
10,825✔
3405
    return (char*)tstrerror(terrno);
792✔
3406
  }
3407

3408
  // if stmt async exec ,error code is pStmt->exec.pRequest->code
3409
  if (!(pStmt->sql.status >= STMT_EXECUTE && pStmt->options.asyncExecFn != NULL && pStmt->asyncResultAvailable)) {
10,033✔
3410
    pStmt->exec.pRequest->code = terrno;
10,033✔
3411
  }
3412

3413
  SRequestObj* pRequest = pStmt->exec.pRequest;
10,033✔
3414
  if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
10,033✔
3415
    return pRequest->msgBuf;
6,565✔
3416
  }
3417
  return (const char*)tstrerror(pRequest->code);
3,468✔
3418
}
3419

3420
// Alias kept for compatibility with object files compiled against older headers.
3421
const char* stmtErrstr2(TAOS_STMT2* stmt) { return stmt2Errstr(stmt); }
×
3422
/*
3423
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
3424

3425
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
3426
*/
3427

3428
int stmtParseColFields2(TAOS_STMT2* stmt) {
14,256✔
3429
  int32_t    code = 0;
14,256✔
3430
  STscStmt2* pStmt = (STscStmt2*)stmt;
14,256✔
3431
  int32_t    preCode = pStmt->errCode;
14,256✔
3432

3433
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
14,256✔
3434
    return pStmt->errCode;
×
3435
  }
3436

3437
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
14,256✔
3438
    STMT2_ELOG_E("stmtParseColFields2 only for insert");
×
3439
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
×
3440
  }
3441

3442
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
14,256✔
3443

3444
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
14,256✔
3445
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
594✔
3446
    pStmt->bInfo.needParse = false;
×
3447
  }
3448
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
14,256✔
3449
    pStmt->bInfo.needParse = false;
1,188✔
3450
  }
3451

3452
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
14,256✔
3453

3454
  if (pStmt->bInfo.needParse) {
14,256✔
3455
    STMT_ERRI_JRET(stmtParseSql(pStmt));
12,870✔
3456
  }
3457

3458
_return:
10,719✔
3459
  // compatible with previous versions
3460
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE) == 0x0) {
14,256✔
3461
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
792✔
3462
  }
3463

3464
  pStmt->errCode = preCode;
14,256✔
3465

3466
  return code;
14,256✔
3467
}
3468

3469
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
14,256✔
3470
  int32_t code = stmtParseColFields2(stmt);
14,256✔
3471
  if (code != TSDB_CODE_SUCCESS) {
14,256✔
3472
    return code;
3,537✔
3473
  }
3474

3475
  return stmtFetchStbColFields2(stmt, nums, fields);
10,719✔
3476
}
3477

3478
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
3,762✔
3479
  int32_t    code = 0;
3,762✔
3480
  STscStmt2* pStmt = (STscStmt2*)stmt;
3,762✔
3481
  int32_t    preCode = pStmt->errCode;
3,762✔
3482

3483
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
3,762✔
3484
    return pStmt->errCode;
×
3485
  }
3486

3487
  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
3,762✔
3488

3489
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
3,762✔
3490
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
×
3491
    pStmt->bInfo.needParse = false;
×
3492
  }
3493

3494
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
3,762✔
3495
    resetRequest(pStmt);
×
3496
  }
3497

3498
  STMT_ERRI_JRET(stmtCreateRequest(pStmt));
3,762✔
3499

3500
  if (pStmt->bInfo.needParse) {
3,762✔
3501
    STMT_ERRI_JRET(stmtParseSql(pStmt));
×
3502
  }
3503

3504
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
3,762✔
3505
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
3,762✔
3506
  } else {
3507
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
×
3508
  }
3509

3510
  STMT2_DLOG("get param num success, nums:%d", *nums);
3,762✔
3511

3512
_return:
3,762✔
3513

3514
  pStmt->errCode = preCode;
3,762✔
3515

3516
  return code;
3,762✔
3517
}
3518

3519
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
9,731✔
3520
  STscStmt2* pStmt = (STscStmt2*)stmt;
9,731✔
3521

3522
  STMT2_TLOG_E("start to use result");
9,731✔
3523

3524
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
9,731✔
3525
    STMT2_ELOG_E("useResult only for query statement");
×
3526
    return NULL;
×
3527
  }
3528

3529
  if (pStmt->options.asyncExecFn != NULL && !pStmt->asyncResultAvailable) {
9,731✔
3530
    STMT2_ELOG_E("use result after callBackFn return");
198✔
3531
    return NULL;
198✔
3532
  }
3533

3534
  if (tsUseAdapter) {
9,533✔
3535
    TAOS_RES* res = (TAOS_RES*)pStmt->exec.pRequest;
4,554✔
3536
    pStmt->exec.pRequest = NULL;
4,554✔
3537
    return res;
4,554✔
3538
  }
3539

3540
  return pStmt->exec.pRequest;
4,979✔
3541
}
3542

3543
int32_t stmtAsyncBindThreadFunc(void* args) {
×
3544
  qInfo("async stmt bind thread started");
×
3545

3546
  ThreadArgs* targs = (ThreadArgs*)args;
×
3547
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
3548

3549
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
3550
  targs->fp(targs->param, NULL, code);
×
3551
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
3552
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
3553
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
3554
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
3555
  taosMemoryFree(args);
×
3556

3557
  qInfo("async stmt bind thread stopped");
×
3558

3559
  return code;
×
3560
}
3561

3562
void stmtBuildErrorMsg(STscStmt2* pStmt, const char* msg) {
594✔
3563
  if (pStmt == NULL || msg == NULL) {
594✔
3564
    return;
×
3565
  }
3566

3567
  if (pStmt->exec.pRequest == NULL) {
594✔
3568
    return;
×
3569
  }
3570

3571
  if (pStmt->exec.pRequest->msgBuf == NULL) {
594✔
3572
    return;
×
3573
  }
3574

3575
  size_t msgLen = strlen(msg);
594✔
3576
  size_t bufLen = pStmt->exec.pRequest->msgBufLen;
594✔
3577

3578
  if (msgLen >= bufLen) {
594✔
3579
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen - 1);
×
3580
    pStmt->exec.pRequest->msgBuf[bufLen - 1] = '\0';
×
3581
    pStmt->exec.pRequest->msgBufLen = bufLen - 1;
×
3582
  } else {
3583
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen);
594✔
3584
    pStmt->exec.pRequest->msgBufLen = msgLen;
594✔
3585
  }
3586

3587
  return;
594✔
3588
}
3589

3590
int32_t stmtBuildErrorMsgWithCode(STscStmt2* pStmt, const char* msg, int32_t errorCode) {
594✔
3591
  stmtBuildErrorMsg(pStmt, msg);
594✔
3592
  pStmt->errCode = errorCode;
594✔
3593

3594
  return errorCode;
594✔
3595
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc