• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.14
/source/client/src/clientStmt2.c
1
#include "clientInt.h"
2
#include "clientLog.h"
3
#include "tdef.h"
4
#include "tglobal.h"
5

6
#include "clientStmt.h"
7
#include "clientStmt2.h"
8
#include "tencode.h"
9
#include "tmsg.h"
10
#include "tname.h"
11
#include "trow.h"
12

13
// Printable names of the stmt2 states, used for logging.
// NOTE(review): presumably indexed by STMT_STATUS value - confirm the order against the enum.
char* gStmt2StatusStr[] = {
    "unknown",
    "init",
    "prepare",
    "settbname",
    "settags",
    "fetchFields",
    "bind",
    "bindCol",
    "addBatch",
    "exec",
};
15

16

17
/*
 * Hand out the next buffUnit-sized slot from the table buffer pool.
 *
 * Order of preference:
 *   1. the remaining space of the current buffer,
 *   2. the next already-allocated buffer in pBufList,
 *   3. a freshly allocated buffer of buffSize bytes that is appended to pBufList.
 *
 * @param pTblBuf  buffer pool state; its cursor fields are advanced on success
 * @param pBuf     out: start of the slot
 * @return TSDB_CODE_SUCCESS, or an error code when a buffer cannot be obtained
 */
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
  if (pTblBuf->buffOffset < pTblBuf->buffSize) {
    // Still room in the current buffer: carve the slot at the current offset.
    *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset;
    pTblBuf->buffOffset += pTblBuf->buffUnit;
    return TSDB_CODE_SUCCESS;
  }

  if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
    // Reuse a buffer that was allocated earlier.
    pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
    if (NULL == pTblBuf->pCurBuff) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
  } else {
    void* buff = taosMemoryMalloc(pTblBuf->buffSize);
    if (NULL == buff) {
      return terrno;
    }

    if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
      // The list did not take the buffer, so nobody else will free it.
      int32_t code = terrno;
      taosMemoryFree(buff);
      return code;
    }

    pTblBuf->buffIdx++;
    pTblBuf->pCurBuff = buff;
  }

  // The first slot of the newly selected buffer is handed out; the next one starts at buffUnit.
  *pBuf = pTblBuf->pCurBuff;
  pTblBuf->buffOffset = pTblBuf->buffUnit;

  return TSDB_CODE_SUCCESS;
}
46

47
/*
 * Take the oldest node off the stmt queue, blocking while the queue is empty.
 *
 * While qRemainNum is zero the function first retries up to 10 times with taosUsleep(1),
 * then waits on queue.waitCond under queue.mutex.
 *
 * @param pStmt  statement that owns the queue
 * @param param  out: the dequeued node (its next pointer is cleared)
 * @return true when a node was stored in *param; false when the queue was stopped
 *         or turned out to be empty
 */
static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) {
  int spins = 0;

  while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    if (pStmt->queue.stopQueue) {
      return false;
    }

    if (spins < 10) {
      taosUsleep(1);
      spins++;
      continue;
    }

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    if (pStmt->queue.stopQueue) {
      (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
      return false;
    }
    // Re-check under the lock before sleeping on the condition variable.
    if (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
      (void)taosThreadCondWait(&pStmt->queue.waitCond, &pStmt->queue.mutex);
    }
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
  }

  if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
    return false;
  }

  (void)taosThreadMutexLock(&pStmt->queue.mutex);

  // head is a sentinel; head == tail means there is no real node to take.
  if (pStmt->queue.head == pStmt->queue.tail) {
    pStmt->queue.qRemainNum = 0;
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);
    STMT2_ELOG_E("interlace queue is empty, cannot dequeue");
    return false;
  }

  SStmtQNode* pFirst = pStmt->queue.head->next;
  pStmt->queue.head->next = pFirst->next;
  if (pFirst == pStmt->queue.tail) {
    pStmt->queue.tail = pStmt->queue.head;
  }
  pFirst->next = NULL;
  *param = pFirst;

  (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1);
  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

  STMT2_TLOG("dequeue success, node:%p, remainNum:%" PRId64, pFirst, pStmt->queue.qRemainNum);

  return true;
}
96

97
/*
 * Append a node to the tail of the stmt queue and signal queue.waitCond.
 *
 * @param pStmt  statement that owns the queue
 * @param param  node to append; a NULL node is logged and ignored
 */
static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == param) {
    STMT2_ELOG_E("enqueue param is NULL");
    return;
  }

  param->next = NULL;

  (void)taosThreadMutexLock(&pStmt->queue.mutex);

  // Link the node behind the current tail and make it the new tail.
  pStmt->queue.tail->next = param;
  pStmt->queue.tail = param;
  pStmt->stat.bindDataNum++;

  (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
  (void)taosThreadCondSignal(&(pStmt->queue.waitCond));

  (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

  STMT2_TLOG("enqueue param:%p, remainNum:%" PRId64 ", restoreTbCols:%d", param, pStmt->queue.qRemainNum,
             param->restoreTbCols);
}
119

120
/*
 * Lazily create the request object of a statement.
 *
 * Does nothing when pStmt->exec.pRequest already exists. Otherwise builds a request
 * from the stmt's SQL text, marks it as a synchronous stmt2 query and copies the
 * stmt's database name into it.
 *
 * @param pStmt  statement to create the request for
 * @return TSDB_CODE_SUCCESS, the error reported by buildRequest, or terrno when the
 *         database name cannot be duplicated
 */
static int32_t stmtCreateRequest(STscStmt2* pStmt) {
  if (pStmt->exec.pRequest != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              &pStmt->exec.pRequest, pStmt->reqid);
  // A user-supplied reqid is advanced for every build attempt, successful or not.
  if (pStmt->reqid != 0) {
    pStmt->reqid++;
  }

  // Never touch the request when building it failed: it may not exist.
  if (TSDB_CODE_SUCCESS != code || NULL == pStmt->exec.pRequest) {
    STMT2_ELOG("fail to build request, err:%s", tstrerror(code));
    return code;
  }

  pStmt->exec.pRequest->type = RES_TYPE__QUERY;
  pStmt->exec.pRequest->syncQuery = true;
  pStmt->exec.pRequest->stmtBindVersion = 2;

  if (pStmt->db != NULL) {
    taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
    pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
    if (NULL == pStmt->exec.pRequest->pDb) {
      code = terrno;
      STMT2_ELOG("fail to copy db name into request, err:%s", tstrerror(code));
      return code;
    }
  }

  STMT2_DLOG("create request:0x%" PRIx64 ", QID:0x%" PRIx64, pStmt->exec.pRequest->self,
             pStmt->exec.pRequest->requestId);

  return TSDB_CODE_SUCCESS;
}
144

145
/*
 * Validate and apply a stmt state transition.
 *
 * A statement that already carries an error code only accepts STMT_PREPARE, which
 * clears the error. Every other target state is checked against the current state;
 * an illegal transition yields TSDB_CODE_TSC_STMT_API_ERROR and an unknown target
 * state yields TSDB_CODE_APP_ERROR.
 *
 * @param pStmt      statement whose status is switched
 * @param newStatus  requested state
 * @return TSDB_CODE_SUCCESS and sql.status updated, or an error code with the
 *         status left untouched
 */
static int32_t stmtSwitchStatus(STscStmt2* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT2_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT2_ELOG("stmt already failed with err:%s, please use stmt prepare", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;
      break;

    case STMT_SETTBNAME: {
      // Outside interlace mode a table name may not be set right after a bind.
      bool bound = STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL);
      if (STMT_STATUS_EQ(INIT) || (!pStmt->sql.stbInterlaceMode && bound)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    }

    case STMT_SETTAGS:
    case STMT_FETCH_FIELDS:
      if (STMT_STATUS_EQ(INIT)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_BIND:
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_BIND_COL:
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_ADD_BATCH:
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;

    case STMT_EXECUTE:
      if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
        // Queries may additionally be executed straight after a bind.
        bool queryAfterBind =
            (STMT_TYPE_QUERY == pStmt->sql.type) && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL));
        if (!queryAfterBind) {
          code = TSDB_CODE_TSC_STMT_API_ERROR;
        }
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}
222

223
/*
 * Parser callback: report the table name currently stored in the stmt's bind info.
 *
 * As a side effect the statement type is set to STMT_TYPE_MULTI_INSERT, even when
 * no table name is available.
 *
 * @param stmt    the STscStmt2 handle
 * @param tbName  out: points at pStmt->bInfo.tbName on success
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_STMT_TBNAME_ERROR when the name is empty
 */
static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if (pStmt->bInfo.tbName[0] == '\0') {
    tscWarn("no table name set, OK if it is a stmt get fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;
  return TSDB_CODE_SUCCESS;
}
237

238
/*
 * Refresh the stmt's bind info (table identity, bound tags, fixed-value columns)
 * from the result of a parse.
 *
 * @param stmt           the STscStmt2 handle
 * @param pTableMeta     meta of the target table; uid/suid/vgId/tableType are copied
 * @param tags           bound tag info; stored in bInfo.boundTags
 * @param cols           optional in/out: when non-NULL, *cols is moved into
 *                       bInfo.fixedValueCols and *cols is set to NULL
 * @param tbName         table name; copied into bInfo.sname and, as full name, into bInfo.tbFName
 * @param sTableName     super table full name; copied into bInfo.stbFName
 * @param autoCreateTbl  whether the insert auto-creates the table
 * @param tbNameFlag     stored in bInfo.tbNameFlag
 * @return TSDB_CODE_SUCCESS, or the error from tNameExtractFullName
 */
static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
                                  const char* sTableName, bool autoCreateTbl, int8_t tbNameFlag) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  char       fullName[TSDB_TABLE_FNAME_LEN];

  int32_t code = tNameExtractFullName(tbName, fullName);
  if (code != 0) {
    return code;
  }

  bool noBoundTags = (tags != NULL) && (((SBoundColInfo*)tags)->numOfCols == 0);
  if (noBoundTags || !autoCreateTbl) {
    pStmt->sql.autoCreateTbl = false;
  }

  (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
  tstrncpy(pStmt->bInfo.tbFName, fullName, sizeof(pStmt->bInfo.tbFName));
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;

  if (autoCreateTbl) {
    pStmt->bInfo.tbUid = 0;
  } else {
    pStmt->bInfo.tbUid = pTableMeta->uid;
  }
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbVgId = pTableMeta->vgId;
  pStmt->bInfo.tbType = pTableMeta->tableType;

  // Only free the previous tag info when it is not flagged as cached.
  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  // transfer ownership of cols to stmt
  if (cols != NULL) {
    pStmt->bInfo.fixedValueCols = *cols;
    *cols = NULL;
  }

  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;
  pStmt->bInfo.tbNameFlag = tbNameFlag;
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  // Interlace mode is switched off for anything that is neither a child nor a super table.
  bool isChildOrSuper = (pTableMeta->tableType == TSDB_CHILD_TABLE) || (pTableMeta->tableType == TSDB_SUPER_TABLE);
  if (!isChildOrSuper) {
    pStmt->sql.stbInterlaceMode = false;
  }

  return TSDB_CODE_SUCCESS;
}
282

283
/*
 * Store the vgroup hash and the data block hash produced by a parse in the stmt.
 *
 * @param stmt        the STscStmt2 handle
 * @param pVgHash     stored in sql.pVgHash
 * @param pBlockHash  stored in exec.pBlockHash
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtUpdateExecInfo(TAOS_STMT2* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}
291

292
/*
 * Parser callback: update both the bind info and the exec info of the stmt.
 *
 * Parameters are forwarded to stmtUpdateBindInfo and stmtUpdateExecInfo.
 * NOTE(review): sql.autoCreateTbl is assigned unconditionally at the end, which
 * overrides the value stmtUpdateBindInfo may have forced to false - confirm intended.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by the two helpers
 */
static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* tags, SSHashObj** cols, SName* tbName,
                              bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName,
                              uint8_t tbNameFlag) {
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, cols, tbName, sTableName, autoCreateTbl, tbNameFlag));
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));

  ((STscStmt2*)stmt)->sql.autoCreateTbl = autoCreateTbl;

  return TSDB_CODE_SUCCESS;
}
304

305
/*
 * Parser callback: hand the stmt's vgroup hash and data block hash to the caller.
 *
 * Both pointers are cleared in the stmt after being copied out.
 *
 * @param stmt        the STscStmt2 handle
 * @param pVgHash     out: previous value of sql.pVgHash
 * @param pBlockHash  out: previous value of exec.pBlockHash
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtGetExecInfo(TAOS_STMT2* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  SHashObj* vgHash = pStmt->sql.pVgHash;
  SHashObj* blockHash = pStmt->exec.pBlockHash;

  pStmt->sql.pVgHash = NULL;
  pStmt->exec.pBlockHash = NULL;

  *pVgHash = vgHash;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}
316

317
/*
 * Parse the stmt's SQL and classify the statement.
 *
 * Creates the request if needed, runs parseSql with the stmt callbacks, and, when
 * the type is still unset, classifies the statement as insert or query from the
 * parsed tree. For insert statements the data context of the current table is
 * looked up in exec.pBlockHash and sql.pBindInfo is allocated on first use.
 *
 * @param pStmt  statement to parse
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_PAR_SYNTAX_ERROR for SQL that is neither a
 *         legal insert nor a legal select; TSDB_CODE_TSC_STMT_CACHE_ERROR when the
 *         table's data context is missing; or an error from a callee
 */
static int32_t stmtParseSql(STscStmt2* pStmt) {
  SStmtCallback stmtCb = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  pStmt->exec.pCurrBlock = NULL;

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  pStmt->exec.pRequest->stmtBindVersion = 2;

  pStmt->stat.parseSqlNum++;

  STMT2_DLOG("start to parse, QID:0x%" PRIx64, pStmt->exec.pRequest->requestId);
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));

  pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
  pStmt->bInfo.needParse = false;

  if (0 == pStmt->sql.type) {
    // Type not decided by a callback yet: derive it from the parsed tree.
    if (pStmt->sql.pQuery->pRoot && LEGAL_INSERT(nodeType(pStmt->sql.pQuery->pRoot))) {
      pStmt->sql.type = STMT_TYPE_INSERT;
      pStmt->sql.stbInterlaceMode = false;
    } else {
      if (!(pStmt->sql.pQuery->pPrepareRoot && LEGAL_SELECT(nodeType(pStmt->sql.pQuery->pPrepareRoot)))) {
        STMT2_ELOG_E("only support select or insert sql");
        if (pStmt->exec.pRequest->msgBuf) {
          tstrncpy(pStmt->exec.pRequest->msgBuf, "stmt only support select or insert",
                   pStmt->exec.pRequest->msgBufLen);
        }
        return TSDB_CODE_PAR_SYNTAX_ERROR;
      }

      pStmt->sql.type = STMT_TYPE_QUERY;
      pStmt->sql.stbInterlaceMode = false;
      return TSDB_CODE_SUCCESS;
    }
  } else if (STMT_TYPE_QUERY == pStmt->sql.type) {
    pStmt->sql.stbInterlaceMode = false;
    return TSDB_CODE_SUCCESS;
  } else if (STMT_TYPE_INSERT == pStmt->sql.type) {
    pStmt->sql.stbInterlaceMode = false;
  }

  // Insert path: the parse must have left a data context for the current table.
  STableDataCxt** ppCtx =
      (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (NULL == ppCtx || NULL == *ppCtx) {
    STMT2_ELOG("fail to get exec.pBlockHash, maybe parse failed, tbFName:%s", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
  }

  STableDataCxt* pCtx = *ppCtx;
  if (pStmt->sql.stbInterlaceMode && pCtx->pData->pCreateTbReq && (pStmt->bInfo.tbNameFlag & USING_CLAUSE) == 0) {
    STMT2_TLOG("destroy pCreateTbReq for no-using insert, tbFName:%s", pStmt->bInfo.tbFName);
    tdDestroySVCreateTbReq(pCtx->pData->pCreateTbReq);
    taosMemoryFreeClear(pCtx->pData->pCreateTbReq);
    pCtx->pData->pCreateTbReq = NULL;
  }

  if (NULL != pStmt->sql.pBindInfo) {
    return TSDB_CODE_SUCCESS;
  }

  // One bind-info entry per bound column.
  pStmt->sql.pBindInfo = taosMemoryMalloc(pCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
  if (NULL == pStmt->sql.pBindInfo) {
    STMT2_ELOG_E("fail to malloc pBindInfo");
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
399

UNCOV
400
/*
 * Trace-log the values of a bind array, one log line per (column, row).
 *
 * The number of bind entries that are printed is:
 *   - 1 when col_idx >= 0 (single column bind),
 *   - the tag or column placeholder count for insert statements (fetched through
 *     stmtGetStbColFields2 when both counts are still 0),
 *   - the number of placeholder values for query statements,
 *   - 0 otherwise.
 *
 * NOTE(review): bindv[i].buffer_type is used unchecked as an index into tDataTypes;
 * an out-of-range type coming from the caller would read outside that table - confirm
 * that callers validate the type first.
 *
 * @param stmt     the STscStmt2 handle
 * @param bindv    bind entries to print; NULL is logged and accepted
 * @param col_idx  column index of a single-column bind, or negative for a full bind
 * @param isTags   for insert statements: print tag binds (true) or column binds (false)
 * @return TSDB_CODE_SUCCESS, or the error from stmtGetStbColFields2
 */
static int32_t stmtPrintBindv(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bindv, int32_t col_idx, bool isTags) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    count = 0;

  if (bindv == NULL) {
    STMT2_TLOG("bindv is NULL, col_idx:%d, isTags:%d", col_idx, isTags);
    return TSDB_CODE_SUCCESS;
  }

  if (col_idx >= 0) {
    count = 1;
    STMT2_TLOG("single col bind, col_idx:%d", col_idx);
  } else if (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type ||
             (pStmt->sql.type == 0 && stmt2IsInsert(stmt))) {
    if (pStmt->sql.placeholderOfTags == 0 && pStmt->sql.placeholderOfCols == 0) {
      int32_t code = stmtGetStbColFields2(pStmt, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
    if (isTags) {
      count = pStmt->sql.placeholderOfTags;
      STMT2_TLOG("print tags bindv, cols:%d", count);
    } else {
      count = pStmt->sql.placeholderOfCols;
      STMT2_TLOG("print cols bindv, cols:%d", count);
    }
  } else if (STMT_TYPE_QUERY == pStmt->sql.type || (pStmt->sql.type == 0 && stmt2IsSelect(stmt))) {
    count = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
    STMT2_TLOG("print query bindv, cols:%d", count);
  }

  for (int i = 0; i < count; i++) {
    int32_t type = bindv[i].buffer_type;
    int32_t num = bindv[i].num;
    char*   current_buf = (char*)bindv[i].buffer;

    for (int j = 0; j < num; j++) {
      char    buf[256] = {0};
      int32_t len = 0;
      bool    isNull = (bindv[i].is_null && bindv[i].is_null[j]);

      // Variable-length values carry their own length; everything else uses the type width.
      if (IS_VAR_DATA_TYPE(type) && bindv[i].length) {
        len = bindv[i].length[j];
      } else {
        len = tDataTypes[type].bytes;
      }

      if (isNull) {
        snprintf(buf, sizeof(buf), "NULL");
      } else if (current_buf == NULL) {
        snprintf(buf, sizeof(buf), "NULL(Buf)");
      } else {
        switch (type) {
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_TINYINT:
            snprintf(buf, sizeof(buf), "%d", *(int8_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_SMALLINT:
            snprintf(buf, sizeof(buf), "%d", *(int16_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_INT:
            snprintf(buf, sizeof(buf), "%d", *(int32_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_TIMESTAMP:
            snprintf(buf, sizeof(buf), "%" PRId64, *(int64_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_FLOAT:
            snprintf(buf, sizeof(buf), "%f", *(float*)current_buf);
            break;
          case TSDB_DATA_TYPE_DOUBLE:
            snprintf(buf, sizeof(buf), "%f", *(double*)current_buf);
            break;
          case TSDB_DATA_TYPE_BINARY:
          case TSDB_DATA_TYPE_NCHAR:
          case TSDB_DATA_TYPE_GEOMETRY:
          case TSDB_DATA_TYPE_VARBINARY:
            snprintf(buf, sizeof(buf), "len:%d, val:%.*s", len, len, current_buf);
            break;
          case TSDB_DATA_TYPE_UTINYINT:
            snprintf(buf, sizeof(buf), "%u", *(uint8_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_USMALLINT:
            snprintf(buf, sizeof(buf), "%u", *(uint16_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_UINT:
            snprintf(buf, sizeof(buf), "%u", *(uint32_t*)current_buf);
            break;
          case TSDB_DATA_TYPE_UBIGINT:
            snprintf(buf, sizeof(buf), "%" PRIu64, *(uint64_t*)current_buf);
            break;
          default:
            snprintf(buf, sizeof(buf), "UnknownType:%d", type);
            break;
        }
      }

      STMT2_TLOG("bindv[%d] row[%d]: type:%s, val:%s", i, j, tDataTypes[type].name, buf);

      // NULL rows occupy no space in the value buffer, so only advance for real values.
      if (!isNull && current_buf) {
        current_buf += len;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
522

523
/*
 * Release the stmt's current request, if any, and clear the async-result flag.
 *
 * @param pStmt  statement whose request is dropped
 */
static void resetRequest(STscStmt2* pStmt) {
  if (NULL != pStmt->exec.pRequest) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  pStmt->asyncResultAvailable = false;
}
300,604✔
530

531
/*
 * Reset the per-table bind info of a statement so that the next bind re-parses.
 *
 * Bound tags and fixed-value columns are released only when they are not flagged
 * as cached; the super table name and suid are kept for auto-create statements.
 *
 * @param pStmt  statement to clean
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
  // Table identity.
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbVgId = -1;
  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  // Table names.
  pStmt->bInfo.tbName[0] = 0;
  pStmt->bInfo.tbFName[0] = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
    pStmt->bInfo.boundTags = NULL;
  }

  if (!pStmt->bInfo.boundColsCached) {
    tSimpleHashCleanup(pStmt->bInfo.fixedValueCols);
    pStmt->bInfo.fixedValueCols = NULL;
  }

  if (!pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.stbFName[0] = 0;
    pStmt->bInfo.tbSuid = 0;
  }

  STMT2_TLOG("finish clean bind info, tagsCached:%d, autoCreateTbl:%d", pStmt->bInfo.tagsCached,
             pStmt->sql.autoCreateTbl);

  return TSDB_CODE_SUCCESS;
}
561

UNCOV
562
/*
 * Reset the column array of a table-cols entry and destroy the array.
 * The result of qResetStmtColumns is deliberately ignored.
 *
 * @param pTb  entry whose aCol array is released
 */
static void stmtFreeTableBlkList(STableColsData* pTb) {
  SArray* pCols = pTb->aCol;

  (void)qResetStmtColumns(pCols, true);
  taosArrayDestroy(pCols);
}
×
566

567
/*
 * Rewind the table buffer pool to its first buffer and make that buffer's start
 * the empty queue's head/tail node.
 *
 * When the first buffer cannot be fetched, an error is logged and the queue is
 * left untouched (pCurBuff is NULL in that case).
 *
 * @param pTblBuf  buffer pool to rewind
 * @param pQueue   queue to reset to empty
 */
static void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
  pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
  if (pTblBuf->pCurBuff == NULL) {
    tscError("QInfo:%p, fail to get buffer from list", pTblBuf);
    return;
  }

  // The first node-sized region of the buffer is taken by the queue head.
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = sizeof(*pQueue->head);

  pQueue->tail = pTblBuf->pCurBuff;
  pQueue->head = pQueue->tail;
  pQueue->qRemainNum = 0;
  pQueue->head->next = NULL;
}
580

581
/*
 * Clean the execution state of a statement after an exec or before a re-prepare.
 *
 * Interlace mode:
 *   - deepClean: drop the block hash, the current block and (for non-query
 *     statements) the request;
 *   - otherwise: rewind the table buffer/queue and clear the row-data hash.
 * Non-interlace mode:
 *   - every data context in exec.pBlockHash is destroyed and removed, except the
 *     current block when keepTable is set, which is reset for reuse;
 *   - without keepTable the hash itself and the current table data are released.
 * Unless the function returns early for keepTable, the bind info is cleaned too.
 *
 * @param pStmt      statement to clean
 * @param keepTable  keep (and reset) the current table's data context
 * @param deepClean  release everything, including the request of query statements
 * @return TSDB_CODE_SUCCESS or the first error reported by a callee
 */
static int32_t stmtCleanExecInfo(STscStmt2* pStmt, bool keepTable, bool deepClean) {
  if (pStmt->sql.stbInterlaceMode) {
    if (deepClean) {
      taosHashCleanup(pStmt->exec.pBlockHash);
      pStmt->exec.pBlockHash = NULL;

      if (NULL != pStmt->exec.pCurrBlock) {
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->boundColsInfo.pColIndex);
        taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
        qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
        pStmt->exec.pCurrBlock = NULL;
      }
      if (STMT_TYPE_QUERY != pStmt->sql.type) {
        resetRequest(pStmt);
      }
    } else {
      pStmt->sql.siInfo.pTableColsIdx = 0;
      stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
      tSimpleHashClear(pStmt->sql.siInfo.pTableRowDataHash);
    }
    if (NULL != pStmt->exec.pRequest) {
      pStmt->exec.pRequest->body.resInfo.numOfRows = 0;
    }
  } else {
    if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
      resetRequest(pStmt);
    }

    void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
    while (pIter) {
      STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
      int32_t        code = TSDB_CODE_SUCCESS;

      if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
        // Keep the current table: park its data in pCurrTbData and reset the block.
        TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
        code = qResetStmtDataBlock(pBlocks, false);
      } else {
        size_t keyLen = 0;
        char*  key = taosHashGetKey(pIter, &keyLen);

        qDestroyStmtDataBlock(pBlocks);
        code = taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
      }

      if (TSDB_CODE_SUCCESS != code) {
        // Leaving the loop early: release the iterator before reporting the error.
        taosHashCancelIterate(pStmt->exec.pBlockHash, pIter);
        STMT_ERR_RET(code);
      }

      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
    }

    if (keepTable) {
      STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
                 keepTable, deepClean);
      return TSDB_CODE_SUCCESS;
    }

    taosHashCleanup(pStmt->exec.pBlockHash);
    pStmt->exec.pBlockHash = NULL;

    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
  STMT2_TLOG("finish clean exec info, stbInterlaceMode:%d, keepTable:%d, deepClean:%d", pStmt->sql.stbInterlaceMode,
             keepTable, deepClean);

  return TSDB_CODE_SUCCESS;
}
649

650
static void stmtFreeSingleVgDataBlock(void* p) {
687,583✔
651
  SVgDataBlocks* pVg = *(SVgDataBlocks**)p;
687,583✔
652
  if (pVg) {
687,583✔
653
    taosMemoryFree(pVg->pData);
687,648✔
654
    taosMemoryFree(pVg);
687,696✔
655
  }
656
}
687,666✔
657

658
/*
 * Destroy the saved retry copy of the vgroup data blocks, if there is one.
 *
 * @param pStmt  statement whose pVgDataBlocksForRetry is released and cleared
 */
static void stmtFreeVgDataBlocksForRetry(STscStmt2* pStmt) {
  if (NULL == pStmt->pVgDataBlocksForRetry) {
    return;
  }

  taosArrayDestroyEx(pStmt->pVgDataBlocksForRetry, stmtFreeSingleVgDataBlock);
  pStmt->pVgDataBlocksForRetry = NULL;
}
794,247✔
664

665
/*
 * Save a deep copy of the parsed statement's vgroup data blocks in
 * pStmt->pVgDataBlocksForRetry.
 *
 * Any previously saved copy is released first. Nothing is saved when the root node
 * has no data blocks. On failure every partial copy is released and
 * pVgDataBlocksForRetry is left NULL.
 *
 * @param pStmt  statement whose sql.pQuery->pRoot data blocks are cloned
 * @return TSDB_CODE_SUCCESS; terrno of the failed allocation; or
 *         TSDB_CODE_INVALID_PARA when the source array holds a NULL entry
 */
static int32_t stmtSaveVgDataBlocksForRetry(STscStmt2* pStmt) {
  stmtFreeVgDataBlocksForRetry(pStmt);

  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;
  if (!pModif || !pModif->pDataBlocks) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t num = taosArrayGetSize(pModif->pDataBlocks);
  if (num == 0) {
    return TSDB_CODE_SUCCESS;
  }

  pStmt->pVgDataBlocksForRetry = taosArrayInit(num, POINTER_BYTES);
  if (!pStmt->pVgDataBlocksForRetry) {
    return terrno;
  }

  // Error codes are captured right at the failing call, before any cleanup runs.
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < num; i++) {
    SVgDataBlocks* pSrc = taosArrayGetP(pModif->pDataBlocks, i);
    if (NULL == pSrc) {
      code = TSDB_CODE_INVALID_PARA;
      goto _fail;
    }

    SVgDataBlocks* pDst = taosMemoryMalloc(sizeof(SVgDataBlocks));
    if (!pDst) {
      code = terrno;
      goto _fail;
    }

    // Copy the header, then give the clone its own payload buffer.
    *pDst = *pSrc;
    pDst->pData = taosMemoryMalloc(pSrc->size);
    if (!pDst->pData) {
      code = terrno;
      taosMemoryFree(pDst);
      goto _fail;
    }
    (void)memcpy(pDst->pData, pSrc->pData, pSrc->size);

    if (NULL == taosArrayPush(pStmt->pVgDataBlocksForRetry, &pDst)) {
      code = terrno;
      taosMemoryFree(pDst->pData);
      taosMemoryFree(pDst);
      goto _fail;
    }
  }

  return TSDB_CODE_SUCCESS;

_fail:
  stmtFreeVgDataBlocksForRetry(pStmt);
  return code;
}
703

704
/*
 * Move the saved retry copy of the vgroup data blocks back into the parsed
 * statement's root node.
 *
 * Does nothing when there is no root node or no saved copy. The previous value of
 * pDataBlocks is overwritten without being freed: per the original note, the planner
 * owns pDataBlocks after createQueryPlan (via TSWAP) and has already freed the old array.
 *
 * @param pStmt  statement to restore
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stmtRestoreVgDataBlocksForRetry(STscStmt2* pStmt) {
  SVnodeModifyOpStmt* pModif = (SVnodeModifyOpStmt*)pStmt->sql.pQuery->pRoot;

  if (pModif && pStmt->pVgDataBlocksForRetry) {
    // Ownership of the clone moves from the stmt to the root node.
    pModif->pDataBlocks = pStmt->pVgDataBlocksForRetry;
    pStmt->pVgDataBlocksForRetry = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
715

716
/*
 * Make a heap copy of a table meta, TABLE_META_FULL_SIZE(pSrc) bytes long, and
 * re-seat its internal pointers with tableMetaResetPointers.
 *
 * @param pSrc  meta to clone
 * @return the clone (to be freed by the caller), or NULL when the computed size is
 *         not positive or the allocation fails
 */
static STableMeta* stmtCloneTableMetaForRetry(const STableMeta* pSrc) {
  int32_t fullSize = (int32_t)TABLE_META_FULL_SIZE(pSrc);
  if (fullSize <= 0) {
    return NULL;
  }

  STableMeta* pClone = taosMemoryMalloc(fullSize);
  if (NULL != pClone) {
    (void)memcpy(pClone, pSrc, fullSize);
    tableMetaResetPointers(pClone);
  }

  return pClone;
}
729

730
/*
 * Free every STableMeta stored (by pointer) in the hash, then destroy the hash.
 *
 * @param pHash  hash whose values are STableMeta pointers; NULL is accepted
 */
static void stmtFreeUidTableMetaHash(SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }

  for (void* pIter = taosHashIterate(pHash, NULL); pIter != NULL; pIter = taosHashIterate(pHash, pIter)) {
    taosMemoryFree(*(STableMeta**)pIter);
  }

  taosHashCleanup(pHash);
}
741

742
// tRowGet may succeed with a wrong prefix schema but leave VAR column pointers outside the SRow allocation;
743
// tRowBuild would then memcpy OOB. Require all VAR payloads to lie within [pRow, pRow + pRow->len).
744
static bool stmtScolValVarPayloadInRow(const SRow* pRow, const SColVal* pCv, int8_t colType) {
1,568✔
745
  if (!COL_VAL_IS_VALUE(pCv) || !IS_VAR_DATA_TYPE(colType)) {
1,568✔
746
    return true;
784✔
747
  }
748
  if (pCv->value.nData == 0) {
784✔
NEW
749
    return true;
×
750
  }
751
  if (pCv->value.pData == NULL) {
784✔
NEW
752
    return false;
×
753
  }
754
  const uint8_t* rbeg = (const uint8_t*)pRow;
784✔
755
  const uint8_t* rend = rbeg + pRow->len;
784✔
756
  const uint8_t* p = (const uint8_t*)pCv->value.pData;
784✔
757
  return (p >= rbeg) && (p + pCv->value.nData <= rend);
784✔
758
}
759

760
// Infer decode STSchema: full column count when row sver matches catalog; else try prefix column counts (ADD COLUMN).
761
static int32_t stmtFindDecodeSchemaForRow(SRow* pRow, const STableMeta* pMeta, STSchema** ppOld, int32_t* pnOldCols) {
392✔
762
  uint16_t oldSver = pRow->sver;
392✔
763
  int32_t  nMax = pMeta->tableInfo.numOfColumns;
392✔
764
  SSchema* base = (SSchema*)&pMeta->schema[0];
392✔
765

766
  if ((int32_t)oldSver == pMeta->sversion) {
392✔
NEW
767
    STSchema* p = tBuildTSchema(base, nMax, (int32_t)oldSver);
×
NEW
768
    if (p == NULL) {
×
NEW
769
      return terrno;
×
770
    }
NEW
771
    bool ok = true;
×
NEW
772
    for (int32_t i = 0; i < nMax; ++i) {
×
NEW
773
      SColVal cv = {0};
×
NEW
774
      if (tRowGet(pRow, p, i, &cv) != 0) {
×
NEW
775
        ok = false;
×
NEW
776
        break;
×
777
      }
NEW
778
      if (!stmtScolValVarPayloadInRow(pRow, &cv, p->columns[i].type)) {
×
NEW
779
        ok = false;
×
NEW
780
        break;
×
781
      }
782
    }
NEW
783
    if (ok) {
×
NEW
784
      *ppOld = p;
×
NEW
785
      *pnOldCols = nMax;
×
NEW
786
      return TSDB_CODE_SUCCESS;
×
787
    }
NEW
788
    tDestroyTSchema(p);
×
NEW
789
    return TSDB_CODE_INVALID_PARA;
×
790
  }
791

792
  for (int32_t n = nMax; n >= 1; --n) {
784✔
793
    STSchema* pTry = tBuildTSchema(base, n, (int32_t)oldSver);
784✔
794
    if (pTry == NULL) {
784✔
NEW
795
      return terrno;
×
796
    }
797
    bool ok = true;
784✔
798
    for (int32_t i = 0; i < n; ++i) {
1,960✔
799
      SColVal cv = {0};
1,568✔
800
      if (tRowGet(pRow, pTry, i, &cv) != 0) {
1,568✔
NEW
801
        ok = false;
×
802
        break;
392✔
803
      }
804
      if (!stmtScolValVarPayloadInRow(pRow, &cv, pTry->columns[i].type)) {
1,568✔
805
        ok = false;
392✔
806
        break;
392✔
807
      }
808
    }
809
    if (ok) {
784✔
810
      *ppOld = pTry;
392✔
811
      *pnOldCols = n;
392✔
812
      return TSDB_CODE_SUCCESS;
392✔
813
    }
814
    tDestroyTSchema(pTry);
392✔
815
  }
NEW
816
  return TSDB_CODE_INVALID_PARA;
×
817
}
818

819
// Re-encodes pOldRow against the latest schema described by pMeta.
//
// The row is decoded with the schema inferred by stmtFindDecodeSchemaForRow; columns that exist only in the
// new schema are filled with COL_VAL_NONE. The overlapping columns must have identical colIds, otherwise
// TSDB_CODE_INVALID_PARA is returned. On success *ppNewRow is whatever tRowBuild produced.
// Returns TSDB_CODE_SUCCESS or the first failing code.
static int32_t stmtRebuildOneRowToLatestSchema(SRow* pOldRow, const STableMeta* pMeta, SRow** ppNewRow) {
  STSchema*         pOldSch = NULL;
  STSchema*         pNewSch = NULL;
  SArray*           aColVal = NULL;
  SRowBuildScanInfo sinfo = {0};
  int32_t           nOldCols = 0;
  int32_t           nNewCols = pMeta->tableInfo.numOfColumns;

  int32_t code = stmtFindDecodeSchemaForRow(pOldRow, pMeta, &pOldSch, &nOldCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pNewSch = tBuildTSchema((SSchema*)&pMeta->schema[0], nNewCols, pMeta->sversion);
  if (pNewSch == NULL) {
    code = terrno;  // capture before any cleanup call gets a chance to overwrite terrno
    goto _exit;
  }

  for (int32_t j = 0; j < nOldCols && j < nNewCols; ++j) {
    if (pOldSch->columns[j].colId != pNewSch->columns[j].colId) {
      code = TSDB_CODE_INVALID_PARA;
      goto _exit;
    }
  }

  aColVal = taosArrayInit(pNewSch->numOfCols, sizeof(SColVal));
  if (aColVal == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t j = 0; j < pNewSch->numOfCols; ++j) {
    SColVal cv = {0};
    if (j < nOldCols) {
      code = tRowGet(pOldRow, pOldSch, j, &cv);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
    } else {
      // Column is absent from the old row: emit an explicit NONE value for it.
      STColumn* pc = &pNewSch->columns[j];
      cv = COL_VAL_NONE(pc->colId, pc->type);
    }
    if (taosArrayPush(aColVal, &cv) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  code = tRowBuild(aColVal, pNewSch, ppNewRow, &sinfo);

_exit:
  if (aColVal != NULL) {
    taosArrayDestroy(aColVal);
  }
  tDestroyTSchema(pOldSch);
  if (pNewSch != NULL) {
    tDestroyTSchema(pNewSch);
  }
  return code;
}
879

880
// Destroys every SRow* held in aHeapRows with tRowDestroy, then destroys the array. NULL is a no-op.
static void stmtFreeHeapPatchRowsArray(SArray* aHeapRows) {
  if (aHeapRows != NULL) {
    const int32_t nRows = (int32_t)taosArrayGetSize(aHeapRows);
    for (int32_t idx = 0; idx < nRows; ++idx) {
      SRow* pHeapRow = taosArrayGetP(aHeapRows, idx);
      tRowDestroy(pHeapRow);
    }
    taosArrayDestroy(aHeapRows);
  }
}
891

892
// After refreshMeta: set sver from catalog; decode each row with inferred old schema and tRowBuild with latest schema.
893
// aHeapRows: receives pointers from tRowBuild so they can be freed before tDestroySubmitReq (decode path does not free rows).
894
static void stmtPatchOneSubmitTbDataSchemaVer(SSubmitTbData* pTb, SHashObj* pUidMetaHash, SArray* aHeapRows) {
196✔
895
  if (pTb->uid == 0) {
196✔
NEW
896
    return;
×
897
  }
898
  void* pMv = taosHashGet(pUidMetaHash, &pTb->uid, sizeof(uint64_t));
196✔
899
  if (pMv == NULL) {
196✔
NEW
900
    return;
×
901
  }
902
  STableMeta* pMeta = *(STableMeta**)pMv;
196✔
903
  pTb->sver = pMeta->sversion;
196✔
904
  if (pTb->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
196✔
NEW
905
    return;
×
906
  }
907
  if (pTb->aRowP == NULL) {
196✔
NEW
908
    return;
×
909
  }
910
  if (pTb->pBlobSet != NULL) {
196✔
NEW
911
    int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
×
NEW
912
    SRow**  rows = (SRow**)TARRAY_DATA(pTb->aRowP);
×
NEW
913
    for (int32_t i = 0; i < nRow; ++i) {
×
NEW
914
      if (rows[i] != NULL) {
×
NEW
915
        rows[i]->sver = (uint16_t)pMeta->sversion;
×
916
      }
917
    }
NEW
918
    return;
×
919
  }
920

921
  int32_t nRow = (int32_t)TARRAY_SIZE(pTb->aRowP);
196✔
922
  for (int32_t i = 0; i < nRow; ++i) {
588✔
923
    SRow* pRow = taosArrayGetP(pTb->aRowP, i);
392✔
924
    if (pRow == NULL) {
392✔
NEW
925
      continue;
×
926
    }
927
    if ((uint16_t)pMeta->sversion == pRow->sver) {
392✔
NEW
928
      continue;
×
929
    }
930
    if (pRow->flag & HAS_BLOB) {
392✔
NEW
931
      pRow->sver = (uint16_t)pMeta->sversion;
×
NEW
932
      continue;
×
933
    }
934
    SRow* pNew = NULL;
392✔
935
    if (stmtRebuildOneRowToLatestSchema(pRow, pMeta, &pNew) == TSDB_CODE_SUCCESS && pNew != NULL) {
392✔
936
      // pRow points into the decoded submit payload (tDecodeBinaryWithSize); do not tRowDestroy it.
937
      if (aHeapRows != NULL && taosArrayPush(aHeapRows, &pNew) == NULL) {
784✔
NEW
938
        tRowDestroy(pNew);
×
939
        // Cannot record heap row for destroy before tDestroySubmitReq; keep embedded row, bump sver only.
NEW
940
        pRow->sver = (uint16_t)pMeta->sversion;
×
941
      } else {
942
        (void)taosArraySet(pTb->aRowP, i, &pNew);
392✔
943
      }
944
    } else {
NEW
945
      pRow->sver = (uint16_t)pMeta->sversion;
×
946
    }
947
  }
948
}
949

950
static int32_t stmtBuildUidToTableMetaHash(STscStmt2* pStmt, SRequestObj* pRequest, SHashObj** ppHash) {
196✔
951
  int32_t code = TSDB_CODE_SUCCESS;
196✔
952
  *ppHash = NULL;
196✔
953

954
  if (NULL == pStmt->pCatalog) {
196✔
NEW
955
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
NEW
956
    if (code != TSDB_CODE_SUCCESS) {
×
NEW
957
      return code;
×
958
    }
959
  }
960

961
  SHashObj* pHash =
962
      taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
196✔
963
  if (pHash == NULL) {
196✔
NEW
964
    return terrno;
×
965
  }
966

967
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
196✔
968
                           .requestId = pRequest->requestId,
196✔
969
                           .requestObjRefId = pRequest->self,
196✔
970
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
196✔
971

972
  int32_t tblNum = pRequest->tableList ? (int32_t)taosArrayGetSize(pRequest->tableList) : 0;
196✔
973
  for (int32_t i = 0; i < tblNum; ++i) {
588✔
974
    SName*      pName = taosArrayGet(pRequest->tableList, i);
392✔
975
    STableMeta* pMeta = NULL;
392✔
976
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
392✔
977
    if (c != TSDB_CODE_SUCCESS) {
392✔
NEW
978
      if (pMeta != NULL) {
×
NEW
979
        taosMemoryFree(pMeta);
×
980
      }
NEW
981
      taosHashCleanup(pHash);
×
NEW
982
      return c;
×
983
    }
984
    if (pMeta != NULL) {
392✔
985
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
392✔
986
      taosMemoryFree(pMeta);
392✔
987
      pMeta = NULL;
392✔
988
      if (pDup != NULL) {
392✔
989
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
392✔
990
        if (putCode != TSDB_CODE_SUCCESS) {
392✔
NEW
991
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
992
                     tstrerror(putCode));
NEW
993
          taosMemoryFree(pDup);
×
NEW
994
          taosHashCleanup(pHash);
×
NEW
995
          return putCode;
×
996
        }
997
      }
998
    }
999
  }
1000

1001
  if (taosHashGetSize(pHash) == 0 && pStmt->bInfo.sname.type != 0) {
196✔
NEW
1002
    STableMeta* pMeta = NULL;
×
NEW
1003
    code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
NEW
1004
    if (code == TSDB_CODE_SUCCESS && pMeta != NULL) {
×
NEW
1005
      STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta);
×
NEW
1006
      taosMemoryFree(pMeta);
×
NEW
1007
      pMeta = NULL;
×
NEW
1008
      if (pDup != NULL) {
×
NEW
1009
        int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES);
×
NEW
1010
        if (putCode != TSDB_CODE_SUCCESS) {
×
NEW
1011
          STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid,
×
1012
                     tstrerror(putCode));
NEW
1013
          taosMemoryFree(pDup);
×
NEW
1014
          taosHashCleanup(pHash);
×
NEW
1015
          return putCode;
×
1016
        }
1017
      }
NEW
1018
    } else if (pMeta != NULL) {
×
NEW
1019
      taosMemoryFree(pMeta);
×
1020
    }
1021
  }
1022

1023
  *ppHash = pHash;
196✔
1024
  return TSDB_CODE_SUCCESS;
196✔
1025
}
1026

1027
static int32_t stmtUpdateVgDataBlocksSchemaVer(STscStmt2* pStmt, SRequestObj* pRequest) {
196✔
1028
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
196✔
NEW
1029
    return TSDB_CODE_SUCCESS;
×
1030
  }
1031

1032
  SHashObj* pUidMetaHash = NULL;
196✔
1033
  int32_t   code = stmtBuildUidToTableMetaHash(pStmt, pRequest, &pUidMetaHash);
196✔
1034
  if (code != TSDB_CODE_SUCCESS) {
196✔
NEW
1035
    return code;
×
1036
  }
1037
  if (pUidMetaHash == NULL || taosHashGetSize(pUidMetaHash) == 0) {
196✔
NEW
1038
    if (pUidMetaHash != NULL) {
×
NEW
1039
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
1040
    }
NEW
1041
    return TSDB_CODE_SUCCESS;
×
1042
  }
1043

1044
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
196✔
1045
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
196✔
1046

1047
  for (int32_t b = 0; b < nBlk; ++b) {
392✔
1048
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
196✔
1049
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
196✔
NEW
1050
      continue;
×
1051
    }
1052

1053
    SDecoder     decoder = {0};
196✔
1054
    int32_t      bodyLen = pVg->size - headSz;
196✔
1055
    SSubmitReq2  req = {0};
196✔
1056

1057
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
196✔
1058
    code = tDecodeSubmitReq(&decoder, &req, NULL);
196✔
1059
    tDecoderClear(&decoder);
196✔
1060
    if (code != TSDB_CODE_SUCCESS) {
196✔
NEW
1061
      STMT2_ELOG("tDecodeSubmitReq failed when patching schema ver for retry, code:%s", tstrerror(code));
×
NEW
1062
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
NEW
1063
      return code;
×
1064
    }
1065
    if (req.raw) {
196✔
NEW
1066
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1067
      continue;
×
1068
    }
1069

1070
    SArray* aHeapRows = taosArrayInit(8, POINTER_BYTES);
196✔
1071
    if (aHeapRows == NULL) {
196✔
NEW
1072
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1073
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
NEW
1074
      return terrno;
×
1075
    }
1076

1077
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
196✔
1078
    for (int32_t t = 0; t < nTb; ++t) {
392✔
1079
      stmtPatchOneSubmitTbDataSchemaVer(taosArrayGet(req.aSubmitTbData, t), pUidMetaHash, aHeapRows);
196✔
1080
    }
1081

1082
    int32_t encCap = 0;
196✔
1083
    int32_t szRet = 0;
196✔
1084
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
196✔
1085
    if (szRet != 0) {
196✔
NEW
1086
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
NEW
1087
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1088
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
NEW
1089
      return TSDB_CODE_INVALID_PARA;
×
1090
    }
1091

1092
    int32_t allocLen = headSz + encCap;
196✔
1093
    void*   pNew = taosMemoryMalloc(allocLen);
196✔
1094
    if (pNew == NULL) {
196✔
NEW
1095
      stmtFreeHeapPatchRowsArray(aHeapRows);
×
NEW
1096
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1097
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
NEW
1098
      return terrno;
×
1099
    }
1100

1101
    (void)memcpy(pNew, pVg->pData, headSz);
196✔
1102
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
196✔
1103
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
196✔
1104

1105
    SEncoder encoder = {0};
196✔
1106
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
196✔
1107
    code = tEncodeSubmitReq(&encoder, &req);
196✔
1108
    int32_t bodyWritten = (int32_t)encoder.pos;
196✔
1109
    tEncoderClear(&encoder);
196✔
1110
    stmtFreeHeapPatchRowsArray(aHeapRows);
196✔
1111
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
196✔
1112

1113
    if (code != TSDB_CODE_SUCCESS) {
196✔
NEW
1114
      taosMemoryFree(pNew);
×
NEW
1115
      stmtFreeUidTableMetaHash(pUidMetaHash);
×
NEW
1116
      return code;
×
1117
    }
1118

1119
    int32_t totalLen = headSz + bodyWritten;
196✔
1120
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
196✔
1121

1122
    taosMemoryFree(pVg->pData);
196✔
1123
    pVg->pData = pNew;
196✔
1124
    pVg->size = totalLen;
196✔
1125
  }
1126

1127
  stmtFreeUidTableMetaHash(pUidMetaHash);
196✔
1128
  return TSDB_CODE_SUCCESS;
196✔
1129
}
1130

1131
// Values copied from a catalog STableMeta and written back onto one SSubmitTbData before a retry.
typedef struct SStmtRetryTbPatch {
  uint64_t uid;   // taken from STableMeta.uid
  uint64_t suid;  // taken from STableMeta.suid
  int32_t  sver;  // taken from STableMeta.sversion
} SStmtRetryTbPatch;
1136

1137
// After refreshMeta, drop cached tbName->uid from stmt2 interlace bind so insGetStmtTableVgUid refetches from catalog.
1138
static void stmtInvalidateStbInterlaceTableUidCache(STscStmt2* pStmt) {
588✔
1139
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pTableHash != NULL) {
588✔
1140
    tSimpleHashClear(pStmt->sql.siInfo.pTableHash);
588✔
1141
  }
1142
}
588✔
1143

1144
// Super-table catalog meta uses uid == suid (see queryCreateTableMetaFromMsg); that must not be written onto
1145
// child-table SSubmitTbData. Only use child/normal/virtual-child meta here.
1146
static bool stmtRetryTbMetaIsSuperTable(const STableMeta* pMeta) {
784✔
1147
  return (pMeta != NULL && pMeta->tableType == TSDB_SUPER_TABLE);
784✔
1148
}
1149

1150
// Resolve uid/suid/sver for one SSubmitTbData after catalog refresh. tbIdx is the index within this submit req.
1151
static int32_t stmtFetchOneRetryTbMetaPatch(STscStmt2* pStmt, SRequestObj* pRequest, SSubmitTbData* pTb, int32_t tbIdx,
392✔
1152
                                            int32_t nSubmitTb, SStmtRetryTbPatch* pPatch) {
1153
  if (NULL == pStmt->pCatalog) {
392✔
NEW
1154
    int32_t c = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
×
NEW
1155
    if (c != TSDB_CODE_SUCCESS) {
×
NEW
1156
      return c;
×
1157
    }
1158
  }
1159

1160
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
392✔
1161
                           .requestId = pRequest->requestId,
392✔
1162
                           .requestObjRefId = pRequest->self,
392✔
1163
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
392✔
1164

1165
  // 1) Auto-create child: look up by child table name (never use STB-only name without child name).
1166
  if (pTb->pCreateTbReq != NULL && pTb->pCreateTbReq->name != NULL) {
392✔
NEW
1167
    SName         nm = {0};
×
NEW
1168
    int32_t       nc = TSDB_CODE_SUCCESS;
×
NEW
1169
    STableMeta*   pMeta = NULL;
×
NEW
1170
    if (pStmt->bInfo.sname.type != 0) {
×
NEW
1171
      tNameAssign(&nm, &pStmt->bInfo.sname);
×
NEW
1172
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
NEW
1173
    } else if (pRequest->tableList != NULL && taosArrayGetSize(pRequest->tableList) > 0) {
×
NEW
1174
      SName* p0 = taosArrayGet(pRequest->tableList, 0);
×
NEW
1175
      tNameAssign(&nm, p0);
×
NEW
1176
      nc = tNameAddTbName(&nm, pTb->pCreateTbReq->name, strlen(pTb->pCreateTbReq->name));
×
1177
    } else {
NEW
1178
      STMT2_ELOG_E("retry patch: no db/sname context for createTbReq name");
×
NEW
1179
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1180
    }
NEW
1181
    if (nc != TSDB_CODE_SUCCESS) {
×
NEW
1182
      return nc;
×
1183
    }
NEW
1184
    nc = catalogGetTableMeta(pStmt->pCatalog, &conn, &nm, &pMeta);
×
NEW
1185
    if (nc != TSDB_CODE_SUCCESS) {
×
NEW
1186
      taosMemoryFreeClear(pMeta);
×
NEW
1187
      return nc;
×
1188
    }
NEW
1189
    if (pMeta == NULL) {
×
NEW
1190
      return TSDB_CODE_INTERNAL_ERROR;
×
1191
    }
NEW
1192
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
NEW
1193
      taosMemoryFree(pMeta);
×
NEW
1194
      STMT2_ELOG_E("retry patch: createTbReq resolved to super table meta (unexpected)");
×
NEW
1195
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1196
    }
NEW
1197
    pPatch->uid = pMeta->uid;
×
NEW
1198
    pPatch->suid = pMeta->suid;
×
NEW
1199
    pPatch->sver = pMeta->sversion;
×
NEW
1200
    taosMemoryFree(pMeta);
×
NEW
1201
    return TSDB_CODE_SUCCESS;
×
1202
  }
1203

1204
  // 2) request->tableList: align tbIdx with the tbIdx-th non-super-table entry (skip super table names).
1205
  if (pRequest->tableList != NULL) {
392✔
1206
    int32_t          nList = (int32_t)taosArrayGetSize(pRequest->tableList);
392✔
1207
    int32_t          nonStbOrd = 0;
392✔
1208
    for (int32_t li = 0; li < nList; ++li) {
784✔
1209
      SName*      pName = taosArrayGet(pRequest->tableList, li);
784✔
1210
      STableMeta* pMeta = NULL;
784✔
1211
      int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta);
784✔
1212
      if (c != TSDB_CODE_SUCCESS) {
784✔
NEW
1213
        taosMemoryFreeClear(pMeta);
×
1214
        return c;
392✔
1215
      }
1216
      if (pMeta == NULL) {
784✔
NEW
1217
        return TSDB_CODE_INTERNAL_ERROR;
×
1218
      }
1219
      if (stmtRetryTbMetaIsSuperTable(pMeta)) {
784✔
1220
        taosMemoryFree(pMeta);
392✔
1221
        continue;
392✔
1222
      }
1223
      if (nonStbOrd == tbIdx) {
392✔
1224
        pPatch->uid = pMeta->uid;
392✔
1225
        pPatch->suid = pMeta->suid;
392✔
1226
        pPatch->sver = pMeta->sversion;
392✔
1227
        taosMemoryFree(pMeta);
392✔
1228
        return TSDB_CODE_SUCCESS;
392✔
1229
      }
NEW
1230
      taosMemoryFree(pMeta);
×
NEW
1231
      nonStbOrd++;
×
1232
    }
1233
  }
1234

1235
  // 3) Single-table statement: bInfo.sname
NEW
1236
  if (nSubmitTb == 1 && pStmt->bInfo.sname.type != 0) {
×
NEW
1237
    STableMeta* pMeta = NULL;
×
NEW
1238
    int32_t     c = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);
×
NEW
1239
    if (c != TSDB_CODE_SUCCESS) {
×
NEW
1240
      taosMemoryFreeClear(pMeta);
×
NEW
1241
      return c;
×
1242
    }
NEW
1243
    if (pMeta == NULL) {
×
NEW
1244
      return TSDB_CODE_INTERNAL_ERROR;
×
1245
    }
NEW
1246
    if (stmtRetryTbMetaIsSuperTable(pMeta)) {
×
NEW
1247
      taosMemoryFree(pMeta);
×
NEW
1248
      STMT2_ELOG_E("retry patch: bInfo.sname resolved to super table meta; need child table name");
×
NEW
1249
      return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1250
    }
NEW
1251
    pPatch->uid = pMeta->uid;
×
NEW
1252
    pPatch->suid = pMeta->suid;
×
NEW
1253
    pPatch->sver = pMeta->sversion;
×
NEW
1254
    taosMemoryFree(pMeta);
×
NEW
1255
    return TSDB_CODE_SUCCESS;
×
1256
  }
1257

NEW
1258
  STMT2_ELOG("retry patch: cannot resolve catalog meta for submit block (tb idx %d, uid %" PRId64 ")", tbIdx,
×
1259
             (int64_t)pTb->uid);
NEW
1260
  return TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
1261
}
1262

1263
// TSDB_CODE_TDB_TABLE_NOT_EXIST: refresh child table uid/suid/sver in serialized submit from catalog.
1264
static int32_t stmtUpdateVgDataBlocksTbMetaFromCatalog(STscStmt2* pStmt, SRequestObj* pRequest) {
392✔
1265
  if (pStmt->pVgDataBlocksForRetry == NULL || taosArrayGetSize(pStmt->pVgDataBlocksForRetry) == 0) {
392✔
NEW
1266
    return TSDB_CODE_SUCCESS;
×
1267
  }
1268

1269
  const int32_t headSz = (int32_t)sizeof(SSubmitReq2Msg);
392✔
1270
  int32_t       nBlk = (int32_t)taosArrayGetSize(pStmt->pVgDataBlocksForRetry);
392✔
1271

1272
  for (int32_t b = 0; b < nBlk; ++b) {
784✔
1273
    SVgDataBlocks* pVg = *(SVgDataBlocks**)taosArrayGet(pStmt->pVgDataBlocksForRetry, b);
392✔
1274
    if (pVg == NULL || pVg->pData == NULL || pVg->size <= headSz) {
392✔
NEW
1275
      continue;
×
1276
    }
1277

1278
    SDecoder     decoder = {0};
392✔
1279
    int32_t      bodyLen = pVg->size - headSz;
392✔
1280
    SSubmitReq2  req = {0};
392✔
1281
    int32_t      code = 0;
392✔
1282

1283
    tDecoderInit(&decoder, (uint8_t*)pVg->pData + headSz, bodyLen);
392✔
1284
    code = tDecodeSubmitReq(&decoder, &req, NULL);
392✔
1285
    tDecoderClear(&decoder);
392✔
1286
    if (code != TSDB_CODE_SUCCESS) {
392✔
NEW
1287
      STMT2_ELOG("tDecodeSubmitReq failed when patching table meta for retry, code:%s", tstrerror(code));
×
NEW
1288
      return code;
×
1289
    }
1290
    if (req.raw) {
392✔
NEW
1291
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1292
      continue;
×
1293
    }
1294

1295
    int32_t nTb = (int32_t)taosArrayGetSize(req.aSubmitTbData);
392✔
1296
    for (int32_t t = 0; t < nTb; ++t) {
784✔
1297
      SStmtRetryTbPatch patch = {0};
392✔
1298
      code = stmtFetchOneRetryTbMetaPatch(pStmt, pRequest, taosArrayGet(req.aSubmitTbData, t), t, nTb, &patch);
392✔
1299
      if (code != TSDB_CODE_SUCCESS) {
392✔
NEW
1300
        tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1301
        return code;
×
1302
      }
1303
      SSubmitTbData* pRow = taosArrayGet(req.aSubmitTbData, t);
392✔
1304
      pRow->uid = (int64_t)patch.uid;
392✔
1305
      pRow->suid = (int64_t)patch.suid;
392✔
1306
      pRow->sver = patch.sver;
392✔
1307
    }
1308

1309
    int32_t encCap = 0;
392✔
1310
    int32_t szRet = 0;
392✔
1311
    tEncodeSize(tEncodeSubmitReq, &req, encCap, szRet);
392✔
1312
    if (szRet != 0) {
392✔
NEW
1313
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1314
      return TSDB_CODE_INVALID_PARA;
×
1315
    }
1316

1317
    int32_t allocLen = headSz + encCap;
392✔
1318
    void*   pNew = taosMemoryMalloc(allocLen);
392✔
1319
    if (pNew == NULL) {
392✔
NEW
1320
      tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
×
NEW
1321
      return terrno;
×
1322
    }
1323

1324
    (void)memcpy(pNew, pVg->pData, headSz);
392✔
1325
    ((SSubmitReq2Msg*)pNew)->header.vgId = htonl(pVg->vg.vgId);
392✔
1326
    ((SSubmitReq2Msg*)pNew)->version = htobe64(1);
392✔
1327

1328
    SEncoder encoder = {0};
392✔
1329
    tEncoderInit(&encoder, (uint8_t*)pNew + headSz, encCap);
392✔
1330
    code = tEncodeSubmitReq(&encoder, &req);
392✔
1331
    int32_t bodyWritten = (int32_t)encoder.pos;
392✔
1332
    tEncoderClear(&encoder);
392✔
1333
    tDestroySubmitReq(&req, TSDB_MSG_FLG_DECODE);
392✔
1334

1335
    if (code != TSDB_CODE_SUCCESS) {
392✔
NEW
1336
      taosMemoryFree(pNew);
×
NEW
1337
      return code;
×
1338
    }
1339

1340
    int32_t totalLen = headSz + bodyWritten;
392✔
1341
    ((SSubmitReq2Msg*)pNew)->header.contLen = htonl(totalLen);
392✔
1342

1343
    taosMemoryFree(pVg->pData);
392✔
1344
    pVg->pData = pNew;
392✔
1345
    pVg->size = totalLen;
392✔
1346
  }
1347

1348
  return TSDB_CODE_SUCCESS;
392✔
1349
}
1350

1351
static bool stmtIsSchemaVersionRetryError(int32_t err) {
196✔
1352
  return (bool)(NEED_CLIENT_REFRESH_TBLMETA_ERROR(err) || err == TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION);
196✔
1353
}
1354

1355
// Element destructor passed to taosArrayDestroyEx: buf points at an array slot holding a buffer pointer,
// which is released with taosMemoryFree.
static void stmtFreeTbBuf(void* buf) {
  void** ppSlot = (void**)buf;
  taosMemoryFree(*ppSlot);
}
144,273✔
1359

1360
static void stmtFreeTbCols(void* buf) {
12,611,000✔
1361
  SArray* pCols = *(SArray**)buf;
12,611,000✔
1362
  taosArrayDestroy(pCols);
12,611,000✔
1363
}
12,611,000✔
1364

1365
// Tears down everything reachable from pStmt->sql: bind info, query result fields, the SQL string, the parsed
// query, vgroup/table caches, exec and bind state, retry blocks and the stb-interlace (siInfo) resources.
// Finally pStmt->sql is zeroed and siInfo.tableColsReady is set back to true.
// NOTE(review): the two STMT_ERR_RET calls presumably return early on error, in which case the teardown
// steps after them are not executed - confirm against the macro definition.
// Returns TSDB_CODE_SUCCESS when the whole sequence ran.
static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
  STMT2_TLOG_E("start to free SQL info");

  // Plain heap members and parser/planner objects.
  taosMemoryFree(pStmt->sql.pBindInfo);
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);

  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  if (pStmt->sql.fixValueTags) {
    pStmt->sql.fixValueTags = false;
    tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
    taosMemoryFreeClear(pStmt->sql.fixValueTbReq);
    pStmt->sql.fixValueTbReq = NULL;
  }

  // Each table-cache entry owns a data context and its bound tag info.
  for (void* pEntry = taosHashIterate(pStmt->sql.pTableCache, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStmt->sql.pTableCache, pEntry)) {
    SStmtTableCache* pCached = (SStmtTableCache*)pEntry;

    qDestroyStmtDataBlock(pCached->pDataCtx);
    qDestroyBoundColInfo(pCached->boundTags);
    taosMemoryFreeClear(pCached->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
  stmtFreeVgDataBlocksForRetry(pStmt);

  // Stb-interlace resources.
  taos_free_result(pStmt->sql.siInfo.pRequest);
  taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
  tSimpleHashCleanup(pStmt->sql.siInfo.pTableRowDataHash);
  taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
  taosMemoryFree(pStmt->sql.siInfo.pTSchema);
  qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
  taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
  pStmt->sql.siInfo.pTableCols = NULL;

  // Start from a clean slate; tableColsReady is the only field that is not left at zero.
  (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql));
  pStmt->sql.siInfo.tableColsReady = true;

  STMT2_TLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}
1417

1418
// Makes sure pStmt->sql.pVgHash holds the vgroup info for the table named by pStmt->bInfo.sname.
// If *vgId is non-negative and already present in the hash nothing is done. Otherwise the vgroup is fetched
// with catalogGetTableHashVgroup, stored in the hash under its vgId, and *vgId is updated to that id.
// Returns TSDB_CODE_SUCCESS, or the catalog / hash-put error code (in which case *vgId is left untouched).
static int32_t stmtTryAddTableVgroupInfo(STscStmt2* pStmt, int32_t* vgId) {
  const bool alreadyCached =
      (*vgId >= 0) && (taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId)) != NULL);
  if (alreadyCached) {
    return TSDB_CODE_SUCCESS;
  }

  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
  if (code != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("fail to get vgroup info from catalog, code:%d", code);
    return code;
  }

  code =
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
  if (code != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("fail to put vgroup info, code:%d", code);
    return code;
  }

  *vgId = vgInfo.vgId;
  return TSDB_CODE_SUCCESS;
}
1446

1447
// Looks up the catalog meta of pStmt->bInfo.sname and reports its identity to the caller.
// On success *uid, *suid, *tableType and *vgId are filled from the meta, pStmt->bInfo.tbVgId is updated and
// the meta is freed. pStmt->stat.ctgGetTbMetaNum is incremented for every call.
// When the catalog answers TSDB_CODE_PAR_TABLE_NOT_EXIST the bind info is cleaned and the condition is
// logged before the error is returned through STMT_ERR_RET; any other catalog error is returned the same way.
int32_t stmtGetTableMetaAndValidate(STscStmt2* pStmt, uint64_t* uid, uint64_t* suid, int32_t* vgId, int8_t* tableType) {
  STableMeta*      pMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
  int32_t          rc = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pMeta);

  pStmt->stat.ctgGetTbMetaNum++;

  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == rc) {
    STMT2_ELOG("tb %s not exist", pStmt->bInfo.tbFName);
    (void)stmtCleanBindInfo(pStmt);

    if (!pStmt->sql.autoCreateTbl) {
      STMT2_ELOG("table %s does not exist and autoCreateTbl is disabled", pStmt->bInfo.tbFName);
    }
  }

  // Covers the not-exist case above as well as every other catalog failure.
  STMT_ERR_RET(rc);

  const int32_t metaVgId = pMeta->vgId;
  *uid = pMeta->uid;
  *suid = pMeta->suid;
  *tableType = pMeta->tableType;
  pStmt->bInfo.tbVgId = metaVgId;
  *vgId = metaVgId;

  taosMemoryFree(pMeta);

  return TSDB_CODE_SUCCESS;
}
1481

1482
// Ensures the vgroup info for the current table is cached (which may update the local vgId), then asks
// qRebuildStmtDataBlock to produce *newBlock from pDataBlock for the given uid/suid/vgId.
// Errors from either step are returned through STMT_ERR_RET; otherwise TSDB_CODE_SUCCESS.
static int32_t stmtRebuildDataBlock(STscStmt2* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                                    uint64_t suid, int32_t vgId) {
  int32_t rc = stmtTryAddTableVgroupInfo(pStmt, &vgId);
  STMT_ERR_RET(rc);

  rc = qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl);
  STMT_ERR_RET(rc);

  STMT2_DLOG("uid:%" PRId64 ", rebuild table data context, vgId:%d", uid, vgId);

  return TSDB_CODE_SUCCESS;
}
1491

1492
/**
 * Try to satisfy the current table (pStmt->bInfo.tbFName) from the statement's
 * caches so that the SQL does not have to be parsed again.
 *
 * Outcome is reported through pStmt->bInfo.needParse / inExecCache:
 *   - interlace mode with a data context already present: nothing to do;
 *   - table found in exec.pBlockHash: it becomes exec.pCurrBlock;
 *   - table (or its super table) found in sql.pTableCache: a block is rebuilt
 *     from the cached context, stored in exec.pBlockHash and made current;
 *   - otherwise needParse stays true (or the bind info is cleaned).
 *
 * Returns TSDB_CODE_SUCCESS, or an error code from the catalog, the block
 * rebuild, or the hash insertion.
 */
static int32_t stmtGetFromCache(STscStmt2* pStmt) {
  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
    pStmt->bInfo.needParse = false;
    pStmt->bInfo.inExecCache = false;
    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (pCxtInExec) {
    pStmt->bInfo.needParse = false;
    pStmt->bInfo.inExecCache = true;

    pStmt->exec.pCurrBlock = *pCxtInExec;

    if (pStmt->sql.autoCreateTbl) {
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
  }

  // No SQL-level table cache yet: either reuse the exec block or fall back to parsing.
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
    if (pStmt->bInfo.inExecCache) {
      pStmt->bInfo.needParse = false;
      STMT2_DLOG("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
      return TSDB_CODE_SUCCESS;
    }

    STMT2_DLOG("no stmt block cache for tb %s", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->sql.autoCreateTbl) {
    // With auto-create the cache is keyed by the super table uid; no catalog lookup is done here.
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
    if (pCache) {
      pStmt->bInfo.needParse = false;
      pStmt->bInfo.tbUid = 0;

      STableDataCxt* pNewBlock = NULL;
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));

      // NOTE(review): if the insertion fails, pNewBlock is neither stored nor released here;
      // presumably it leaks - confirm and release it with the parser's destroy routine.
      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                      POINTER_BYTES)) {
        STMT_ERR_RET(terrno);
      }

      pStmt->exec.pCurrBlock = pNewBlock;

      STMT2_DLOG("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);

      return TSDB_CODE_SUCCESS;
    }

    STMT_RET(stmtCleanBindInfo(pStmt));
  }

  uint64_t uid, suid;
  int32_t  vgId;
  int8_t   tableType;

  STMT_ERR_RET(stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType));

  // Child tables share the cached context of their super table.
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;

  if (uid == pStmt->bInfo.tbUid) {
    pStmt->bInfo.needParse = false;

    STMT2_DLOG("tb %s is current table", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  if (pStmt->bInfo.inExecCache) {
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
    if (NULL == pCache) {
      STMT2_ELOG("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
                 pStmt->bInfo.tbFName, uid, cacheUid);

      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.needParse = false;

    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
    pStmt->bInfo.tagsCached = true;

    STMT2_DLOG("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
  if (pCache) {
    pStmt->bInfo.needParse = false;

    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
    pStmt->bInfo.tagsCached = true;

    STableDataCxt* pNewBlock = NULL;
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));

    // NOTE(review): same concern as above - pNewBlock is not released when the insertion fails.
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                    POINTER_BYTES)) {
      STMT_ERR_RET(terrno);
    }

    pStmt->exec.pCurrBlock = pNewBlock;

    // Logged through the stmt2 macro like every other message in this function.
    STMT2_DLOG("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);

    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}
1623

1624
/**
 * Clean the statement's SQL info, create a fresh table cache and put the
 * statement back into STMT_INIT.
 *
 * Returns TSDB_CODE_SUCCESS, the error from stmtCleanSQLInfo, or terrno when
 * the table cache cannot be created.
 */
static int32_t stmtResetStmt(STscStmt2* pStmt) {
  int32_t code = stmtCleanSQLInfo(pStmt);
  STMT_ERR_RET(code);

  SHashObj* pCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pCache;
  if (NULL == pStmt->sql.pTableCache) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtResetStmt:%s", tstrerror(terrno));
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}
1637

1638
/**
 * Process one queue node on behalf of the bind thread.
 *
 * A node with restoreTbCols set re-creates the first pTableColsIdx entries of
 * siInfo.pTableCols and then publishes tableColsReady. Any other node appends
 * its table data to the statement output and decrements siInfo.tbRemainNum.
 * Failures are recorded in pStmt->errCode; nothing is returned.
 */
static void stmtAsyncOutput(STscStmt2* pStmt, void* param) {
  SStmtQNode* pNode = (SStmtQNode*)param;

  if (!pNode->restoreTbCols) {
    int rc = qAppendStmt2TableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pNode->tblData, pStmt->exec.pCurrBlock,
                                     &pStmt->sql.siInfo, pNode->pCreateTbReq);
    if (TSDB_CODE_SUCCESS != rc) {
      STMT2_ELOG("async append stmt output failed, tbname:%s, err:%s", pNode->tblData.tbName, tstrerror(rc));
      pStmt->errCode = rc;
    }
    // The table is counted as handled whether or not the append succeeded.
    (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
    return;
  }

  int32_t idx = 0;
  while (idx < pStmt->sql.siInfo.pTableColsIdx) {
    SArray** ppCols = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, idx);
    *ppCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == *ppCols) {
      pStmt->errCode = terrno;
    }
    ++idx;
  }

  atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
  STMT2_TLOG_E("restore pTableCols finished");
}
1,543,904✔
1662

1663
static void* stmtBindThreadFunc(void* param) {
144,469✔
1664
  setThreadName("stmt2Bind");
144,469✔
1665

1666
  STscStmt2* pStmt = (STscStmt2*)param;
144,431✔
1667
  STMT2_DLOG_E("stmt2 bind thread started");
144,431✔
1668

1669
  while (true) {
1,543,878✔
1670
    SStmtQNode* asyncParam = NULL;
1,688,309✔
1671

1672
    if (!stmtDequeue(pStmt, &asyncParam)) {
1,688,309✔
1673
      if (pStmt->queue.stopQueue && 0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) {
144,273✔
1674
        STMT2_DLOG_E("queue is empty and stopQueue is set, thread will exit");
144,273✔
1675
        break;
144,273✔
1676
      }
UNCOV
1677
      continue;
×
1678
    }
1679

1680
    stmtAsyncOutput(pStmt, asyncParam);
1,543,660✔
1681
  }
1682

1683
  STMT2_DLOG_E("stmt2 bind thread stopped");
144,273✔
1684
  return NULL;
144,273✔
1685
}
1686

1687
/**
 * Start the joinable bind worker thread (stmtBindThreadFunc) for this statement.
 *
 * On success pStmt->bindThread holds the thread and pStmt->bindThreadInUse is
 * set. The thread attribute object is destroyed on every path once it has been
 * initialized, including the failure paths.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when the attribute
 * cannot be prepared, or the system error (also stored in terrno) when the
 * thread cannot be created.
 */
static int32_t stmtStartBindThread(STscStmt2* pStmt) {
  TdThreadAttr thAttr;
  if (taosThreadAttrInit(&thAttr) != 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    created = (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) == 0);
  if (!created) {
    // Capture the error before any further call can overwrite it.
    code = TAOS_SYSTEM_ERROR(ERRNO);
  }

  // The attribute is only needed for the create call itself.
  (void)taosThreadAttrDestroy(&thAttr);

  if (!created) {
    terrno = code;
    STMT_ERR_RET(code);
  }

  pStmt->bindThreadInUse = true;

  return TSDB_CODE_SUCCESS;
}
1706

1707
/**
 * Prepare the bind queue: initialize its condition variable and mutex, then
 * take one node from the table buffer to serve as both head and tail.
 *
 * Returns TSDB_CODE_SUCCESS or the allocation error from stmtAllocQNodeFromBuf.
 */
static int32_t stmtInitQueue(STscStmt2* pStmt) {
  (void)taosThreadCondInit(&pStmt->queue.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->queue.mutex, NULL);

  int32_t code = stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head);
  STMT_ERR_RET(code);

  // An empty queue is represented by head and tail referring to the same node.
  pStmt->queue.tail = pStmt->queue.head;

  return TSDB_CODE_SUCCESS;
}
1715

1716
/**
 * Initialize the async-bind bookkeeping: its condition variable, its mutex and
 * a zero count of in-flight async binds. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t stmtIniAsyncBind(STscStmt2* pStmt) {
  (void)taosThreadCondInit(&pStmt->asyncBindParam.waitCond, NULL);
  (void)taosThreadMutexInit(&pStmt->asyncBindParam.mutex, NULL);

  pStmt->asyncBindParam.asyncBindNum = 0;
  return TSDB_CODE_SUCCESS;
}
1723

1724
/**
 * Set up the queue-node buffer pool.
 *
 * Each unit is one SStmtQNode and a buffer holds 1000 units. A buffer list is
 * created, the first buffer is allocated, registered in the list and made the
 * current buffer with offset 0.
 *
 * Returns TSDB_CODE_SUCCESS or terrno on allocation failure. If the first
 * buffer cannot be registered in the list it is freed before returning, since
 * nothing else references it.
 */
static int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
  pTblBuf->buffUnit = sizeof(SStmtQNode);
  pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
  pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
  if (NULL == pTblBuf->pBufList) {
    return terrno;
  }

  void* buff = taosMemoryMalloc(pTblBuf->buffSize);
  if (NULL == buff) {
    return terrno;
  }

  if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL) {
    // Save the error first so that releasing the buffer cannot disturb it.
    int32_t code = terrno;
    taosMemoryFree(buff);
    return code;
  }

  pTblBuf->pCurBuff = buff;
  pTblBuf->buffIdx = 1;
  pTblBuf->buffOffset = 0;

  return TSDB_CODE_SUCCESS;
}
1746

1747
/**
 * Allocate and initialize a stmt2 handle bound to the connection `taos`.
 *
 * pOptions may be NULL. When both singleStbInsert and singleTableBindOnce are
 * set, the handle runs in super-table interlace mode: the interlace hashes, the
 * table-cols array, the node buffer, the bind queue and the bind thread are
 * created here. When asyncExecFn is set, the async-exec semaphore is created.
 *
 * Returns the new handle, or NULL on any failure. After the handle itself and
 * its table cache exist, failures are cleaned up through stmtClose2.
 */
TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
  STscObj*   pObj = (STscObj*)taos;
  STscStmt2* pStmt = NULL;
  int32_t    code = 0;

  pStmt = taosMemoryCalloc(1, sizeof(STscStmt2));
  if (NULL == pStmt) {
    STMT2_ELOG_E("fail to allocate memory for pStmt");
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtInit2:%s", tstrerror(terrno));
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = pObj;
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->errCode = TSDB_CODE_SUCCESS;

  if (pObj->db[0] != '\0') {
    pStmt->db = taosStrdup(pObj->db);
    if (NULL == pStmt->db) {
      // Do not silently continue without the connection's database name.
      STMT2_ELOG("fail to allocate memory for db:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  if (NULL != pOptions) {
    (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
    if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
      pStmt->stbInterlaceMode = true;
    }

    pStmt->reqid = pOptions->reqid;
  }

  if (pStmt->stbInterlaceMode) {
    pStmt->sql.siInfo.transport = pObj->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = pObj->acctId;
    pStmt->sql.siInfo.dbname = pObj->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);

    pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableHash) {
      STMT2_ELOG("fail to allocate memory for pTableHash:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
      STMT2_ELOG("fail to allocate memory for pTableRowDataHash:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pTableCols) {
      STMT2_ELOG("fail to allocate memory for pTableCols:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }

    // Buffer pool -> queue -> worker thread; each step only runs if the previous one succeeded.
    code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtInitQueue(pStmt);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stmtStartBindThread(pStmt);
    }
    if (TSDB_CODE_SUCCESS != code) {
      terrno = code;
      STMT2_ELOG("fail to init stmt2 bind thread:%s", tstrerror(code));
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }

  pStmt->sql.siInfo.tableColsReady = true;
  if (pStmt->options.asyncExecFn) {
    if (tsem_init(&pStmt->asyncExecSem, 0, 1) != 0) {
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
      STMT2_ELOG("fail to init asyncExecSem:%s", tstrerror(terrno));
      (void)stmtClose2(pStmt);
      return NULL;
    }
  }
  code = stmtIniAsyncBind(pStmt);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    STMT2_ELOG("fail to init async bind:%s", tstrerror(code));

    (void)stmtClose2(pStmt);
    return NULL;
  }

  pStmt->execSemWaited = false;

  STMT2_DLOG("stmt2 initialize finished, seqId:%d, db:%s, interlaceMode:%d, asyncExec:%d", pStmt->seqId, pStmt->db,
             pStmt->stbInterlaceMode, pStmt->options.asyncExecFn != NULL);

  return pStmt;
}
1851

1852
/**
 * Apply a database name that was given explicitly in the SQL text.
 *
 * If the statement has no database yet, a dequoted copy of dbName becomes
 * pStmt->db. The request's pDb is always replaced by a dequoted copy of dbName,
 * and in interlace mode siInfo.dbname is pointed at that copy.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_SQL_SYNTAX_ERROR for a NULL or empty
 * name, the error from stmtCreateRequest, or terrno when a copy cannot be
 * allocated. Copies are checked for NULL before they are dequoted.
 */
static int stmtSetDbName2(TAOS_STMT2* stmt, const char* dbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  if (dbName == NULL || dbName[0] == '\0') {
    STMT2_ELOG_E("dbname in sql is illegal");
    return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
  }

  STMT2_DLOG("dbname is specified in sql:%s", dbName);
  if (pStmt->db == NULL || pStmt->db[0] == '\0') {
    taosMemoryFreeClear(pStmt->db);
    STMT2_DLOG("dbname:%s is by sql, not by taosconnect", dbName);
    pStmt->db = taosStrdup(dbName);
    if (pStmt->db == NULL) {
      return terrno;
    }
    (void)strdequote(pStmt->db);
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  // The SQL statement specifies a database name, overriding the previously specified database
  taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
  pStmt->exec.pRequest->pDb = taosStrdup(dbName);
  if (pStmt->exec.pRequest->pDb == NULL) {
    return terrno;
  }
  (void)strdequote(pStmt->exec.pRequest->pDb);

  if (pStmt->sql.stbInterlaceMode) {
    pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
  }
  return TSDB_CODE_SUCCESS;
}
1880
/**
 * Re-create the interlace-mode resources of a statement: the table hash, the
 * table-row-data hash, the table-cols array, the node buffer, the bind queue
 * (with stopQueue cleared) and finally the bind thread.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a container cannot be created, or the
 * error of the first failing buffer/queue/thread step.
 */
static int32_t stmtResetStbInterlaceCache(STscStmt2* pStmt) {
  pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pStmt->sql.siInfo.pTableHash) {
    return terrno;
  }

  pStmt->sql.siInfo.pTableRowDataHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pStmt->sql.siInfo.pTableRowDataHash) {
    return terrno;
  }

  pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
  if (NULL == pStmt->sql.siInfo.pTableCols) {
    return terrno;
  }

  int32_t rc = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  rc = stmtInitQueue(pStmt);
  // The stop flag is cleared whether or not the queue initialization succeeded.
  pStmt->queue.stopQueue = false;
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  return stmtStartBindThread(pStmt);
}
1913

1914
/**
 * Bring an already-prepared statement back to STMT_INIT so it can be prepared
 * again.
 *
 * Steps, in order: clear errCode; wait for a pending async execution; stop and
 * join the bind thread and tear down its queue (interlace mode only); clean the
 * SQL info; rebuild the interlace resources and async-bind state (interlace
 * mode only); write back the db/options/reqid/interlace flag captured at entry;
 * create a fresh table cache.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stmtDeepReset(STscStmt2* pStmt) {
  // Snapshot of the fields that are written back near the end of the reset.
  char*             savedDb = pStmt->db;
  TAOS_STMT2_OPTION savedOptions = pStmt->options;
  // NOTE(review): the copy is 32-bit unsigned; confirm pStmt->reqid is not a wider type.
  uint32_t          savedReqid = pStmt->reqid;
  bool              interlace = pStmt->stbInterlaceMode;

  pStmt->errCode = 0;

  // Wait for async execution to complete
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
    int32_t waitRet = tsem_wait(&pStmt->asyncExecSem);
    if (0 != waitRet) {
      STMT2_ELOG_E("bind param wait asyncExecSem failed");
    }
    pStmt->execSemWaited = true;
  }

  // Stop the bind thread if it is running
  if (interlace && pStmt->bindThreadInUse) {
    // Spin until the worker has published tableColsReady.
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    pStmt->queue.stopQueue = true;
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);

    pStmt->bindThreadInUse = false;
    pStmt->queue.head = NULL;
    pStmt->queue.tail = NULL;
    pStmt->queue.qRemainNum = 0;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  // Drop cached bind state, then clean the SQL info
  pStmt->bInfo.boundColsCached = false;
  if (interlace) {
    pStmt->bInfo.tagsCached = false;
  }
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  // Rebuild the interlace-mode resources
  if (interlace) {
    pStmt->sql.siInfo.transport = pStmt->taos->pAppInfo->pTransporter;
    pStmt->sql.siInfo.acctId = pStmt->taos->acctId;
    pStmt->sql.siInfo.dbname = pStmt->taos->db;
    pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);

    if (NULL == pStmt->pCatalog) {
      STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
    }
    pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;

    STMT_ERR_RET(stmtResetStbInterlaceCache(pStmt));

    int32_t bindRet = stmtIniAsyncBind(pStmt);
    if (TSDB_CODE_SUCCESS != bindRet) {
      STMT2_ELOG("fail to reinit async bind in stmtDeepReset:%s", tstrerror(bindRet));
      return bindRet;
    }
  }

  // Write back the captured state
  pStmt->db = savedDb;
  pStmt->options = savedOptions;
  pStmt->reqid = savedReqid;
  pStmt->stbInterlaceMode = interlace;

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    STMT2_ELOG("fail to allocate memory for pTableCache in stmtDeepReset:%s", tstrerror(terrno));
    return terrno;
  }

  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;
  pStmt->sql.siInfo.tableColsReady = true;

  return TSDB_CODE_SUCCESS;
}
1997

1998
/**
 * Prepare a statement with the given SQL text.
 *
 * @param stmt   statement handle; must not be NULL
 * @param sql    SQL text; must not be NULL
 * @param length number of bytes of `sql` to use; 0 means strlen(sql)
 *
 * A statement that was prepared before is deep-reset first. For INSERT the
 * database named in the SQL (if any) or the statement's own database is copied
 * into the request; for SELECT the SQL is parsed immediately; anything else is
 * rejected with TSDB_CODE_PAR_SYNTAX_ERROR.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, a
 * previously recorded pStmt->errCode, or the error of the failing step.
 */
int stmtPrepare2(TAOS_STMT2* stmt, const char* sql, unsigned long length) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  // Validate before logging: the log below formats `sql` with %s.
  if (stmt == NULL || sql == NULL) {
    STMT2_ELOG_E("stmt or sql is NULL");
    return TSDB_CODE_INVALID_PARA;
  }

  STMT2_DLOG("start to prepare with sql:%s", sql);

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT2_DLOG("stmt status is %d, need to reset stmt2 cache before prepare", pStmt->sql.status);
    STMT_ERR_RET(stmtDeepReset(pStmt));
  }

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    STMT2_ELOG("errCode is not success before, ErrCode: 0x%x, errorsyt: %s\n. ", pStmt->errCode,
               tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // `length` is unsigned, so "not provided" can only mean zero.
  if (0 == length) {
    length = strlen(sql);
  }
  pStmt->sql.sqlStr = taosStrndup(sql, length);
  if (!pStmt->sql.sqlStr) {
    STMT2_ELOG("fail to allocate memory for sqlStr:%s", tstrerror(terrno));
    STMT_ERR_RET(terrno);
  }
  pStmt->sql.sqlLen = length;
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (stmt2IsInsert(pStmt)) {
    pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
    char* dbName = NULL;
    if (qParseDbName(sql, length, &dbName)) {
      // Release the parsed name on both the success and the failure path.
      code = stmtSetDbName2(stmt, dbName);
      taosMemoryFreeClear(dbName);
      STMT_ERR_RET(code);
    } else if (pStmt->db != NULL && pStmt->db[0] != '\0') {
      taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
      pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
      if (pStmt->exec.pRequest->pDb == NULL) {
        STMT_ERR_RET(terrno);
      }
      (void)strdequote(pStmt->exec.pRequest->pDb);

      if (pStmt->sql.stbInterlaceMode) {
        pStmt->sql.siInfo.dbname = pStmt->exec.pRequest->pDb;
      }
    }

  } else if (stmt2IsSelect(pStmt)) {
    pStmt->sql.stbInterlaceMode = false;
    STMT_ERR_RET(stmtParseSql(pStmt));
  } else {
    return stmtBuildErrorMsgWithCode(pStmt, "stmt only support 'SELECT' or 'INSERT'", TSDB_CODE_PAR_SYNTAX_ERROR);
  }
  return TSDB_CODE_SUCCESS;
}
2060

2061
/**
 * Build the interlace-mode template for the current table.
 *
 * The table's data context is looked up in exec.pBlockHash by tbFName and
 * cloned into siInfo.pDataCtx; siInfo.pTableCols is then filled with
 * STMT_TABLE_COLS_NUM empty pointer arrays, and siInfo.boundTags is taken from
 * the bind info.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_CACHE_ERROR when the table has
 * no entry in exec.pBlockHash, the clone error, or terrno on allocation
 * failure.
 */
static int32_t stmtInitStbInterlaceTableInfo(STscStmt2* pStmt) {
  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    // Use an explicit error code rather than terrno, which may hold a stale or success value
    // after a plain lookup miss; this matches the same lookup in stmtSetTbTags2.
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
  pStmt->sql.siInfo.pDataCtx = pDst;

  for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
    SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
    if (NULL == pTblCols) {
      return terrno;
    }

    if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
      // The array never reached pTableCols, so it must be released here.
      int32_t code = terrno;
      taosArrayDestroy(pTblCols);
      return code;
    }
  }

  pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;

  STMT2_TLOG("init stb interlace table info, tbName:%s, pDataCtx:%p, boundTags:%p", pStmt->bInfo.tbFName,
             pStmt->sql.siInfo.pDataCtx, pStmt->sql.siInfo.boundTags);

  return TSDB_CODE_SUCCESS;
}
2090

2091
/**
 * Tell whether the statement is an INSERT.
 *
 * When sql.type is already set, it must be STMT_TYPE_INSERT or
 * STMT_TYPE_MULTI_INSERT; otherwise the raw SQL text is inspected with
 * qIsInsertValuesSql.
 */
bool stmt2IsInsert(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (!pStmt->sql.type) {
    return qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
  }

  if (STMT_TYPE_INSERT == pStmt->sql.type) {
    return true;
  }
  return STMT_TYPE_MULTI_INSERT == pStmt->sql.type;
}
2099

2100
/**
 * Tell whether the statement is a SELECT.
 *
 * When sql.type is already set, it must equal STMT_TYPE_QUERY; otherwise the
 * raw SQL text is inspected with qIsSelectFromSql.
 */
bool stmt2IsSelect(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (!pStmt->sql.type) {
    return qIsSelectFromSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
  }
  return STMT_TYPE_QUERY == pStmt->sql.type;
}
2108

2109
/**
 * Select the table that subsequent binds of an INSERT statement apply to.
 *
 * The name is resolved into bInfo.sname / tbFName / tbName. Unless interlace
 * mode already has a template data context, the statement caches are consulted
 * and, if needed, the SQL is parsed; when the statement does not auto-create
 * tables, the table is additionally looked up in the catalog. In interlace mode
 * the template is created on first use. The elapsed time is added to
 * stat.setTbNameUs on success.
 *
 * Returns TSDB_CODE_SUCCESS, a previously recorded pStmt->errCode,
 * TSDB_CODE_TSC_STMT_API_ERROR for non-INSERT statements, or the error of the
 * failing step.
 */
int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  int64_t startUs = taosGetTimestampUs();

  STMT2_TLOG("start to set tbName:%s", tbName);

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  if (!stmt2IsInsert(stmt)) {
    STMT2_ELOG_E("set tb name not available for no-insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }
  // process tbname
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  STMT_ERR_RET(qCreateSName2(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                             pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
  tstrncpy(pStmt->bInfo.tbName, (char*)tNameGetTableName(&pStmt->bInfo.sname), TSDB_TABLE_NAME_LEN);

  if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtGetFromCache(pStmt));

    if (pStmt->bInfo.needParse) {
      STMT_ERR_RET(stmtParseSql(pStmt));
      if (!pStmt->sql.autoCreateTbl) {
        // Only the return code matters here; the out-parameters are required by the callee.
        uint64_t uid, suid;
        int32_t  vgId;
        int8_t   tableType;

        int32_t code = stmtGetTableMetaAndValidate(pStmt, &uid, &suid, &vgId, &tableType);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }

  } else {
    // Interlace mode with an existing template: no lookup or parse is performed.
    pStmt->exec.pRequest->requestId++;
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.setTbNameUs += startUs2 - startUs;

  return TSDB_CODE_SUCCESS;
}
2166

2167
/**
 * Bind tag values for the current table.
 *
 * @param stmt         statement handle
 * @param tags         tag bind values, passed on to qBindStmtTagsValue2
 * @param pCreateTbReq in interlace mode, receives a newly allocated
 *                     SVCreateTbReq whose uid field is set to the resolved
 *                     vgroup id; it is dereferenced in every mode when the
 *                     tags are bound
 *
 * The SQL is parsed first if still required, the interlace template is created
 * on first use, and the data block is taken from exec.pCurrBlock or looked up
 * in exec.pBlockHash. Nothing is bound when the table came from the exec cache
 * and the statement does not auto-create tables.
 *
 * Returns TSDB_CODE_SUCCESS, a previously recorded pStmt->errCode,
 * TSDB_CODE_TSC_STMT_CACHE_ERROR when the table has no data block, terrno on
 * allocation failure, or the error of the failing step.
 */
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to set tbTags");
  if (qDebugFlag & DEBUG_TRACE) {
    (void)stmtPrintBindv(stmt, tags, -1, true);
  }

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    // In interlace mode any create-table request still attached to the block is dropped.
    if (pStmt->sql.stbInterlaceMode && (*pDataBlock)->pData->pCreateTbReq) {
      tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
      taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
      (*pDataBlock)->pData->pCreateTbReq = NULL;
    }
  }
  if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
    return TSDB_CODE_SUCCESS;
  }

  STMT2_TLOG_E("start to bind stmt tag values");

  void* boundTags = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    boundTags = pStmt->sql.siInfo.boundTags;
    *pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    // Check the allocation result itself, not the caller's out-pointer.
    if (NULL == *pCreateTbReq) {
      return terrno;
    }
    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
  } else {
    boundTags = pStmt->bInfo.boundTags;
  }

  STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                   pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));

  return TSDB_CODE_SUCCESS;
}
2238

2239
/**
 * Prepare the create-table request carrying the tag values for the table that
 * is currently bound, when the statement runs in super-table interlace mode.
 *
 * When the tags were already recognised as fixed, the cached template request
 * is cloned, renamed to the current table and stamped with the vgroup id
 * returned by stmtTryAddTableVgroupInfo(). Otherwise the data block of the
 * current table is looked up and, if it carries a create-table request, that
 * request becomes the cached template, a clone is handed to the caller and the
 * block's own copy is destroyed.
 *
 * @param stmt          Statement handle (an STscStmt2).
 * @param pCreateTbReq  Output: receives the cloned create-table request on the
 *                      paths that produce one; it is not written on the other
 *                      paths.
 * @return TSDB_CODE_SUCCESS, the statement's stored error code, or the error
 *         of the failing step.
 */
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to clone createTbRequest for fixed tags");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // Only the interlace mode carries a per-table create request.
  if (!pStmt->sql.stbInterlaceMode) {
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->sql.fixValueTags) {
    STMT2_TLOG_E("tags are fixed, use one createTbReq");
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));

    // Replace the template's table name with the table being bound now.
    if ((*pCreateTbReq)->name) {
      taosMemoryFree((*pCreateTbReq)->name);
    }
    (*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
    if (NULL == (*pCreateTbReq)->name) {
      // Do not hand out a request without a table name.
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    int32_t vgId = -1;
    STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
    (*pCreateTbReq)->uid = vgId;
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
    if (!pStmt->sql.autoCreateTbl) {
      STMT2_WLOG_E("don't need to create table, will not check tags");
      return TSDB_CODE_SUCCESS;
    }
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  // Locate the data block of the current table: cached pointer first, hash otherwise.
  STableDataCxt** pDataBlock = NULL;
  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
  }

  if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
    STMT2_DLOG_E("don't need to create, will not check tags");
    return TSDB_CODE_SUCCESS;
  }

  if ((*pDataBlock)->pData->pCreateTbReq) {
    STMT2_TLOG_E("tags are fixed, set createTbReq first time");
    STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
    // Mark the tags as fixed only once the template really exists.
    pStmt->sql.fixValueTags = true;
    STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
    (*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;

    // destroy the createTbReq in the data block
    tdDestroySVCreateTbReq((*pDataBlock)->pData->pCreateTbReq);
    taosMemoryFreeClear((*pDataBlock)->pData->pCreateTbReq);
    (*pDataBlock)->pData->pCreateTbReq = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
2313

UNCOV
2314
/**
 * Build the column field descriptions of an insert statement.
 *
 * The data block is taken from the interlace context when the statement is in
 * super-table interlace mode, otherwise from the exec block hash keyed by the
 * full table name.
 *
 * @param pStmt     Statement handle.
 * @param fieldNum  Forwarded to qBuildStmtColFields().
 * @param fields    Forwarded to qBuildStmtColFields().
 * @return TSDB_CODE_SUCCESS, the stored statement error, or the failing step's
 *         error code.
 */
static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->sql.stbInterlaceMode) {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  // Reject both a missing hash entry and an interlace context that has not
  // been created yet, the same way stmtFetchStbColFields2() does.
  if (NULL == pDataBlock || NULL == *pDataBlock) {
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
2341

2342
/**
 * Build the tag/column placeholder field list of an insert statement.
 *
 * The statement's error code is captured on entry and written back on every
 * path that leaves through `_return`, so a failure inside this function is
 * reported through the return value only.
 *
 * When the block came from the exec hash and the bound table is a super table,
 * the block is destroyed and removed from the hash and the bind info is reset.
 *
 * @param pStmt     Statement handle.
 * @param fieldNum  Optional output: total number of fields (may be NULL).
 * @param fields    Output forwarded to qBuildStmtStbColFields().
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
  int32_t code = 0;
  int32_t preCode = pStmt->errCode;

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT2_ELOG_E("stmtFetchStbColFields2 only for insert statement");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataCxt** pDataBlock = NULL;
  bool            cleanStb = false;

  if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
    pDataBlock = &pStmt->sql.siInfo.pDataCtx;
  } else {
    // The block lives in the exec hash; it may have to be dropped afterwards.
    cleanStb = true;
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  }

  if (NULL == pDataBlock || NULL == *pDataBlock) {
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
  }

  pStmt->sql.placeholderOfTags = 0;
  pStmt->sql.placeholderOfCols = 0;
  int32_t totalNum = 0;
  STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.fixedValueCols,
                                        pStmt->bInfo.tbNameFlag, &totalNum, fields, &pStmt->sql.placeholderOfTags,
                                        &pStmt->sql.placeholderOfCols));

  if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
    taosMemoryFreeClear((*pDataBlock)->boundColsInfo.pColIndex);
    qDestroyStmtDataBlock(*pDataBlock);
    *pDataBlock = NULL;
    if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
      STMT2_ELOG("fail to remove remove stb:%s exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
    }
    pStmt->sql.autoCreateTbl = false;
    pStmt->bInfo.tagsCached = false;
    pStmt->bInfo.sname = (SName){0};
    // Leave through _return like every other failure here, so the saved
    // error code is restored instead of returning directly.
    STMT_ERRI_JRET(stmtCleanBindInfo(pStmt));
  }

  if (fieldNum != NULL) {
    *fieldNum = totalNum;
  }

  STMT2_DLOG("get insert fields totalNum:%d, tagsNum:%d, colsNum:%d", totalNum, pStmt->sql.placeholderOfTags,
             pStmt->sql.placeholderOfCols);

_return:

  pStmt->errCode = preCode;

  return code;
}
2405
/*
2406
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
2407
  while (true) {
2408
    if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
2409
      pStmt->exec.smInfo.pColIdx = 0;
2410
    }
2411

2412
    if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
2413
      taosUsleep(1);
2414
      continue;
2415
    }
2416

2417
    *idx = pStmt->exec.smInfo.pColIdx;
2418
    return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
2419
  }
2420
}
2421
*/
2422
/**
 * Finish an interlace-mode bind: lazily create the per-statement vgroup
 * containers and request, fill in the queue node and enqueue it.
 *
 * @param pStmt  Statement handle.
 * @param param  Queue node describing the table data that was just bound.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR when the queue has
 *         been stopped, or the error of a failed allocation / request build.
 */
static int32_t stmtAppendTablePostHandle(STscStmt2* pStmt, SStmtQNode* param) {
  if (NULL == pStmt->sql.siInfo.pVgroupHash) {
    pStmt->sql.siInfo.pVgroupHash =
        taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
    if (NULL == pStmt->sql.siInfo.pVgroupHash) {
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }
  if (NULL == pStmt->sql.siInfo.pVgroupList) {
    pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
    if (NULL == pStmt->sql.siInfo.pVgroupList) {
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  if (NULL == pStmt->sql.siInfo.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              (SRequestObj**)&pStmt->sql.siInfo.pRequest, pStmt->reqid));

    if (pStmt->reqid != 0) {
      pStmt->reqid++;
    }
    pStmt->exec.pRequest->syncQuery = true;

    pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
    pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
  }

  // Seeing the first recorded table name again switches on tbFromHash.
  if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] &&
      0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
    pStmt->sql.siInfo.tbFromHash = true;
  }

  if (0 == pStmt->sql.siInfo.firstName[0]) {
    tstrncpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
  }

  param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
  param->next = NULL;

  // Check the queue state before counting the node, so the pending-table
  // counter is not left incremented for a node that is never enqueued.
  if (pStmt->queue.stopQueue) {
    STMT2_ELOG_E("bind thread already stopped, cannot enqueue");
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
  stmtEnqueue(pStmt, param);

  return TSDB_CODE_SUCCESS;
}
2466

2467
/**
 * Hand out the next unused column array from the statement's pool, growing the
 * pool by 100 arrays whenever it is exhausted.
 *
 * @param pStmt       Statement handle owning the pool (sql.siInfo.pTableCols).
 * @param pTableCols  Output: the column array to use for the current bind.
 * @return TSDB_CODE_SUCCESS, or terrno when growing the pool fails.
 */
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt2* pStmt, SArray** pTableCols) {
  while (pStmt->sql.siInfo.pTableColsIdx >= taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
    // Pool exhausted: append another batch of empty column arrays.
    for (int32_t i = 0; i < 100; i++) {
      SArray* pTblCols = taosArrayInit(20, POINTER_BYTES);
      if (NULL == pTblCols) {
        return terrno;
      }

      if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL) {
        // The pool did not take ownership; release the array instead of leaking it.
        int32_t code = terrno;
        taosArrayDestroy(pTblCols);
        return code;
      }
    }
  }

  *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);

  return TSDB_CODE_SUCCESS;
}
2489

2490
/**
 * Cache a clone of the current table's data block for multi-insert statements.
 *
 * The cache key is the super-table uid for child tables and the table uid
 * otherwise. Nothing is done for other statement types or when the key is
 * already cached.
 *
 * @param pStmt  Statement handle.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_CACHE_ERROR when the block is
 *         missing from the exec hash, or the error of the clone / cache insert.
 */
static int32_t stmtCacheBlock(STscStmt2* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    STMT2_TLOG("table %s already cached, no need to cache again", pStmt->bInfo.tbFName);
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
    // Report an explicit error: terrno may still hold success after a plain
    // lookup miss, which would silently turn this failure into success.
    return TSDB_CODE_TSC_STMT_CACHE_ERROR;
  }
  STableDataCxt* pDst = NULL;

  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    int32_t code = terrno;
    STMT2_ELOG("fail to put table cache:%s", tstrerror(code));
    // The cache did not take the clone; free it so it does not leak.
    qDestroyStmtDataBlock(pDst);
    return code;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    // The cache entry now references boundTags; drop the statement's pointer.
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
2530

2531
/**
 * Close the current batch of bound rows.
 *
 * Outside interlace mode this only caches the current data block. In interlace
 * mode a queue node flagged `restoreTbCols` is allocated and enqueued.
 *
 * @param stmt  Statement handle (an STscStmt2).
 * @return TSDB_CODE_SUCCESS, the stored statement error, or the error of the
 *         failing step.
 */
static int stmtAddBatch2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int64_t    beginUs = taosGetTimestampUs();

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  if (!pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtCacheBlock(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  // Interlace mode: account the elapsed time, then queue a restore marker.
  int64_t nowUs = taosGetTimestampUs();
  pStmt->stat.addBatchUs += nowUs - beginUs;

  pStmt->sql.siInfo.tableColsReady = false;

  SStmtQNode* pNode = NULL;
  STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pNode));
  pNode->restoreTbCols = true;
  pNode->next = NULL;

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  }
  pStmt->bInfo.boundColsCached = true;

  if (pStmt->queue.stopQueue) {
    STMT2_ELOG_E("stmt bind thread is stopped,cannot enqueue bind request");
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  stmtEnqueue(pStmt, pNode);

  return TSDB_CODE_SUCCESS;
}
2574
/*
2575
static int32_t stmtBackupQueryFields(STscStmt2* pStmt) {
2576
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2577
  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
2578
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;
2579

2580
  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
2581
  pRes->fields = taosMemoryMalloc(size);
2582
  pRes->userFields = taosMemoryMalloc(size);
2583
  if (NULL == pRes->fields || NULL == pRes->userFields) {
2584
    STMT_ERR_RET(terrno);
2585
  }
2586
  (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
2587
  (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
2588

2589
  return TSDB_CODE_SUCCESS;
2590
}
2591

2592
static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
2593
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;
2594
  int32_t            size = pRes->numOfCols * sizeof(TAOS_FIELD);
2595

2596
  pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols;
2597
  pStmt->exec.pRequest->body.resInfo.precision = pRes->precision;
2598

2599
  if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2600
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size);
2601
    if (NULL == pStmt->exec.pRequest->body.resInfo.fields) {
2602
      STMT_ERR_RET(terrno);
2603
    }
2604
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size);
2605
  }
2606

2607
  if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2608
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size);
2609
    if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) {
2610
      STMT_ERR_RET(terrno);
2611
    }
2612
    (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size);
2613
  }
2614

2615
  return TSDB_CODE_SUCCESS;
2616
}
2617
*/
2618

2619
/**
 * Documentation for stmtFetchMetadataForQuery(), which is defined further
 * below, after the callback helpers it relies on.
 *
 * Fetches metadata for a query statement after parameter binding: it collects
 * the metadata requirements of the bound query, fetches them synchronously
 * from the catalog, and returns them for parsing.
 *
 * Metadata is fetched on every bind because:
 * 1. Parameter values in WHERE conditions (e.g., dataname IN (?,?)) may change.
 * 2. Different parameter values may require different vgroup lists for virtual tables.
 * 3. Metadata requirements can only be determined after parameters are bound.
 *
 * @param pStmt     Statement handle
 * @param pCxt      Parse context (must have its catalog handle initialized)
 * @param pMetaData Output: fetched metadata (the caller is responsible for cleanup)
 * @return TSDB_CODE_SUCCESS on success, error code otherwise
 */
2634
// Callback parameter structure for synchronous catalog metadata fetch
2635
typedef struct {
2636
  SMetaData* pRsp;
2637
  int32_t    code;
2638
  tsem_t     sem;
2639
} SCatalogSyncCbParam;
2640

2641
// Callback function for catalogAsyncGetAllMeta to make it synchronous
2642
/**
 * Completion callback passed to catalogAsyncGetAllMeta(): stores the result in
 * the SCatalogSyncCbParam given as `param` and posts its semaphore.
 *
 * @param pResultMeta  Fetched metadata; copied out and zeroed on success.
 * @param param        The SCatalogSyncCbParam of the waiting caller.
 * @param code         Result code of the fetch.
 */
static void stmtCatalogSyncGetAllMetaCb(SMetaData* pResultMeta, void* param, int32_t code) {
  SCatalogSyncCbParam* pSync = (SCatalogSyncCbParam*)param;

  if (NULL != pResultMeta && TSDB_CODE_SUCCESS == code) {
    // Take over the contents, then clear the source to avoid a double free.
    *pSync->pRsp = *pResultMeta;
    TAOS_MEMSET(pResultMeta, 0, sizeof(SMetaData));
  }

  pSync->code = code;

  if (0 != tsem_post(&pSync->sem)) {
    tscError("failed to post semaphore");
  }
}
7,653✔
2653

2654
/**
 * Collect the metadata keys of the bound query, request them from the catalog
 * and block on a semaphore until the asynchronous fetch reports back.
 *
 * @param pStmt      Statement handle; its sql.pQuery is inspected.
 * @param pCxt       Parse context supplying catalog, transporter and ids.
 * @param pMetaData  Output: zeroed on entry, filled on success, freed again on
 *                   failure.
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t stmtFetchMetadataForQuery(STscStmt2* pStmt, SParseContext* pCxt, SMetaData* pMetaData) {
  SParseMetaCache     metaCache = {0};
  SCatalogReq         catalogReq = {0};
  SCatalogSyncCbParam cbParam = {.pRsp = pMetaData, .code = TSDB_CODE_SUCCESS};
  SRequestConnInfo    conn = {.pTrans = pCxt->pTransporter,
                              .requestId = pCxt->requestId,
                              .requestObjRefId = pCxt->requestRid,
                              .mgmtEps = pCxt->mgmtEpSet};
  int32_t             code = 0;

  TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));

  code = collectMetaKey(pCxt, pStmt->sql.pQuery, &metaCache);
  if (TSDB_CODE_SUCCESS != code) {
    goto _cleanup;
  }

  code = buildCatalogReq(&metaCache, &catalogReq);
  if (TSDB_CODE_SUCCESS != code) {
    goto _cleanup;
  }

  if (tsem_init(&cbParam.sem, 0, 0) != 0) {
    code = TSDB_CODE_CTG_INTERNAL_ERROR;
    goto _cleanup;
  }

  code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, stmtCatalogSyncGetAllMetaCb, &cbParam, NULL);
  if (TSDB_CODE_SUCCESS == code) {
    // Wait for the callback; its result code becomes ours.
    code = tsem_wait(&cbParam.sem);
    if (TSDB_CODE_SUCCESS == code) {
      code = cbParam.code;
    } else {
      catalogFreeMetaData(pMetaData);
      TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
    }
  }

  if (tsem_destroy(&cbParam.sem) != 0) {
    tscError("failed to destroy semaphore");
    code = TSDB_CODE_CTG_INTERNAL_ERROR;
    catalogFreeMetaData(pMetaData);
    TAOS_MEMSET(pMetaData, 0, sizeof(SMetaData));
  }

_cleanup:
  // metaCache currently holds "reserved/request" structures built by collectMetaKey/buildCatalogReq.
  // It must be destroyed with request=true to release nested table-request hashes.
  destoryParseMetaCache(&metaCache, true);
  destoryCatalogReq(&catalogReq);

  if (TSDB_CODE_SUCCESS != code) {
    catalogFreeMetaData(pMetaData);
  }

  return code;
}
2705

2706
/**
 * Bind one batch of parameter values to the statement.
 *
 * Query statements: the parameters are bound into the parsed query, metadata
 * is fetched, the query is re-parsed and the result schema is installed on the
 * request; the function then returns.
 *
 * Insert statements: the data block of the current table is located and the
 * values are bound into it. Interlace mode then hands a queue node to
 * stmtAppendTablePostHandle(); every other mode calls stmtAddBatch2().
 *
 * @param stmt          Statement handle (an STscStmt2).
 * @param bind          Bind values.
 * @param colIdx        >= 0 binds a single column; -1 binds in column format;
 *                      values below -1 bind in row format.
 * @param pCreateTbReq  Create-table request stored on the queue node in
 *                      interlace mode; not used otherwise.
 * @return TSDB_CODE_SUCCESS, the stored statement error, or the error code of
 *         the failing step.
 */
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;

  int64_t startUs = taosGetTimestampUs();

  if (qDebugFlag & DEBUG_TRACE) {
    (void)stmtPrintBindv(stmt, bind, colIdx, false);
  }

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  // A statement that already ran and is not a multi-insert is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    resetRequest(pStmt);
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    code = stmtParseSql(pStmt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    code = qStmtBindParams2(pStmt->sql.pQuery, bind, colIdx, pStmt->taos->optionInfo.charsetCxt);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }
    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .minSecLevel = pStmt->taos->minSecLevel,
                         .maxSecLevel = pStmt->taos->maxSecLevel,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user,
                         .stmtBindVersion = pStmt->exec.pRequest->stmtBindVersion};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      goto cleanup_root;
    }

    // Fetch metadata for query(vtable need)
    SMetaData metaData = {0};
    code = stmtFetchMetadataForQuery(pStmt, &ctx, &metaData);
    if (TSDB_CODE_SUCCESS != code) {
      goto cleanup_root;
    }

    code = qStmtParseQuerySql(&ctx, pStmt->sql.pQuery, &metaData);
    if (TSDB_CODE_SUCCESS == code) {
      // Copy metaData to pRequest->parseMeta for potential future use
      // Similar to doAsyncQueryFromAnalyse when parseOnly is true
      (void)memcpy(&pStmt->exec.pRequest->parseMeta, &metaData, sizeof(SMetaData));
      (void)memset(&metaData, 0, sizeof(SMetaData));  // Clear to avoid double free
    } else {
      // Clean up metaData on failure - free all arrays
      if (metaData.pVStbRefDbs) {
        taosArrayDestroy(metaData.pVStbRefDbs);
        metaData.pVStbRefDbs = NULL;
      }
      // Note: Other fields in metaData are managed by catalog module if ctgFree is true
      goto cleanup_root;
    }

    if (pStmt->sql.pQuery->haveResultSet) {
      STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                                    pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;

  // Also reached from the stmtParseSql() failure above, i.e. from outside this block.
  cleanup_root:
    STMT2_ELOG("parse query statment unexpected failed code:%d, need to clean node", code);
    if (pStmt->sql.pQuery && pStmt->sql.pQuery->pRoot) {
      nodesDestroyNode(pStmt->sql.pQuery->pRoot);
      pStmt->sql.pQuery->pRoot = NULL;
    }
    STMT_ERR_RET(code);
  }

  if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
    STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
  }

  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      STMT2_ELOG("table %s not found in exec blockHash:%p", pStmt->bInfo.tbFName, pStmt->exec.pBlockHash);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
    if (pStmt->sql.stbInterlaceMode) {
      // Interlace mode binds into per-node column arrays instead of the block's own.
      taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
      (*pDataBlock)->pData->aCol = NULL;
    }
    if (colIdx < -1) {
      pStmt->sql.bindRowFormat = true;
      taosArrayDestroy((*pDataBlock)->pData->aCol);
      (*pDataBlock)->pData->aCol = taosArrayInit(20, POINTER_BYTES);
      if (NULL == (*pDataBlock)->pData->aCol) {
        // Fail here rather than continue with a missing column array.
        STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }
    }
  }

  int64_t startUs2 = taosGetTimestampUs();
  pStmt->stat.bindDataUs1 += startUs2 - startUs;

  SStmtQNode* param = NULL;
  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
    STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
    taosArrayClear(param->tblData.aCol);

    param->restoreTbCols = false;
    param->tblData.isOrdered = true;
    param->tblData.isDuplicateTs = false;
    tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);

    param->pCreateTbReq = pCreateTbReq;
  }

  int64_t startUs3 = taosGetTimestampUs();
  pStmt->stat.bindDataUs2 += startUs3 - startUs2;

  SArray*   pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
  SBlobSet* pBlob = NULL;
  if (colIdx < 0) {
    if (pStmt->sql.stbInterlaceMode) {
      (*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
      code = qBindStmtStbColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
                                    pStmt->taos->optionInfo.charsetCxt, &pBlob);
      param->tblData.isOrdered = (*pDataBlock)->ordered;
      param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
    } else {
      if (colIdx == -1) {
        if (pStmt->sql.bindRowFormat) {
          STMT2_ELOG_E("can't mix bind row format and bind column format");
          STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
        }
        code = qBindStmtColsValue2(*pDataBlock, pCols, pStmt->bInfo.fixedValueCols, bind, pStmt->exec.pRequest->msgBuf,
                                   pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt);
      } else {
        code =
            qBindStmt2RowValue(*pDataBlock, (*pDataBlock)->pData->aRowP, pStmt->bInfo.fixedValueCols, bind,
                               pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen,
                               &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->taos->optionInfo.charsetCxt);
      }
    }

    if (code) {
      STMT2_ELOG("bind cols or rows failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (pStmt->sql.stbInterlaceMode) {
      STMT2_ELOG_E("bind single column not allowed in stb insert mode");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    if (pStmt->sql.bindRowFormat) {
      STMT2_ELOG_E("can't mix bind row format and bind column format");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    // Single columns must arrive in order; index 0 may start a new round.
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      STMT2_ELOG_E("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    code = qBindStmtSingleColValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
                                    pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum,
                                    pStmt->taos->optionInfo.charsetCxt);
    if (code) {
      STMT2_ELOG("bind single col failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  int64_t startUs4 = taosGetTimestampUs();
  pStmt->stat.bindDataUs3 += startUs4 - startUs3;

  if (pStmt->stbInterlaceMode) {
    if (param) param->tblData.pBlobSet = pBlob;
  }

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
  } else {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
  return TSDB_CODE_SUCCESS;
}
2942

2943
/*
2944
int stmtUpdateTableUid(STscStmt2* pStmt, SSubmitRsp* pRsp) {
2945
  tscDebug("stmt start to update tbUid, blockNum:%d", pRsp->nBlocks);
2946

2947
  int32_t code = 0;
2948
  int32_t finalCode = 0;
2949
  size_t  keyLen = 0;
2950
  void*   pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
2951
  while (pIter) {
2952
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
2953
    char*          key = taosHashGetKey(pIter, &keyLen);
2954

2955
    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
2956
    if (pMeta->uid) {
2957
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2958
      continue;
2959
    }
2960

2961
    SSubmitBlkRsp* blkRsp = NULL;
2962
    int32_t        i = 0;
2963
    for (; i < pRsp->nBlocks; ++i) {
2964
      blkRsp = pRsp->pBlocks + i;
2965
      if (strlen(blkRsp->tblFName) != keyLen) {
2966
        continue;
2967
      }
2968

2969
      if (strncmp(blkRsp->tblFName, key, keyLen)) {
2970
        continue;
2971
      }
2972

2973
      break;
2974
    }
2975

2976
    if (i < pRsp->nBlocks) {
2977
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
2978
               blkRsp->uid);
2979

2980
      pMeta->uid = blkRsp->uid;
2981
      pStmt->bInfo.tbUid = blkRsp->uid;
2982
    } else {
2983
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
2984
      if (NULL == pStmt->pCatalog) {
2985
        code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
2986
        if (code) {
2987
          pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2988
          finalCode = code;
2989
          continue;
2990
        }
2991
      }
2992

2993
      code = stmtCreateRequest(pStmt);
2994
      if (code) {
2995
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
2996
        finalCode = code;
2997
        continue;
2998
      }
2999

3000
      STableMeta*      pTableMeta = NULL;
3001
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
3002
                               .requestId = pStmt->exec.pRequest->requestId,
3003
                               .requestObjRefId = pStmt->exec.pRequest->self,
3004
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
3005
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
3006

3007
      pStmt->stat.ctgGetTbMetaNum++;
3008

3009
      taos_free_result(pStmt->exec.pRequest);
3010
      pStmt->exec.pRequest = NULL;
3011

3012
      if (code || NULL == pTableMeta) {
3013
        pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3014
        finalCode = code;
3015
        taosMemoryFree(pTableMeta);
3016
        continue;
3017
      }
3018

3019
      pMeta->uid = pTableMeta->uid;
3020
      pStmt->bInfo.tbUid = pTableMeta->uid;
3021
      taosMemoryFree(pTableMeta);
3022
    }
3023

3024
    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
3025
  }
3026

3027
  return finalCode;
3028
}
3029
*/
3030
/*
3031
int stmtStaticModeExec(TAOS_STMT* stmt) {
3032
  STscStmt2*   pStmt = (STscStmt2*)stmt;
3033
  int32_t     code = 0;
3034
  SSubmitRsp* pRsp = NULL;
3035
  if (pStmt->sql.staticMode) {
3036
    return TSDB_CODE_TSC_STMT_API_ERROR;
3037
  }
3038

3039
  STMT_DLOG_E("start to exec");
3040

3041
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
3042

3043
  STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList,
3044
pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
3045

3046
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
3047

3048
  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
3049
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
3050
    if (code) {
3051
      pStmt->exec.pRequest->code = code;
3052
    } else {
3053
      tFreeSSubmitRsp(pRsp);
3054
      STMT_ERR_RET(stmtResetStmt(pStmt));
3055
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
3056
    }
3057
  }
3058

3059
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
3060

3061
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
3062
  pStmt->affectedRows += pStmt->exec.affectedRows;
3063

3064
_return:
3065

3066
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
3067

3068
  tFreeSSubmitRsp(pRsp);
3069

3070
  ++pStmt->sql.runTimes;
3071

3072
  STMT_RET(code);
3073
}
3074
*/
3075

3076
/*
 * Allocate a zeroed SParseContext, publish it through *pCxt and fill it from
 * the request, its connection object and the callback wrapper.
 *
 * pRequest  request whose ids, sql text and message buffer are referenced
 * pCxt      receives the new context (NULL when the allocation fails)
 * pWrapper  stored as parseSqlParam; its current pParseCtx is consulted for privInfo
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 *
 * NOTE(review): both callers visible in this file pass &pWrapper->pParseCtx as
 * pCxt, so by the time privInfo is read pWrapper->pParseCtx already points at
 * the freshly zeroed context -- confirm that is the intended source.
 */
static int32_t createParseContext(const SRequestObj* pRequest, SParseContext** pCxt, SSqlCallbackWrapper* pWrapper) {
  const STscObj* pTscObj = pRequest->pTscObj;

  SParseContext* pNew = taosMemoryCalloc(1, sizeof(SParseContext));
  // Publish before filling: the initializer below reads pWrapper->pParseCtx, which may alias *pCxt.
  *pCxt = pNew;
  if (NULL == pNew) {
    return terrno;
  }

  *pNew = (SParseContext){.requestId = pRequest->requestId,
                          .requestRid = pRequest->self,
                          .acctId = pTscObj->acctId,
                          .db = pRequest->pDb,
                          .topicQuery = false,
                          .pSql = pRequest->sqlstr,
                          .sqlLen = pRequest->sqlLen,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                          .pTransporter = pTscObj->pAppInfo->pTransporter,
                          .pStmtCb = NULL,
                          .pUser = pTscObj->user,
                          .userId = pTscObj->userId,
                          .pEffectiveUser = pRequest->effectiveUser,
                          .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
                          .enableSysInfo = pTscObj->sysInfo,
                          .sodInitial = pTscObj->pAppInfo->serverCfg.sodInitial,
                          .privInfo = pWrapper->pParseCtx ? pWrapper->pParseCtx->privInfo : 0,
                          .async = true,
                          .svrVer = pTscObj->sVer,
                          .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
                          .allocatorId = pRequest->allocatorRefId,
                          .parseSqlFp = clientParseSql,
                          .parseSqlParam = pWrapper};

  // biMode is read atomically from the connection object and set after the bulk initialisation.
  pNew->biMode = atomic_load_8(&((STscObj*)pTscObj)->biMode);
  return TSDB_CODE_SUCCESS;
}
3112

3113
/*
 * Completion callback installed on the request by the async branch of stmtExec2.
 *
 * userdata  the STscStmt2 that launched the query
 * res       result handle delivered with the completion
 * code      execution result code
 *
 * When the error is one NEED_CLIENT_HANDLE_ERROR() accepts and saved data
 * blocks exist, the callback refreshes meta, rebuilds the request and
 * re-launches the query with itself as completion callback; in that case it
 * returns early and the user callback / cleanup / semaphore post are left to
 * the retry's own completion. Otherwise it records affected rows on success,
 * invokes the user callback once, cleans the exec info, bumps runTimes and
 * posts asyncExecSem.
 */
static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
  STscStmt2*        pStmt = userdata;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
  pStmt->asyncResultAvailable = true;
  pStmt->exec.pRequest->inCallback = true;

  // NEED_CLIENT_HANDLE_ERROR: retry internally without notifying user; retry completion uses this same cb + fp once.
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pStmt->pVgDataBlocksForRetry != NULL) {
    int32_t origExecCode = code;
    STMT2_ELOG("async exec got NEED_CLIENT_HANDLE_ERROR (code:%s), retrying internally", tstrerror(code));

    // Try to retry internally; completion uses asyncQueryCb so user fp runs once with the final result.
    int32_t retryCode = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
    if (retryCode == TSDB_CODE_SUCCESS) {
      stmtInvalidateStbInterlaceTableUidCache(pStmt);
      if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
        retryCode = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
      } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
        retryCode = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
      }
    }
    if (retryCode == TSDB_CODE_SUCCESS) {
      // Honour the restore result (as the sync path in stmtExec2 does) instead of
      // re-launching the query after a failed restore.
      retryCode = stmtRestoreVgDataBlocksForRetry(pStmt);
    }
    if (retryCode == TSDB_CODE_SUCCESS) {
      resetRequest(pStmt);
      pStmt->asyncResultAvailable = false;
      retryCode = stmtCreateRequest(pStmt);
      if (retryCode == TSDB_CODE_SUCCESS) {
        SRequestObj*         pNewReq = pStmt->exec.pRequest;
        SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
        if (pWrapper == NULL) {
          retryCode = terrno;
          resetRequest(pStmt);
        } else {
          pWrapper->pRequest = pNewReq;
          pNewReq->pWrapper = pWrapper;
          retryCode = createParseContext(pNewReq, &pWrapper->pParseCtx, pWrapper);
          if (retryCode == TSDB_CODE_SUCCESS) {
            pNewReq->syncQuery = false;
            // Same as first exec: asyncQueryCb invokes user asyncExecFn once with userdata (not raw pStmt as fp's 1st arg).
            pNewReq->body.queryFp = asyncQueryCb;
            ((SSyncQueryParam*)(pNewReq)->body.interParam)->userParam = pStmt;
            launchAsyncQuery(pNewReq, pStmt->sql.pQuery, NULL, pWrapper);
            // Retry asyncQueryCb will call fp, stmtCleanExecInfo, and tsem_post(asyncExecSem).
            return;
          }
          // Do not taosMemoryFree(pWrapper): destroyRequest frees it via destorySqlCallbackWrapper.
          resetRequest(pStmt);
        }
      }
    }
    // Retry setup failed (did not return above): notify user once with the original error, then cleanup + post sem.
    // NOTE(review): when the failure happened after resetRequest(), `res` may refer to the request that was
    // just reset and asyncResultAvailable is still false -- confirm both are acceptable for the user callback.
    if (fp) {
      fp(pStmt->options.userdata, res, code);
    }
  } else {
    if (code == TSDB_CODE_SUCCESS) {
      pStmt->exec.affectedRows = taos_affected_rows(res);
      pStmt->affectedRows += pStmt->exec.affectedRows;
    }

    if (fp) {
      fp(pStmt->options.userdata, res, code);
    }
  }

  // Spin until tableColsReady becomes non-zero before cleaning the exec info.
  while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
    taosUsleep(1);
  }
  (void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
  ++pStmt->sql.runTimes;
  // The request may be gone after the cleanup / a failed retry, hence the NULL check.
  if (pStmt->exec.pRequest != NULL) {
    pStmt->exec.pRequest->inCallback = false;
  }

  if (tsem_post(&pStmt->asyncExecSem) != 0) {
    STMT2_ELOG_E("fail to post asyncExecSem");
  }
}
3191

3192
/*
 * Execute a prepared stmt2 statement.
 *
 * stmt           statement handle (an STscStmt2)
 * affected_rows  optional out-parameter; written only on the synchronous path
 *
 * Flow: bail out on a previously recorded error, wait until no async bind is
 * outstanding, build the output blocks for non-query statements (saving them
 * for a possible retry), then either run the query synchronously (with one
 * internal retry after a meta refresh when NEED_CLIENT_HANDLE_ERROR() matches)
 * or, when options.asyncExecFn is set, launch it asynchronously with
 * asyncQueryCb as completion callback.
 *
 * Returns the result through STMT_RET(code); early exits return the failing code directly.
 */
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    code = 0;
  int64_t    startUs = taosGetTimestampUs();

  STMT2_DLOG_E("start to exec");

  if (pStmt->errCode != TSDB_CODE_SUCCESS) {
    return pStmt->errCode;
  }

  // Block until every outstanding async bind has finished.
  STMT_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  STMT_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  if (pStmt->sql.stbInterlaceMode) {
    STMT_ERR_RET(stmtAddBatch2(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    if (pStmt->sql.stbInterlaceMode) {
      int64_t startTs = taosGetTimestampUs();
      // wait for stmt bind thread to finish
      while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
        taosUsleep(1);
      }

      if (pStmt->errCode != TSDB_CODE_SUCCESS) {
        return pStmt->errCode;
      }

      pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
      STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
      taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
      pStmt->sql.siInfo.pVgroupHash = NULL;
      pStmt->sql.siInfo.pVgroupList = NULL;
    } else {
      tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pStmt->exec.pCurrTbData);

      STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

      STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    }
    // Save serialized data blocks for potential NEED_CLIENT_HANDLE_ERROR retry before the planner
    // takes ownership of pDataBlocks during createQueryPlan.
    STMT_ERR_RET(stmtSaveVgDataBlocksForRetry(pStmt));
  }

  pStmt->asyncResultAvailable = false;
  SRequestObj*      pRequest = pStmt->exec.pRequest;
  __taos_async_fn_t fp = pStmt->options.asyncExecFn;
  STMT2_DLOG("EXEC INFO :req:0x%" PRIx64 ", QID:0x%" PRIx64 ", exec sql:%s,  conn:%" PRId64, pRequest->self,
             pRequest->requestId, pStmt->sql.sqlStr, pRequest->pTscObj->id);

  if (!fp) {
    /* ---- synchronous execution ---- */
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

    if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
      int32_t origExecCode = pStmt->exec.pRequest->code;
      STMT2_WLOG_E("exec failed errorcode:NEED_CLIENT_HANDLE_ERROR, refresh meta and retry internally");
      code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
      if (code == TSDB_CODE_SUCCESS) {
        stmtInvalidateStbInterlaceTableUidCache(pStmt);
      }
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
        if (origExecCode == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
          code = stmtUpdateVgDataBlocksTbMetaFromCatalog(pStmt, pStmt->exec.pRequest);
        } else if (stmtIsSchemaVersionRetryError(origExecCode)) {
          code = stmtUpdateVgDataBlocksSchemaVer(pStmt, pStmt->exec.pRequest);
        }
      }
      if (code == TSDB_CODE_SUCCESS && pStmt->pVgDataBlocksForRetry != NULL) {
        // Restore saved serialized data blocks and re-execute with refreshed meta.
        STMT_ERR_JRET(stmtRestoreVgDataBlocksForRetry(pStmt));
        resetRequest(pStmt);
        STMT_ERR_JRET(stmtCreateRequest(pStmt));
        launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
        code = pStmt->exec.pRequest->code;
      } else if (code == TSDB_CODE_SUCCESS) {
        // Nothing saved to retry with: report the original execution error.
        code = pStmt->exec.pRequest->code;
      } else {
        // Meta refresh / block update failed: surface that failure through the request.
        pStmt->exec.pRequest->code = code;
      }
    }

    STMT_ERR_JRET(pStmt->exec.pRequest->code);

    pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
    if (affected_rows) {
      *affected_rows = pStmt->exec.affectedRows;
    }
    pStmt->affectedRows += pStmt->exec.affectedRows;

    // wait for stmt bind thread to finish
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false));

    ++pStmt->sql.runTimes;
  } else {
    /* ---- asynchronous execution ---- */
    SSqlCallbackWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
    if (pWrapper == NULL) {
      code = terrno;
    } else {
      // Once attached to the request, the wrapper is owned by the request.
      pWrapper->pRequest = pRequest;
      pRequest->pWrapper = pWrapper;
      code = createParseContext(pRequest, &pWrapper->pParseCtx, pWrapper);
    }

    // Launch only when the wrapper and its parse context exist. Launching with a NULL
    // wrapper / context would both risk a NULL dereference downstream and deliver a
    // completion callback for a call that already returned an error to the caller.
    // execSemWaited is left untouched on failure because no callback will post asyncExecSem.
    if (TSDB_CODE_SUCCESS == code) {
      pRequest->syncQuery = false;
      pRequest->body.queryFp = asyncQueryCb;
      ((SSyncQueryParam*)(pRequest)->body.interParam)->userParam = pStmt;

      pStmt->execSemWaited = false;
      launchAsyncQuery(pRequest, pStmt->sql.pQuery, NULL, pWrapper);
    }
  }

_return:
  if (code) {
    STMT2_ELOG("exec failed, error:%s", tstrerror(code));
  }
  pStmt->stat.execUseUs += taosGetTimestampUs() - startUs;

  STMT_RET(code);
}
3325

3326
/*
 * Tear down a stmt2 handle and free it.
 *
 * Order of work: free the db name, stop and join the bind thread (if one is in
 * use), wait for outstanding async binds and destroy their cond/mutex, settle
 * the async exec semaphore, log the collected statistics, clean the SQL info,
 * destroy the semaphore and finally free the handle itself.
 *
 * Returns TSDB_CODE_SUCCESS, or the failing code of the mutex lock/unlock or
 * stmtCleanSQLInfo step (in which case the function returns early).
 */
int stmtClose2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_DLOG_E("start to close stmt");
  taosMemoryFreeClear(pStmt->db);

  if (pStmt->bindThreadInUse) {
    // wait for stmt bind thread to finish
    while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) {
      taosUsleep(1);
    }

    // Raise the stop flag under the queue mutex and signal the queue's condition variable.
    (void)taosThreadMutexLock(&pStmt->queue.mutex);
    pStmt->queue.stopQueue = true;
    (void)taosThreadCondSignal(&(pStmt->queue.waitCond));
    (void)taosThreadMutexUnlock(&pStmt->queue.mutex);

    (void)taosThreadJoin(pStmt->bindThread, NULL);
    pStmt->bindThreadInUse = false;

    (void)taosThreadCondDestroy(&pStmt->queue.waitCond);
    (void)taosThreadMutexDestroy(&pStmt->queue.mutex);
  }

  // Drain outstanding async binds before their synchronisation objects are destroyed.
  TSC_ERR_RET(taosThreadMutexLock(&pStmt->asyncBindParam.mutex));
  while (atomic_load_8((int8_t*)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadCondWait(&pStmt->asyncBindParam.waitCond, &pStmt->asyncBindParam.mutex);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pStmt->asyncBindParam.mutex));

  (void)taosThreadCondDestroy(&pStmt->asyncBindParam.waitCond);
  (void)taosThreadMutexDestroy(&pStmt->asyncBindParam.mutex);

  // Async mode: wait on the exec semaphore unless it has already been waited on.
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
    if (0 != tsem_wait(&pStmt->asyncExecSem)) {
      STMT2_ELOG_E("fail to wait asyncExecSem");
    }
  }

  /* On macOS dispatch_semaphore_dispose requires value >= orig (1). After tsem_wait above value is 0; post once before
   * destroy. */
  if (pStmt->options.asyncExecFn) {
    if (0 != tsem_post(&pStmt->asyncExecSem)) {
      STMT2_ELOG_E("fail to post asyncExecSem");
    }
  }

  STMT2_DLOG("stbInterlaceMode:%d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
             ", parseSqlNum=>%" PRId64 ", pStmt->stat.bindDataNum=>%" PRId64
             ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
             ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64
             ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
             pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo,
             pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND],
             pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], pStmt->stat.setTbNameUs,
             pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
             pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);

  // Clear the cache flags before the SQL info is cleaned.
  if (pStmt->sql.stbInterlaceMode) {
    pStmt->bInfo.tagsCached = false;
  }
  pStmt->bInfo.boundColsCached = false;

  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  if (pStmt->options.asyncExecFn) {
    if (0 != tsem_destroy(&pStmt->asyncExecSem)) {
      STMT2_ELOG_E("fail to destroy asyncExecSem");
    }
  }
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}
3399

3400
/*
 * Return a human-readable error string for the statement.
 *
 * Without a statement or without a current request the text for terrno is
 * returned. Otherwise the request's message buffer is preferred when it is
 * non-empty (or when the request code is TSDB_CODE_RPC_FQDN_ERROR); failing
 * that, the text for the request code is returned.
 *
 * Side effect: unless an async execution result is available, the request
 * code is overwritten with terrno before the lookup.
 */
const char* stmt2Errstr(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  if (NULL == stmt || NULL == pStmt->exec.pRequest) {
    return (char*)tstrerror(terrno);
  }

  SRequestObj* pRequest = pStmt->exec.pRequest;

  // if stmt async exec ,error code is pStmt->exec.pRequest->code
  if (pStmt->sql.status < STMT_EXECUTE || NULL == pStmt->options.asyncExecFn || !pStmt->asyncResultAvailable) {
    pRequest->code = terrno;
  }

  if (NULL != pRequest->msgBuf) {
    if (pRequest->msgBuf[0] != '\0' || TSDB_CODE_RPC_FQDN_ERROR == pRequest->code) {
      return pRequest->msgBuf;
    }
  }

  return (const char*)tstrerror(pRequest->code);
}
3418

3419
// Alias kept for compatibility with object files compiled against older headers.
NEW
3420
const char* stmtErrstr2(TAOS_STMT2* stmt) { return stmt2Errstr(stmt); }
×
3421
/*
3422
int stmtAffectedRows(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->affectedRows; }
3423

3424
int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt2*)stmt)->exec.affectedRows; }
3425
*/
3426

3427
/*
 * Bring an insert statement into the STMT_FETCH_FIELDS state and make sure its
 * SQL has been parsed so column fields can be fetched afterwards.
 *
 * Returns a previously recorded pStmt->errCode immediately if one is set.
 * Otherwise returns the code of the first failing step (0 on success);
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is mapped to TSDB_CODE_TSC_STMT_TBNAME_ERROR
 * when the NO_DATA_USING_CLAUSE flag is not set. pStmt->errCode is restored to
 * its value on entry before returning.
 */
int stmtParseColFields2(TAOS_STMT2* stmt) {
  int32_t    code = 0;
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    preCode = pStmt->errCode;

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT2_ELOG_E("stmtParseColFields2 only for insert");
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // An already-executed statement of a known, non multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }
  // Interlace mode with an existing data context is not parsed again either.
  if (pStmt->sql.stbInterlaceMode && NULL != pStmt->sql.siInfo.pDataCtx) {
    pStmt->bInfo.needParse = false;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

_return:
  // compatible with previous versions
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && 0x0 == (pStmt->bInfo.tbNameFlag & NO_DATA_USING_CLAUSE)) {
    code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
  }

  pStmt->errCode = preCode;

  return code;
}
3467

3468
/*
 * Parse the statement via stmtParseColFields2() and, when that succeeds,
 * fetch the column fields via stmtFetchStbColFields2().
 *
 * Returns the parse error if parsing fails, otherwise the result of the fetch.
 */
int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
  int32_t parseCode = stmtParseColFields2(stmt);
  if (TSDB_CODE_SUCCESS == parseCode) {
    return stmtFetchStbColFields2(stmt, nums, fields);
  }

  return parseCode;
}
3476

3477
/*
 * Report the number of parameters of the statement through *nums.
 *
 * For query statements the count is the size of the query's placeholder value
 * array; for other statements it is obtained from stmtFetchColFields2().
 *
 * Returns a previously recorded pStmt->errCode immediately if one is set,
 * otherwise the code of the first failing step (0 on success).
 * pStmt->errCode is restored to its value on entry before returning.
 */
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
  int32_t    code = 0;
  STscStmt2* pStmt = (STscStmt2*)stmt;
  int32_t    preCode = pStmt->errCode;

  if (TSDB_CODE_SUCCESS != pStmt->errCode) {
    return pStmt->errCode;
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // An already-executed statement of a known, non multi-insert type is not parsed again.
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // A query that has already run gets its request reset before a new one is created.
  if (NULL != pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    resetRequest(pStmt);
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

  STMT2_DLOG("get param num success, nums:%d", *nums);

_return:

  pStmt->errCode = preCode;

  return code;
}
3517

3518
/*
 * Return the result handle of an executed query statement.
 *
 * Returns NULL for non-query statements, and for async statements whose
 * result is not available yet. When tsUseAdapter is set the request pointer is
 * handed out and cleared from the statement; otherwise the statement keeps its
 * pointer and the same value is returned.
 */
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {
  STscStmt2* pStmt = (STscStmt2*)stmt;

  STMT2_TLOG_E("start to use result");

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT2_ELOG_E("useResult only for query statement");
    return NULL;
  }

  if (NULL != pStmt->options.asyncExecFn && !pStmt->asyncResultAvailable) {
    STMT2_ELOG_E("use result after callBackFn return");
    return NULL;
  }

  if (!tsUseAdapter) {
    return pStmt->exec.pRequest;
  }

  // Adapter mode: the statement gives up its pointer to the request.
  TAOS_RES* pRes = (TAOS_RES*)pStmt->exec.pRequest;
  pStmt->exec.pRequest = NULL;
  return pRes;
}
3541

UNCOV
3542
int32_t stmtAsyncBindThreadFunc(void* args) {
×
UNCOV
3543
  qInfo("async stmt bind thread started");
×
3544

UNCOV
3545
  ThreadArgs* targs = (ThreadArgs*)args;
×
3546
  STscStmt2*  pStmt = (STscStmt2*)targs->stmt;
×
3547

UNCOV
3548
  int code = taos_stmt2_bind_param(targs->stmt, targs->bindv, targs->col_idx);
×
UNCOV
3549
  targs->fp(targs->param, NULL, code);
×
UNCOV
3550
  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
×
UNCOV
3551
  (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
×
UNCOV
3552
  (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
×
3553
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
×
UNCOV
3554
  taosMemoryFree(args);
×
3555

UNCOV
3556
  qInfo("async stmt bind thread stopped");
×
3557

UNCOV
3558
  return code;
×
3559
}
3560

3561
/*
 * Copy an error message into the current request's message buffer.
 *
 * pStmt  statement whose exec.pRequest->msgBuf receives the text
 * msg    NUL-terminated message to store
 *
 * Does nothing when pStmt, msg, the request or its buffer is NULL, or when
 * the recorded buffer length is 0. A message that does not fit is truncated.
 * On return msgBufLen holds the stored message length (or bufLen - 1 after a
 * truncation).
 *
 * NOTE(review): msgBufLen is read here as the copy bound and then overwritten
 * with the message length, so each call lowers the bound seen by the next
 * call -- confirm whether msgBufLen is meant to be a capacity or a length.
 */
void stmtBuildErrorMsg(STscStmt2* pStmt, const char* msg) {
  if (pStmt == NULL || msg == NULL) {
    return;
  }

  if (pStmt->exec.pRequest == NULL || pStmt->exec.pRequest->msgBuf == NULL) {
    return;
  }

  size_t msgLen = strlen(msg);
  size_t bufLen = pStmt->exec.pRequest->msgBufLen;

  if (bufLen == 0) {
    // No room at all: `bufLen - 1` below would wrap around and yield an unbounded copy.
    return;
  }

  if (bufLen == 1) {
    // Only the terminator fits; avoid handing a zero size to tstrncpy.
    pStmt->exec.pRequest->msgBuf[0] = '\0';
    pStmt->exec.pRequest->msgBufLen = 0;
    return;
  }

  if (msgLen >= bufLen) {
    // Message does not fit: copy a truncated prefix and force termination at the last byte.
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen - 1);
    pStmt->exec.pRequest->msgBuf[bufLen - 1] = '\0';
    pStmt->exec.pRequest->msgBufLen = bufLen - 1;
  } else {
    tstrncpy(pStmt->exec.pRequest->msgBuf, msg, bufLen);
    pStmt->exec.pRequest->msgBufLen = msgLen;
  }

  return;
}
3588

3589
/*
 * Store an error message via stmtBuildErrorMsg() and record errorCode as the
 * statement's error code.
 *
 * pStmt      statement to update; NULL is tolerated (nothing is recorded)
 * msg        message passed on to stmtBuildErrorMsg()
 * errorCode  code stored in pStmt->errCode
 *
 * Returns errorCode unchanged so callers can return it directly.
 */
int32_t stmtBuildErrorMsgWithCode(STscStmt2* pStmt, const char* msg, int32_t errorCode) {
  stmtBuildErrorMsg(pStmt, msg);

  // stmtBuildErrorMsg() accepts a NULL statement; keep this wrapper equally tolerant.
  if (pStmt != NULL) {
    pStmt->errCode = errorCode;
  }

  return errorCode;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc