• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.58
/source/libs/parser/src/parInsertStmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "parInsertUtil.h"
18
#include "parInt.h"
19
#include "parToken.h"
20
#include "query.h"
21
#include "tglobal.h"
22
#include "ttime.h"
23
#include "ttypes.h"
24

25
typedef struct SKvParam {
26
  int16_t  pos;
27
  SArray*  pTagVals;
28
  SSchema* schema;
29
  char     buf[TSDB_MAX_TAGS_LEN];
30
} SKvParam;
31

32
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
57✔
33
  *pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
57✔
34
  if (NULL == *pData) {
57!
35
    return terrno;
×
36
  }
37

38
  SSubmitTbData* pNew = *pData;
57✔
39

40
  *pNew = *pDataBlock->pData;
57✔
41

42
  int32_t code = cloneSVreateTbReq(pDataBlock->pData->pCreateTbReq, &pNew->pCreateTbReq);
57✔
43
  if (TSDB_CODE_SUCCESS != code) {
57!
44
    taosMemoryFreeClear(*pData);
×
45
    return code;
×
46
  }
47
  pNew->aCol = taosArrayDup(pDataBlock->pData->aCol, NULL);
57✔
48
  if (!pNew->aCol) {
57!
49
    code = terrno;
×
50
    taosMemoryFreeClear(*pData);
×
51
    return code;
×
52
  }
53

54
  int32_t colNum = taosArrayGetSize(pNew->aCol);
57✔
55
  for (int32_t i = 0; i < colNum; ++i) {
622✔
56
    SColData* pCol = (SColData*)taosArrayGet(pNew->aCol, i);
565✔
57
    tColDataDeepClear(pCol);
565✔
58
  }
59

60
  return TSDB_CODE_SUCCESS;
57✔
61
}
62

63
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
16✔
64
                               SStbInterlaceInfo* pBuildInfo) {
65
  // merge according to vgId
66
  return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo);
16✔
67
}
68

69
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
16✔
70
  int32_t             code = TSDB_CODE_SUCCESS;
16✔
71
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
16✔
72

73
  if (TSDB_CODE_SUCCESS == code) {
16!
74
    code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
16✔
75
  }
76

77
  if (pStmt->freeArrayFunc) {
16!
78
    pStmt->freeArrayFunc(pVgDataBlocks);
16✔
79
  }
80
  return code;
16✔
81
}
82

83
/*
84
int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t
85
tbNum) { int32_t             code = TSDB_CODE_SUCCESS; SArray*             pVgDataBlocks = NULL; SVnodeModifyOpStmt*
86
pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
87

88
  // merge according to vgId
89
  if (tbNum > 0) {
90
    code = insMergeStmtTableDataCxt(pTbCtx, pBlockList, &pVgDataBlocks, true, tbNum);
91
  }
92

93
  if (TSDB_CODE_SUCCESS == code) {
94
    code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
95
  }
96

97
  if (pStmt->freeArrayFunc) {
98
    pStmt->freeArrayFunc(pVgDataBlocks);
99
  }
100
  return code;
101
}
102
*/
103

104
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
57✔
105
  int32_t             code = TSDB_CODE_SUCCESS;
57✔
106
  SArray*             pVgDataBlocks = NULL;
57✔
107
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
57✔
108

109
  // merge according to vgId
110
  if (taosHashGetSize(pBlockHash) > 0) {
57!
111
    code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true);
57✔
112
  }
113

114
  if (TSDB_CODE_SUCCESS == code) {
57!
115
    code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks, false);
57✔
116
  }
117

118
  if (pStmt->freeArrayFunc) {
57!
119
    pStmt->freeArrayFunc(pVgDataBlocks);
57✔
120
  }
121
  return code;
57✔
122
}
123

124
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
8✔
125
                           TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
126
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
8✔
127
  SMsgBuf        pBuf = {.buf = msgBuf, .len = msgBufLen};
8✔
128
  int32_t        code = TSDB_CODE_SUCCESS;
8✔
129
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
8✔
130
  if (NULL == tags) {
8!
131
    return TSDB_CODE_APP_ERROR;
×
132
  }
133

134
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
8✔
135
  if (!pTagArray) {
8!
136
    return buildInvalidOperationMsg(&pBuf, "out of memory");
×
137
  }
138

139
  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
8✔
140
  if (!tagName) {
8!
141
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
×
142
    goto end;
×
143
  }
144

145
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
8✔
146

147
  bool  isJson = false;
8✔
148
  STag* pTag = NULL;
8✔
149

150
  for (int c = 0; c < tags->numOfBound; ++c) {
31✔
151
    if (bind[c].is_null && bind[c].is_null[0]) {
23!
152
      continue;
×
153
    }
154

155
    SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
23✔
156
    int32_t  colLen = pTagSchema->bytes;
23✔
157
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
23!
158
      if (!bind[c].length) {
2!
159
        code = buildInvalidOperationMsg(&pBuf, "var tag length is null");
×
160
        goto end;
×
161
      }
162
      colLen = bind[c].length[0];
2✔
163
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
2!
164
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
×
165
        goto end;
×
166
      }
167
    }
168
    if (NULL == taosArrayPush(tagName, pTagSchema->name)) {
46!
169
      code = terrno;
×
170
      goto end;
×
171
    }
172
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
23!
173
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
×
174
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
×
175
        goto end;
×
176
      }
177

178
      isJson = true;
×
179
      char* tmp = taosMemoryCalloc(1, colLen + 1);
×
180
      if (!tmp) {
×
181
        code = terrno;
×
182
        goto end;
×
183
      }
184
      memcpy(tmp, bind[c].buffer, colLen);
×
185
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
×
186
      taosMemoryFree(tmp);
×
187
      if (code != TSDB_CODE_SUCCESS) {
×
188
        goto end;
×
189
      }
190
    } else {
191
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
23✔
192
      //      strcpy(val.colName, pTagSchema->name);
193
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
23!
194
          pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
22!
195
        val.pData = (uint8_t*)bind[c].buffer;
1✔
196
        val.nData = colLen;
1✔
197
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
22✔
198
        int32_t output = 0;
1✔
199
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
1✔
200
        if (p == NULL) {
1!
201
          code = terrno;
×
202
          goto end;
×
203
        }
204
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
1!
205
          if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
×
206
            taosMemoryFree(p);
×
207
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
×
208
            goto end;
×
209
          }
210
          char buf[512] = {0};
×
211
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(terrno));
×
212
          taosMemoryFree(p);
×
213
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
×
214
          goto end;
×
215
        }
216
        val.pData = p;
1✔
217
        val.nData = output;
1✔
218
      } else {
219
        memcpy(&val.i64, bind[c].buffer, colLen);
21✔
220
      }
221
      if (IS_VAR_DATA_TYPE(pTagSchema->type) && val.nData > pTagSchema->bytes) {
23!
222
        code = TSDB_CODE_PAR_VALUE_TOO_LONG;
×
223
        goto end;
×
224
      }
225
      if (NULL == taosArrayPush(pTagArray, &val)) {
23!
226
        code = terrno;
×
227
        goto end;
×
228
      }
229
    }
230
  }
231

232
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
8!
233
    goto end;
×
234
  }
235

236
  if (NULL == pDataBlock->pData->pCreateTbReq) {
8!
237
    pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
8✔
238
    if (NULL == pDataBlock->pData->pCreateTbReq) {
8!
239
      code = terrno;
×
240
      goto end;
×
241
    }
242
  }
243

244
  code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
8✔
245
                             pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
8✔
246
  pTag = NULL;
8✔
247

248
end:
8✔
249
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
31✔
250
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
23✔
251
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
23✔
252
      taosMemoryFreeClear(p->pData);
1!
253
    }
254
  }
255
  taosArrayDestroy(pTagArray);
8✔
256
  taosArrayDestroy(tagName);
8✔
257
  taosMemoryFree(pTag);
8✔
258

259
  return code;
8✔
260
}
261

262
int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* src, TAOS_MULTI_BIND* dst) {
51✔
263
  int32_t output = 0;
51✔
264
  int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
51✔
265
  if (dst->buffer_length < newBuflen) {
51!
266
    dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
51✔
267
    if (NULL == dst->buffer) {
51!
268
      return terrno;
×
269
    }
270
  }
271

272
  if (NULL == dst->length) {
51!
273
    dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
51✔
274
    if (NULL == dst->length) {
51!
275
      taosMemoryFreeClear(dst->buffer);
×
276
      return terrno;
×
277
    }
278
  }
279

280
  dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;
51✔
281

282
  for (int32_t i = 0; i < src->num; ++i) {
538✔
283
    if (src->is_null && src->is_null[i]) {
487!
284
      continue;
2✔
285
    }
286

287
    if (!taosMbsToUcs4(((char*)src->buffer) + src->buffer_length * i, src->length[i],
485!
288
                       (TdUcs4*)(((char*)dst->buffer) + dst->buffer_length * i), dst->buffer_length, &output)) {
485✔
289
      if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
×
290
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
291
      }
292
      char buf[512] = {0};
×
293
      snprintf(buf, tListLen(buf), "%s", strerror(terrno));
×
294
      return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
×
295
    }
296

297
    dst->length[i] = output;
485✔
298
  }
299

300
  dst->buffer_type = src->buffer_type;
51✔
301
  dst->is_null = src->is_null;
51✔
302
  dst->num = src->num;
51✔
303

304
  return TSDB_CODE_SUCCESS;
51✔
305
}
306

307
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
16✔
308
                              STSchema** pTSchema, SBindInfo* pBindInfos) {
309
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
16✔
310
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
16✔
311
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
16✔
312
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
16✔
313
  int32_t          rowNum = bind->num;
16✔
314
  TAOS_MULTI_BIND  ncharBind = {0};
16✔
315
  TAOS_MULTI_BIND* pBind = NULL;
16✔
316
  int32_t          code = 0;
16✔
317
  int16_t          lastColId = -1;
16✔
318
  bool             colInOrder = true;
16✔
319

320
  if (NULL == *pTSchema) {
16✔
321
    *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
4✔
322
  }
323

324
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
252✔
325
    SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
236✔
326
    if (pColSchema->colId <= lastColId) {
236!
327
      colInOrder = false;
×
328
    } else {
329
      lastColId = pColSchema->colId;
236✔
330
    }
331
    // SColData* pCol = taosArrayGet(pCols, c);
332

333
    if (bind[c].num != rowNum) {
236!
334
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
335
      goto _return;
×
336
    }
337

338
    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
236!
339
        bind[c].buffer_type != pColSchema->type) {  // for rowNum ==1 , connector may not set buffer_type
236!
340
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
341
      goto _return;
×
342
    }
343

344
    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
236✔
345
      code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind);
16✔
346
      if (code) {
16!
347
        goto _return;
×
348
      }
349
      pBind = &ncharBind;
16✔
350
    } else {
351
      pBind = bind + c;
220✔
352
    }
353

354
    pBindInfos[c].columnId = pColSchema->colId;
236✔
355
    pBindInfos[c].bind = pBind;
236✔
356
    pBindInfos[c].type = pColSchema->type;
236✔
357

358
    // code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes -
359
    // VARSTR_HEADER_SIZE: -1); if (code) {
360
    //   goto _return;
361
    // }
362
  }
363

364
  code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
16✔
365

366
  qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
16!
367

368
_return:
16✔
369

370
  taosMemoryFree(ncharBind.buffer);
16✔
371
  taosMemoryFree(ncharBind.length);
16✔
372

373
  return code;
16✔
374
}
375

376
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
72✔
377
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
72✔
378
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
72✔
379
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
72✔
380
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
72✔
381
  int32_t          rowNum = bind->num;
72✔
382
  TAOS_MULTI_BIND  ncharBind = {0};
72✔
383
  TAOS_MULTI_BIND* pBind = NULL;
72✔
384
  int32_t          code = 0;
72✔
385

386
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
679✔
387
    SSchema*  pColSchema = &pSchema[boundInfo->pColIndex[c]];
613✔
388
    SColData* pCol = taosArrayGet(pCols, c);
613✔
389

390
    if (bind[c].num != rowNum) {
613!
391
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
392
      goto _return;
×
393
    }
394

395
    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
613✔
396
        bind[c].buffer_type != pColSchema->type) {  // for rowNum ==1 , connector may not set buffer_type
604!
397
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
398
      goto _return;
×
399
    }
400

401
    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
613✔
402
      code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind);
35✔
403
      if (code) {
35!
404
        goto _return;
×
405
      }
406
      pBind = &ncharBind;
35✔
407
    } else {
408
      pBind = bind + c;
578✔
409
    }
410

411
    code = tColDataAddValueByBind(pCol, pBind,
613✔
412
                                  IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
613!
413
    if (code) {
613✔
414
      goto _return;
6✔
415
    }
416
  }
417

418
  qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
66✔
419

420
_return:
42✔
421

422
  taosMemoryFree(ncharBind.buffer);
72✔
423
  taosMemoryFree(ncharBind.length);
72✔
424

425
  return code;
72✔
426
}
427

428
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
×
429
                                int32_t colIdx, int32_t rowNum) {
430
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
×
431
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
432
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
×
433
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
×
434
  SSchema*         pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
×
435
  SColData*        pCol = taosArrayGet(pCols, colIdx);
×
436
  TAOS_MULTI_BIND  ncharBind = {0};
×
437
  TAOS_MULTI_BIND* pBind = NULL;
×
438
  int32_t          code = 0;
×
439

440
  if (bind->num != rowNum) {
×
441
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
442
  }
443

444
  // Column index exceeds the number of columns
445
  if (colIdx >= pCols->size && pCol == NULL) {
×
446
    return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
×
447
  }
448

449
  if (bind->buffer_type != pColSchema->type) {
×
450
    return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
451
  }
452

453
  if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
×
454
    code = convertStmtNcharCol(&pBuf, pColSchema, bind, &ncharBind);
×
455
    if (code) {
×
456
      goto _return;
×
457
    }
458
    pBind = &ncharBind;
×
459
  } else {
460
    pBind = bind;
×
461
  }
462

463
  code = tColDataAddValueByBind(pCol, pBind,
×
464
                                IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
×
465

466
  qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
×
467

468
_return:
×
469

470
  taosMemoryFree(ncharBind.buffer);
×
471
  taosMemoryFree(ncharBind.length);
×
472

473
  return code;
×
474
}
475

476
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
×
477
                            TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen) {
478
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
×
479
  SMsgBuf        pBuf = {.buf = msgBuf, .len = msgBufLen};
×
480
  int32_t        code = TSDB_CODE_SUCCESS;
×
481
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
×
482
  if (NULL == tags) {
×
483
    return TSDB_CODE_APP_ERROR;
×
484
  }
485

486
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
×
487
  if (!pTagArray) {
×
488
    return buildInvalidOperationMsg(&pBuf, "out of memory");
×
489
  }
490

491
  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
492
  if (!tagName) {
×
493
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
×
494
    goto end;
×
495
  }
496

497
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
×
498

499
  bool  isJson = false;
×
500
  STag* pTag = NULL;
×
501

502
  for (int c = 0; c < tags->numOfBound; ++c) {
×
503
    if (bind[c].is_null && bind[c].is_null[0]) {
×
504
      continue;
×
505
    }
506

507
    SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
×
508
    int32_t  colLen = pTagSchema->bytes;
×
509
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
×
510
      if (!bind[c].length) {
×
511
        code = buildInvalidOperationMsg(&pBuf, "var tag length is null");
×
512
        goto end;
×
513
      }
514
      colLen = bind[c].length[0];
×
515
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
×
516
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
×
517
        goto end;
×
518
      }
519
    }
520
    if (NULL == taosArrayPush(tagName, pTagSchema->name)) {
×
521
      code = terrno;
×
522
      goto end;
×
523
    }
524
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
×
525
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
×
526
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
×
527
        goto end;
×
528
      }
529

530
      isJson = true;
×
531
      char* tmp = taosMemoryCalloc(1, colLen + 1);
×
532
      if (!tmp) {
×
533
        code = terrno;
×
534
        goto end;
×
535
      }
536
      memcpy(tmp, bind[c].buffer, colLen);
×
537
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
×
538
      taosMemoryFree(tmp);
×
539
      if (code != TSDB_CODE_SUCCESS) {
×
540
        goto end;
×
541
      }
542
    } else {
543
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
×
544
      //      strcpy(val.colName, pTagSchema->name);
545
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
×
546
          pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
×
547
        val.pData = (uint8_t*)bind[c].buffer;
×
548
        val.nData = colLen;
×
549
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
×
550
        int32_t output = 0;
×
551
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
×
552
        if (p == NULL) {
×
553
          code = terrno;
×
554
          goto end;
×
555
        }
556
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
×
557
          if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
×
558
            taosMemoryFree(p);
×
559
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
×
560
            goto end;
×
561
          }
562
          char buf[512] = {0};
×
563
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(terrno));
×
564
          taosMemoryFree(p);
×
565
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
×
566
          goto end;
×
567
        }
568
        val.pData = p;
×
569
        val.nData = output;
×
570
      } else {
571
        memcpy(&val.i64, bind[c].buffer, colLen);
×
572
      }
573
      if (IS_VAR_DATA_TYPE(pTagSchema->type) && val.nData > pTagSchema->bytes) {
×
574
        code = TSDB_CODE_PAR_VALUE_TOO_LONG;
×
575
        goto end;
×
576
      }
577
      if (NULL == taosArrayPush(pTagArray, &val)) {
×
578
        code = terrno;
×
579
        goto end;
×
580
      }
581
    }
582
  }
583

584
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
×
585
    goto end;
×
586
  }
587

588
  if (NULL == pDataBlock->pData->pCreateTbReq) {
×
589
    pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
590
    if (NULL == pDataBlock->pData->pCreateTbReq) {
×
591
      code = terrno;
×
592
      goto end;
×
593
    }
594
  }
595

596
  code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
×
597
                             pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
×
598
  pTag = NULL;
×
599

600
end:
×
601
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
×
602
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
×
603
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
×
604
      taosMemoryFreeClear(p->pData);
×
605
    }
606
  }
607
  taosArrayDestroy(pTagArray);
×
608
  taosArrayDestroy(tagName);
×
609
  taosMemoryFree(pTag);
×
610

611
  return code;
×
612
}
613

614
static int32_t convertStmtStbNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst) {
×
615
  int32_t       output = 0;
×
616
  const int32_t max_buf_len = pSchema->bytes - VARSTR_HEADER_SIZE;
×
617

618
  dst->buffer = taosMemoryCalloc(src->num, max_buf_len);
×
619
  if (NULL == dst->buffer) {
×
620
    return terrno;
×
621
  }
622

623
  dst->length = taosMemoryCalloc(src->num, sizeof(int32_t));
×
624
  if (NULL == dst->length) {
×
625
    taosMemoryFreeClear(dst->buffer);
×
626
    return terrno;
×
627
  }
628

629
  char* src_buf = src->buffer;
×
630
  char* dst_buf = dst->buffer;
×
631
  for (int32_t i = 0; i < src->num; ++i) {
×
632
    if (src->is_null && src->is_null[i]) {
×
633
      continue;
×
634
    }
635

636
    if (!taosMbsToUcs4(src_buf, src->length[i], (TdUcs4*)dst_buf, max_buf_len, &output)) {
×
637
      if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
×
638
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
639
      }
640
      char buf[512] = {0};
×
641
      snprintf(buf, tListLen(buf), "%s", strerror(terrno));
×
642
      return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
×
643
    }
644

645
    dst->length[i] = output;
×
646
    src_buf += src->length[i];
×
647
    dst_buf += output;
×
648
  }
649

650
  dst->buffer_type = src->buffer_type;
×
651
  dst->is_null = src->is_null;
×
652
  dst->num = src->num;
×
653

654
  return TSDB_CODE_SUCCESS;
×
655
}
656

657
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
×
658
                               STSchema** pTSchema, SBindInfo2* pBindInfos) {
659
  STableDataCxt*  pDataBlock = (STableDataCxt*)pBlock;
×
660
  SSchema*        pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
661
  SBoundColInfo*  boundInfo = &pDataBlock->boundColsInfo;
×
662
  SMsgBuf         pBuf = {.buf = msgBuf, .len = msgBufLen};
×
663
  int32_t         rowNum = bind->num;
×
664
  SArray*         ncharBinds = NULL;
×
665
  TAOS_STMT2_BIND ncharBind = {0};
×
666
  int32_t         code = 0;
×
667
  int16_t         lastColId = -1;
×
668
  bool            colInOrder = true;
×
669

670
  if (NULL == *pTSchema) {
×
671
    *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
×
672
  }
673

674
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
×
675
    SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
×
676
    if (pColSchema->colId <= lastColId) {
×
677
      colInOrder = false;
×
678
    } else {
679
      lastColId = pColSchema->colId;
×
680
    }
681

682
    if (bind[c].num != rowNum) {
×
683
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
684
      goto _return;
×
685
    }
686

687
    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
×
688
        bind[c].buffer_type != pColSchema->type) {  // for rowNum ==1 , connector may not set buffer_type
×
689
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
690
      goto _return;
×
691
    }
692

693
    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
×
694
      code = convertStmtStbNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
×
695
      if (code) {
×
696
        goto _return;
×
697
      }
698
      if (!ncharBinds) {
×
699
        ncharBinds = taosArrayInit(1, sizeof(ncharBind));
×
700
        if (!ncharBinds) {
×
701
          code = terrno;
×
702
          goto _return;
×
703
        }
704
      }
705
      if (!taosArrayPush(ncharBinds, &ncharBind)) {
×
706
        code = terrno;
×
707
        goto _return;
×
708
      }
709
      pBindInfos[c].bind = taosArrayGetLast(ncharBinds);
×
710
    } else {
711
      pBindInfos[c].bind = bind + c;
×
712
    }
713

714
    pBindInfos[c].columnId = pColSchema->colId;
×
715
    pBindInfos[c].type = pColSchema->type;
×
716
    pBindInfos[c].bytes = pColSchema->bytes;
×
717
  }
718

719
  code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
×
720

721
  qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
×
722

723
_return:
×
724
  if (ncharBinds) {
×
725
    for (int i = 0; i < TARRAY_SIZE(ncharBinds); ++i) {
×
726
      TAOS_STMT2_BIND* ncBind = TARRAY_DATA(ncharBinds);
×
727
      taosMemoryFree(ncBind[i].buffer);
×
728
      taosMemoryFree(ncBind[i].length);
×
729
    }
730
    taosArrayDestroy(ncharBinds);
×
731
  }
732

733
  return code;
×
734
}
735

736
static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst) {
×
737
  int32_t       output = 0;
×
738
  const int32_t max_buf_len = pSchema->bytes - VARSTR_HEADER_SIZE;
×
739

740
  int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
×
741
  // if (dst->buffer_length < newBuflen) {
742
  dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
×
743
  if (NULL == dst->buffer) {
×
744
    return terrno;
×
745
  }
746
  //}
747

748
  if (NULL == dst->length) {
×
749
    dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
×
750
    if (NULL == dst->length) {
×
751
      taosMemoryFreeClear(dst->buffer);
×
752
      return terrno;
×
753
    }
754
  }
755

756
  // dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;
757
  char* src_buf = src->buffer;
×
758
  char* dst_buf = dst->buffer;
×
759
  for (int32_t i = 0; i < src->num; ++i) {
×
760
    if (src->is_null && src->is_null[i]) {
×
761
      continue;
×
762
    }
763

764
    /*if (!taosMbsToUcs4(((char*)src->buffer) + src->buffer_length * i, src->length[i],
765
      (TdUcs4*)(((char*)dst->buffer) + dst->buffer_length * i), dst->buffer_length, &output)) {*/
766
    if (!taosMbsToUcs4(src_buf, src->length[i], (TdUcs4*)dst_buf, max_buf_len, &output)) {
×
767
      if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
×
768
        return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
×
769
      }
770
      char buf[512] = {0};
×
771
      snprintf(buf, tListLen(buf), "%s", strerror(terrno));
×
772
      return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
×
773
    }
774

775
    dst->length[i] = output;
×
776
    src_buf += src->length[i];
×
777
    dst_buf += output;
×
778
  }
779

780
  dst->buffer_type = src->buffer_type;
×
781
  dst->is_null = src->is_null;
×
782
  dst->num = src->num;
×
783

784
  return TSDB_CODE_SUCCESS;
×
785
}
786

787
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen) {
×
788
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
×
789
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
790
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
×
791
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
×
792
  int32_t          rowNum = bind->num;
×
793
  TAOS_STMT2_BIND  ncharBind = {0};
×
794
  TAOS_STMT2_BIND* pBind = NULL;
×
795
  int32_t          code = 0;
×
796

797
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
×
798
    SSchema*  pColSchema = &pSchema[boundInfo->pColIndex[c]];
×
799
    SColData* pCol = taosArrayGet(pCols, c);
×
800
    if (pCol == NULL || pColSchema == NULL) {
×
801
      code = buildInvalidOperationMsg(&pBuf, "get column schema or column data failed");
×
802
      goto _return;
×
803
    }
804

805
    if (bind[c].num != rowNum) {
×
806
      code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
807
      goto _return;
×
808
    }
809

810
    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
×
811
        bind[c].buffer_type != pColSchema->type) {  // for rowNum ==1 , connector may not set buffer_type
×
812
      code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
813
      goto _return;
×
814
    }
815

816
    if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
×
817
      code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
×
818
      if (code) {
×
819
        goto _return;
×
820
      }
821
      pBind = &ncharBind;
×
822
    } else {
823
      pBind = bind + c;
×
824
    }
825

826
    code = tColDataAddValueByBind2(pCol, pBind,
×
827
                                   IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
×
828
    if (code) {
×
829
      goto _return;
×
830
    }
831
  }
832

833
  qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
×
834

835
_return:
×
836

837
  taosMemoryFree(ncharBind.buffer);
×
838
  taosMemoryFree(ncharBind.length);
×
839

840
  return code;
×
841
}
842

843
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
×
844
                                 int32_t colIdx, int32_t rowNum) {
845
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
×
846
  SSchema*         pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
847
  SBoundColInfo*   boundInfo = &pDataBlock->boundColsInfo;
×
848
  SMsgBuf          pBuf = {.buf = msgBuf, .len = msgBufLen};
×
849
  SSchema*         pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
×
850
  SColData*        pCol = taosArrayGet(pCols, colIdx);
×
851
  TAOS_STMT2_BIND  ncharBind = {0};
×
852
  TAOS_STMT2_BIND* pBind = NULL;
×
853
  int32_t          code = 0;
×
854

855
  if (bind->num != rowNum) {
×
856
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
×
857
  }
858

859
  // Column index exceeds the number of columns
860
  if (colIdx >= pCols->size && pCol == NULL) {
×
861
    return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
×
862
  }
863

864
  if (bind->buffer_type != pColSchema->type) {
×
865
    return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
×
866
  }
867

868
  if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
×
869
    code = convertStmtNcharCol2(&pBuf, pColSchema, bind, &ncharBind);
×
870
    if (code) {
×
871
      goto _return;
×
872
    }
873
    pBind = &ncharBind;
×
874
  } else {
875
    pBind = bind;
×
876
  }
877

878
  code = tColDataAddValueByBind2(pCol, pBind,
×
879
                                 IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
×
880

881
  qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
×
882

883
_return:
×
884

885
  taosMemoryFree(ncharBind.buffer);
×
886
  taosMemoryFree(ncharBind.length);
×
887

888
  return code;
×
889
}
890

891
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum,
×
892
                         TAOS_FIELD_E** fields, uint8_t timePrec) {
893
  if (fields != NULL) {
×
894
    *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_E));
×
895
    if (NULL == *fields) {
×
896
      return terrno;
×
897
    }
898

899
    SSchema* schema = &pSchema[boundColumns[0]];
×
900
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
×
901
      (*fields)[0].precision = timePrec;
×
902
    }
903

904
    for (int32_t i = 0; i < numOfBound; ++i) {
×
905
      schema = &pSchema[boundColumns[i]];
×
906
      strcpy((*fields)[i].name, schema->name);
×
907
      (*fields)[i].type = schema->type;
×
908
      (*fields)[i].bytes = schema->bytes;
×
909
    }
910
  }
911

912
  *fieldNum = numOfBound;
×
913

914
  return TSDB_CODE_SUCCESS;
×
915
}
916

917
int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_STB** fields,
×
918
                            STableMeta* pMeta) {
919
  if (fields != NULL) {
×
920
    *fields = taosMemoryCalloc(boundColsInfo.numOfBound, sizeof(TAOS_FIELD_STB));
×
921
    if (NULL == *fields) {
×
922
      return terrno;
×
923
    }
924

925
    SSchema* schema = &pSchema[boundColsInfo.pColIndex[0]];
×
926
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
×
927
      (*fields)[0].precision = pMeta->tableInfo.precision;
×
928
    }
929

930
    for (int32_t i = 0; i < boundColsInfo.numOfBound; ++i) {
×
931
      int16_t idx = boundColsInfo.pColIndex[i];
×
932

933
      if (idx == pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) {
×
934
        (*fields)[i].field_type = TAOS_FIELD_TBNAME;
×
935
        tstrncpy((*fields)[i].name, "tbname", sizeof((*fields)[i].name));
×
936
        continue;
×
937
      } else if (idx < pMeta->tableInfo.numOfColumns) {
×
938
        (*fields)[i].field_type = TAOS_FIELD_COL;
×
939
      } else {
940
        (*fields)[i].field_type = TAOS_FIELD_TAG;
×
941
      }
942

943
      schema = &pSchema[idx];
×
944
      tstrncpy((*fields)[i].name, schema->name, sizeof((*fields)[i].name));
×
945
      (*fields)[i].type = schema->type;
×
946
      (*fields)[i].bytes = schema->bytes;
×
947
    }
948
  }
949

950
  *fieldNum = boundColsInfo.numOfBound;
×
951

952
  return TSDB_CODE_SUCCESS;
×
953
}
954

955
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
956
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
×
957
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
×
958
  if (NULL == tags) {
×
959
    return TSDB_CODE_APP_ERROR;
×
960
  }
961
  /*
962
  if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
963
    return TSDB_CODE_TSC_STMT_API_ERROR;
964
  }
965
  */
966
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
×
967
  if (tags->numOfBound <= 0) {
×
968
    *fieldNum = 0;
×
969
    *fields = NULL;
×
970

971
    return TSDB_CODE_SUCCESS;
×
972
  }
973

974
  CHECK_CODE(buildBoundFields(tags->numOfBound, tags->pColIndex, pSchema, fieldNum, fields, 0));
×
975

976
  return TSDB_CODE_SUCCESS;
×
977
}
978

979
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
×
980
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
×
981
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
982
  if (pDataBlock->boundColsInfo.numOfBound <= 0) {
×
983
    *fieldNum = 0;
×
984
    if (fields != NULL) {
×
985
      *fields = NULL;
×
986
    }
987

988
    return TSDB_CODE_SUCCESS;
×
989
  }
990

991
  CHECK_CODE(buildBoundFields(pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema,
×
992
                              fieldNum, fields, pDataBlock->pMeta->tableInfo.precision));
993

994
  return TSDB_CODE_SUCCESS;
×
995
}
996

997
int32_t qBuildStmtStbColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_STB** fields) {
×
998
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
×
999
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
×
1000
  if (pDataBlock->boundColsInfo.numOfBound <= 0) {
×
1001
    *fieldNum = 0;
×
1002
    if (fields != NULL) {
×
1003
      *fields = NULL;
×
1004
    }
1005

1006
    return TSDB_CODE_SUCCESS;
×
1007
  }
1008

1009
  CHECK_CODE(buildStbBoundFields(pDataBlock->boundColsInfo, pSchema, fieldNum, fields, pDataBlock->pMeta));
×
1010

1011
  return TSDB_CODE_SUCCESS;
×
1012
}
1013

1014
int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
×
1015
  int32_t colNum = taosArrayGetSize(pCols);
×
1016

1017
  for (int32_t i = 0; i < colNum; ++i) {
×
1018
    SColData* pCol = (SColData*)taosArrayGet(pCols, i);
×
1019
    if (pCol == NULL) {
×
1020
      qError("qResetStmtColumns column is NULL");
×
1021
      return terrno;
×
1022
    }
1023
    if (deepClear) {
×
1024
      tColDataDeepClear(pCol);
×
1025
    } else {
1026
      tColDataClear(pCol);
×
1027
    }
1028
  }
1029

1030
  return TSDB_CODE_SUCCESS;
×
1031
}
1032

1033
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
84✔
1034
  STableDataCxt* pBlock = (STableDataCxt*)block;
84✔
1035
  int32_t        colNum = taosArrayGetSize(pBlock->pData->aCol);
84✔
1036

1037
  for (int32_t i = 0; i < colNum; ++i) {
947✔
1038
    SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
864✔
1039
    if (pCol == NULL) {
863!
1040
      qError("qResetStmtDataBlock column is NULL");
×
1041
      return terrno;
×
1042
    }
1043
    if (deepClear) {
863✔
1044
      tColDataDeepClear(pCol);
287✔
1045
    } else {
1046
      tColDataClear(pCol);
576✔
1047
    }
1048
  }
1049

1050
  return TSDB_CODE_SUCCESS;
83✔
1051
}
1052

1053
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset) {
28✔
1054
  int32_t code = 0;
28✔
1055

1056
  *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
28✔
1057
  if (NULL == *pDst) {
28!
1058
    return terrno;
×
1059
  }
1060

1061
  STableDataCxt* pNewCxt = (STableDataCxt*)*pDst;
28✔
1062
  STableDataCxt* pCxt = (STableDataCxt*)pSrc;
28✔
1063
  pNewCxt->pSchema = NULL;
28✔
1064
  pNewCxt->pValues = NULL;
28✔
1065

1066
  if (pCxt->pMeta) {
28!
1067
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta));
28!
1068
    if (NULL == pNewMeta) {
28!
UNCOV
1069
      insDestroyTableDataCxt(*pDst);
×
1070
      return terrno;
×
1071
    }
1072
    memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta));
28!
1073
    pNewCxt->pMeta = pNewMeta;
28✔
1074
  }
1075

1076
  memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo));
28✔
1077
  pNewCxt->boundColsInfo.pColIndex = NULL;
28✔
1078

1079
  if (pCxt->boundColsInfo.pColIndex) {
28!
1080
    void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
28✔
1081
    if (NULL == pNewColIdx) {
28!
1082
      insDestroyTableDataCxt(*pDst);
×
1083
      return terrno;
×
1084
    }
1085
    memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex,
28✔
1086
           pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
28✔
1087
    pNewCxt->boundColsInfo.pColIndex = pNewColIdx;
28✔
1088
  }
1089

1090
  if (pCxt->pData) {
28!
1091
    SSubmitTbData* pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData));
28✔
1092
    if (NULL == pNewTb) {
28!
1093
      insDestroyTableDataCxt(*pDst);
×
1094
      return terrno;
×
1095
    }
1096

1097
    memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData));
28✔
1098
    pNewTb->pCreateTbReq = NULL;
28✔
1099

1100
    pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
28✔
1101
    if (NULL == pNewTb->aCol) {
28!
1102
      insDestroyTableDataCxt(*pDst);
×
1103
      return terrno;
×
1104
    }
1105

1106
    pNewCxt->pData = pNewTb;
28✔
1107

1108
    if (reset) {
28!
1109
      code = qResetStmtDataBlock(*pDst, true);
28✔
1110
    }
1111
  }
1112

1113
  return code;
27✔
1114
}
1115

1116
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId,
×
1117
                              bool rebuildCreateTb) {
1118
  int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
×
1119
  if (code) {
×
1120
    return code;
×
1121
  }
1122

1123
  STableDataCxt* pBlock = (STableDataCxt*)*pDst;
×
1124
  if (pBlock->pMeta) {
×
1125
    pBlock->pMeta->uid = uid;
×
1126
    pBlock->pMeta->vgId = vgId;
×
1127
    pBlock->pMeta->suid = suid;
×
1128
  }
1129

1130
  pBlock->pData->suid = suid;
×
1131
  pBlock->pData->uid = uid;
×
1132

1133
  if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
×
1134
    pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1135
    if (NULL == pBlock->pData->pCreateTbReq) {
×
1136
      return terrno;
×
1137
    }
1138
  }
1139

1140
  return TSDB_CODE_SUCCESS;
×
1141
}
1142

1143
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
84✔
1144

1145
void qDestroyStmtDataBlock(STableDataCxt* pBlock) {
109✔
1146
  if (pBlock == NULL) {
109✔
1147
    return;
49✔
1148
  }
1149

1150
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
60✔
1151
  insDestroyTableDataCxt(pDataBlock);
60✔
1152
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc