• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4861

22 Nov 2025 07:23AM UTC coverage: 64.274% (-0.06%) from 64.335%
#4861

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

0 of 79 new or added lines in 2 files covered. (0.0%)

820 existing lines in 129 files now uncovered.

154466 of 240326 relevant lines covered (64.27%)

112821527.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.82
/source/libs/executor/src/dataInserter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdint.h>
17
#include <stdlib.h>
18
#include <string.h>
19
#include "dataSinkInt.h"
20
#include "dataSinkMgt.h"
21
#include "executor.h"
22
#include "executorInt.h"
23
#include "functionMgt.h"
24
#include "libs/new-stream/stream.h"
25
#include "osAtomic.h"
26
#include "osMemPool.h"
27
#include "osMemory.h"
28
#include "osSemaphore.h"
29
#include "planner.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "taoserror.h"
34
#include "tarray.h"
35
#include "tcompression.h"
36
#include "tdatablock.h"
37
#include "tdataformat.h"
38
#include "tglobal.h"
39
#include "thash.h"
40
#include "tmsg.h"
41
#include "tqueue.h"
42

43
extern SDataSinkStat gDataSinkStat;
44
SHashObj*            gStreamGrpTableHash = NULL;
45
/* Result of the submit requests issued by one inserter; filled in by inserterCallback. */
typedef struct SSubmitRes {
46
  int64_t      affectedRows;  // running total; inserterCallback adds each response's affectedRows
47
  int32_t      code;          // status recorded by inserterCallback for the last response
48
  SSubmitRsp2* pRsp;          // decoded response, allocated and freed inside inserterCallback
49
} SSubmitRes;
50

51
/* A message buffer paired with a vgroup id; released by destroySSubmitTbDataMsg. */
typedef struct SSubmitTbDataMsg {
52
  int32_t vgId;
53
  int32_t len;    // presumably the byte length of pData -- TODO confirm against the producer
54
  void*   pData;  // freed together with the struct by destroySSubmitTbDataMsg
55
} SSubmitTbDataMsg;
56

57
static void destroySSubmitTbDataMsg(void* p) {
×
58
  if (p == NULL) return;
×
59
  SSubmitTbDataMsg* pVg = p;
×
60
  taosMemoryFree(pVg->pData);
×
61
  taosMemoryFree(pVg);
×
62
}
63

64
/*
 * State of one data-inserter sink. Only the fields annotated below are used by the
 * code visible in this part of the file; the others are consumed elsewhere.
 */
typedef struct SDataInserterHandle {
65
  SDataSinkHandle     sink;
66
  SDataSinkManager*   pManager;
67
  STSchema*           pSchema;
68
  SQueryInserterNode* pNode;
69
  SSubmitRes          submitRes;  // written by inserterCallback for every submit response
70
  SInserterParam*     pParam;     // source of streamInserterParam and readHandle->pMsgCb->clientRpc
71
  SArray*             pDataBlocks;
72
  SHashObj*           pCols;
73
  int32_t             status;
74
  bool                queryEnd;
75
  bool                fullOrderColList;
76
  uint64_t            useconds;
77
  uint64_t            cachedSize;
78
  uint64_t            flags;
79
  TdThreadMutex       mutex;
80
  tsem_t              ready;  // posted by inserterCallback once a response has been handled
81
  bool                explain;
82
  bool                isStbInserter;
83
  SSchemaWrapper*     pTagSchema;
84
  const char*         dbFName;
85
  SHashObj*           dbVgInfoMap;  // created lazily by inserterGetDbVgInfo; db name -> SUseDbOutput*
86
  SUseDbRsp*          pRsp;
87
} SDataInserterHandle;
88

89
/* Callback argument built by sendSubmitRequest and consumed by inserterCallback. */
typedef struct SSubmitRspParam {
90
  SDataInserterHandle* pInserter;  // inserter whose submitRes/ready are updated by the callback
91
  void*                putParam;   // may be NULL; inserterCallback casts it to SStreamDataInserterInfo*
92
} SSubmitRspParam;
93

94
/* Working state for building insert data; reset by initInsertProcessInfo. */
typedef struct SBuildInsertDataInfo {
95
  SSubmitTbData  pTbData;        // its aRowP array is allocated by initInsertProcessInfo
96
  bool           isFirstBlock;   // initialised to true
97
  bool           isLastBlock;    // initialised to false
98
  int64_t        lastTs;         // initialised to TSKEY_MIN
99
  bool           needSortMerge;  // initialised to false
100
} SBuildInsertDataInfo;
101

102
/*
 * Context for a drop-table request. Not used in the part of the file visible here.
 * NOTE(review): presumably `ready` is posted when the request completes and `code`
 * carries its status -- confirm against the drop-table send/callback code.
 */
typedef struct SDropTbCtx {
103
  SSTriggerDropRequest* req;
104
  tsem_t                ready;
105
  int32_t               code;
106
} SDropTbCtx;
107
/* A message head followed by a payload pointer. Not used in the part of the file visible here. */
typedef struct SDropTbDataMsg {
108
  SMsgHead header;
109
  void*    pData;
110
} SDropTbDataMsg;
111

112
/* A drop request paired with a status code. Not used in the part of the file visible here. */
typedef struct SRunnerDropTableInfo {
113
  SSTriggerDropRequest* pReq;
114
  int32_t               code;
115
} SRunnerDropTableInfo;
116

117
/*
 * Resets the block-tracking flags of pBuildInsertDataInfo and allocates its row
 * pointer array with an initial capacity of `rows` entries.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated.
 */
static int32_t initInsertProcessInfo(SBuildInsertDataInfo* pBuildInsertDataInfo, int32_t rows) {
  pBuildInsertDataInfo->isFirstBlock = true;
  pBuildInsertDataInfo->isLastBlock = false;
  pBuildInsertDataInfo->needSortMerge = false;
  pBuildInsertDataInfo->lastTs = TSKEY_MIN;

  pBuildInsertDataInfo->pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*));
  if (pBuildInsertDataInfo->pTbData.aRowP == NULL) {
    return terrno;
  }
  return TSDB_CODE_SUCCESS;
}
129

130
static void freeCacheTbInfo(void* pp) {
305,344✔
131
  if (pp == NULL || *(SInsertTableInfo**)pp == NULL) {
305,344✔
132
    return;
×
133
  }
134
  SInsertTableInfo* pTbInfo = *(SInsertTableInfo**)pp;
305,344✔
135
  if (pTbInfo->tbname) {
305,344✔
136
    taosMemFree(pTbInfo->tbname);
305,344✔
137
    pTbInfo->tbname = NULL;
305,344✔
138
  }
139
  if (pTbInfo->pSchema) {
305,344✔
140
    tDestroyTSchema(pTbInfo->pSchema);
305,344✔
141
    pTbInfo->pSchema = NULL;
305,344✔
142
  }
143
  taosMemoryFree(pTbInfo);
305,344✔
144
}
145

146
int32_t initInserterGrpInfo() {
672,720✔
147
  gStreamGrpTableHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
672,720✔
148
  if (NULL == gStreamGrpTableHash) {
672,720✔
149
    qError("failed to create stream group table hash");
×
150
    return terrno;
×
151
  }
152
  taosHashSetFreeFp(gStreamGrpTableHash, freeCacheTbInfo);
672,720✔
153
  return TSDB_CODE_SUCCESS;
672,720✔
154
}
155

156
void destroyInserterGrpInfo() {
672,720✔
157
  static int8_t destoryGrpInfo = 0;
158
  int8_t        flag = atomic_val_compare_exchange_8(&destoryGrpInfo, 0, 1);
672,720✔
159
  if (flag != 0) {
672,720✔
160
    return;
×
161
  }
162
  if (NULL != gStreamGrpTableHash) {
672,720✔
163
    taosHashCleanup(gStreamGrpTableHash);
672,720✔
164
    gStreamGrpTableHash = NULL;
672,720✔
165
  }
166
}
167

168
/*
 * Validates the first create-table entry of a submit response and copies the
 * resulting vgroup id and table uid into the cached table info.
 *
 * pSubmitRes     submit result whose pRsp holds the decoded response
 * res            cached table info; vgid and uid are overwritten on success
 * pSchemaChaned  set to true when the response reports a non-zero schema version
 *                that differs from res->version, false otherwise
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error code carried by the response
 * (TSDB_CODE_TDB_TABLE_ALREADY_EXIST is tolerated), or
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the response is missing or incomplete.
 */
static int32_t checkResAndResetTableInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* res,
                                         bool* pSchemaChaned) {
  if (!pSubmitRes->pRsp) {
    stError("create table response is NULL");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  // taosArrayGetSize is used (as in inserterCallback) so a response without a
  // create-table list is reported as an error instead of dereferencing NULL.
  if (taosArrayGetSize(pSubmitRes->pRsp->aCreateTbRsp) < 1) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (!pCreateTbRsp->pMeta || pCreateTbRsp->pMeta->tuid == 0) {
    stError("create table can not get tuid");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  *pSchemaChaned = false;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->uid = pCreateTbRsp->pMeta->tuid;

  // A schema version of 0 in the response is not treated as a change.
  if (pCreateTbRsp->pMeta->sversion != 0 && res->version != pCreateTbRsp->pMeta->sversion) {
    *pSchemaChaned = true;
  }

  stDebug("inserter callback, uid:%" PRId64 "  vgid: %" PRId64 ", version: %d", res->uid, res->vgid, res->version);

  return TSDB_CODE_SUCCESS;
}
201

202
/*
 * Builds a fresh SInsertTableInfo from the first create-table entry of a submit
 * response, reusing the table name of the previously cached info.
 *
 * pSubmitRes          submit result; pRsp must already be decoded
 * pOldInsertTbInfo    previously cached info; only its tbname is copied
 * ppNewInsertTbInfo   receives the new info on success; untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, the create-table error code of the response,
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the response carries no table meta,
 * or terrno when an allocation or the schema build fails. Nothing is leaked on
 * any failure path.
 */
static int32_t createNewInsertTbInfo(const SSubmitRes* pSubmitRes, SInsertTableInfo* pOldInsertTbInfo,
                                     SInsertTableInfo** ppNewInsertTbInfo) {
  SVCreateTbRsp* pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
  if (pCreateTbRsp == NULL) {
    stError("create table response size is less than 1");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }
  if (pCreateTbRsp->code != 0 && pCreateTbRsp->code != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    stError("create table failed, code:%d", pCreateTbRsp->code);
    return pCreateTbRsp->code;
  }
  if (pCreateTbRsp->pMeta == NULL) {
    stError("create table response has no table meta");
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  SInsertTableInfo* res = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (res == NULL) {
    return terrno;
  }
  res->tbname = taosStrdup(pOldInsertTbInfo->tbname);
  if (res->tbname == NULL) {
    int32_t code = terrno;  // capture before the free/log calls below can touch it
    taosMemoryFree(res);
    stError("failed to allocate memory for table name");
    return code;
  }

  res->uid = pCreateTbRsp->pMeta->tuid;
  res->vgid = pCreateTbRsp->pMeta->vgId;
  res->version = pCreateTbRsp->pMeta->sversion;
  res->pSchema = tBuildTSchema(pCreateTbRsp->pMeta->pSchemas, pCreateTbRsp->pMeta->numOfColumns, res->version);
  if (res->pSchema == NULL) {
    int32_t code = terrno;
    stError("failed to build schema for table:%s, uid:%" PRId64 ", vgid:%" PRId64 ", version:%d", res->tbname, res->uid,
            res->vgid, res->version);
    // Release the partially built info (name + struct); previously both were leaked here.
    freeCacheTbInfo(&res);
    return code;
  }

  *ppNewInsertTbInfo = res;
  return TSDB_CODE_SUCCESS;
}
236

237
/*
 * Refreshes the cached table info of a stream group from a submit response.
 *
 * The cache entry keyed by {streamId, groupId} is acquired, its uid/vgid are updated
 * by checkResAndResetTableInfo and, when the schema version changed, the entry is
 * replaced with a newly built SInsertTableInfo.
 *
 * The entry obtained from taosHashAcquire is released on every path, including the
 * successful one, so the cache node does not keep an extra reference.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the group has
 * no cache entry, or the error reported by the check/rebuild/put steps.
 */
static int32_t updateInsertGrpTableInfo(SStreamDataInserterInfo* pInserterInfo, const SSubmitRes* pSubmitRes) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  int64_t            key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  SInsertTableInfo** ppTbRes = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == ppTbRes || *ppTbRes == NULL) {
    if (ppTbRes != NULL) {
      taosHashRelease(gStreamGrpTableHash, ppTbRes);
    }
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  bool schemaChanged = false;
  code = checkResAndResetTableInfo(pSubmitRes, *ppTbRes, &schemaChanged);
  QUERY_CHECK_CODE(code, lino, _exit);

  if (schemaChanged) {
    SInsertTableInfo* pNewInfo = NULL;
    // The old entry is still referenced here, so its tbname can be copied safely.
    code = createNewInsertTbInfo(pSubmitRes, *ppTbRes, &pNewInfo);
    QUERY_CHECK_CODE(code, lino, _exit);

    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

    code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pNewInfo, sizeof(SInsertTableInfo*));

    if (code == TSDB_CODE_DUP_KEY) {
      // Another entry was inserted for this key in the meantime; keep it and drop ours.
      freeCacheTbInfo(&pNewInfo);
      code = TSDB_CODE_SUCCESS;
      goto _exit;
    } else if (code != TSDB_CODE_SUCCESS) {
      freeCacheTbInfo(&pNewInfo);
      stError("failed to put new insert tbInfo for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
              pInserterInfo->streamId, pInserterInfo->groupId, code);
      QUERY_CHECK_CODE(code, lino, _exit);
    }

    stInfo("update table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", uid:%" PRId64 ", vgid:%" PRId64
           ", version:%d",
           pInserterInfo->streamId, pInserterInfo->groupId, pNewInfo->uid, pNewInfo->vgid, pNewInfo->version);
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to check and reset table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
            pInserterInfo->streamId, pInserterInfo->groupId, code);
  }
  // ppTbRes must not be used after this point.
  taosHashRelease(gStreamGrpTableHash, ppTbRes);
  return code;
}
284

285
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema);
286
/*
 * Creates the initial cache entry for a stream group in gStreamGrpTableHash.
 *
 * The entry starts with uid 0, a copy of pInserterInfo->tbName and a schema built
 * from the inserter's stream parameters. The version is 1 for normal tables and the
 * parameter's sver otherwise. If an entry for {streamId, groupId} already exists,
 * the new one is discarded and success is returned.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error from the
 * schema build / hash insert.
 */
static int32_t initTableInfo(SDataInserterHandle* pInserter, SStreamDataInserterInfo* pInserterInfo) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SStreamInserterParam* pStreamParam = NULL;

  SInsertTableInfo* pInfo = taosMemoryCalloc(1, sizeof(SInsertTableInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  pStreamParam = pInserter->pParam->streamInserterParam;
  pInfo->uid = 0;
  if (pStreamParam->tbType != TSDB_NORMAL_TABLE) {
    pInfo->version = pStreamParam->sver;
  } else {
    pInfo->version = 1;
  }

  pInfo->tbname = taosStrdup(pInserterInfo->tbName);
  if (pInfo->tbname == NULL) {
    taosMemoryFree(pInfo);
    stError("failed to allocate memory for table name");
    return terrno;
  }

  code = buildTSchmaFromInserter(pStreamParam, &pInfo->pSchema);
  QUERY_CHECK_CODE(code, lino, _return);

  int64_t key[2] = {pInserterInfo->streamId, pInserterInfo->groupId};
  code = taosHashPut(gStreamGrpTableHash, key, sizeof(key), &pInfo, sizeof(SInsertTableInfo*));
  if (code == TSDB_CODE_DUP_KEY) {
    // The group is already cached; the existing entry wins.
    freeCacheTbInfo(&pInfo);
    return TSDB_CODE_SUCCESS;
  }

_return:
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  stError("failed to build table info for streamId:%" PRIx64 ", groupId:%" PRIx64 ", code:%d",
          pInserterInfo->streamId, pInserterInfo->groupId, code);
  freeCacheTbInfo(&pInfo);
  return code;
}
327

328
static bool colsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
19,663✔
329
  SArray* pCreatingFields = pInserterParam->pFields;
19,663✔
330

331
  for (int32_t i = 0; i < pCreatingFields->size; ++i) {
109,643✔
332
    SFieldWithOptions* pField = taosArrayGet(pCreatingFields, i);
89,980✔
333
    if (NULL == pField) {
89,980✔
334
      stError("isSupportedSTableSchema: failed to get field from array");
×
335
      return false;
×
336
    }
337

338
    for (int j = 0; j < pTableMetaRsp->numOfColumns; ++j) {
267,347✔
339
      if (strncmp(pTableMetaRsp->pSchemas[j].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
264,243✔
340
        if (pTableMetaRsp->pSchemas[j].type == pField->type && pTableMetaRsp->pSchemas[j].bytes == pField->bytes) {
86,876✔
341
          break;
342
        } else {
343
          return false;
×
344
        }
345
      }
346
    }
347
  }
348
  return true;
19,663✔
349
}
350

351
static bool TagsIsSupported(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
1,552✔
352
  SArray* pCreatingTags = pInserterParam->pTagFields;
1,552✔
353

354
  int32_t            tagIndexOffset = -1;
1,552✔
355
  SFieldWithOptions* pField = taosArrayGet(pCreatingTags, 0);
1,552✔
356
  if (NULL == pField) {
1,552✔
357
    stError("isSupportedSTableSchema: failed to get field from array");
×
358
    return false;
×
359
  }
360
  for (int32_t i = 0; i < pTableMetaRsp->numOfColumns + pTableMetaRsp->numOfTags; ++i) {
1,552✔
361
    if (strncmp(pTableMetaRsp->pSchemas[i].name, pField->name, TSDB_COL_NAME_LEN) != 0) {
776✔
362
      tagIndexOffset = i;
776✔
363
      break;
776✔
364
    }
365
  }
366
  if (tagIndexOffset == -1) {
1,552✔
367
    stError("isSupportedSTableSchema: failed to get tag index");
776✔
368
    return false;
776✔
369
  }
370

371
  for (int32_t i = 0; i < pTableMetaRsp->numOfTags; ++i) {
1,552✔
372
    int32_t            index = i + tagIndexOffset;
776✔
373
    SFieldWithOptions* pField = taosArrayGet(pCreatingTags, i);
776✔
374
    if (NULL == pField) {
776✔
375
      stError("isSupportedSTableSchema: failed to get field from array");
×
376
      return false;
×
377
    }
378

379
    for(int32_t j = 0; j < pTableMetaRsp->numOfTags; ++j) {
1,552✔
380
      if (strncmp(pTableMetaRsp->pSchemas[index].name, pField->name, TSDB_COL_NAME_LEN) == 0) {
776✔
381
        if (pTableMetaRsp->pSchemas[index].type == pField->type &&
×
382
            pTableMetaRsp->pSchemas[index].bytes == pField->bytes) {
×
383
          break;
384
        } else {
385
          return false;
×
386
        }
387
      }
388
    }
389
  }
390
  return true;
776✔
391
}
392

393
/*
 * A super-table schema is accepted when both the column check and the tag check
 * pass. The tag check is only run if the column check succeeded.
 */
static bool isSupportedSTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  return colsIsSupported(pTableMetaRsp, pInserterParam) && TagsIsSupported(pTableMetaRsp, pInserterParam);
}
402

403
/* A normal-table schema is accepted when the column check passes; no tag check is run. */
static bool isSupportedNTableSchema(const STableMetaRsp* pTableMetaRsp, const SStreamInserterParam* pInserterParam) {
  bool colsOk = colsIsSupported(pTableMetaRsp, pInserterParam);
  return colsOk;
}
406

407
static int32_t checkAndSaveCreateGrpTableInfo(SDataInserterHandle*     pInserthandle,
19,663✔
408
                                              SStreamDataInserterInfo* pInserterInfo) {
409
  int32_t     code = TSDB_CODE_SUCCESS;
19,663✔
410
  SSubmitRes* pSubmitRes = &pInserthandle->submitRes;
19,663✔
411
  int8_t      tbType = pInserthandle->pParam->streamInserterParam->tbType;
19,663✔
412

413
  SVCreateTbRsp*        pCreateTbRsp = taosArrayGet(pSubmitRes->pRsp->aCreateTbRsp, 0);
19,663✔
414
  SSchema*              pExistRow = pCreateTbRsp->pMeta->pSchemas;
19,663✔
415
  SStreamInserterParam* pInserterParam = pInserthandle->pParam->streamInserterParam;
19,663✔
416

417
  if (tbType == TSDB_CHILD_TABLE || tbType == TSDB_SUPER_TABLE) {
19,663✔
418
    if (!isSupportedSTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
1,552✔
419
      stError("create table failed, schema is not supported");
776✔
420
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
776✔
421
    }
422
  } else if (tbType == TSDB_NORMAL_TABLE) {
18,111✔
423
    if (!isSupportedNTableSchema(pCreateTbRsp->pMeta, pInserterParam)) {
18,111✔
424
      stError("create table failed, schema is not supported");
×
425
      return TSDB_CODE_STREAM_INSERT_SCHEMA_NOT_MATCH;
×
426
    }
427
  } else {
428
    stError("checkAndSaveCreateGrpTableInfo failed, tbType:%d is not supported", tbType);
×
429
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
430
  }
431

432
  return updateInsertGrpTableInfo(pInserterInfo, pSubmitRes);
18,887✔
433
}
434

435
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
5,006,021✔
436
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
5,006,021✔
437
  SDataInserterHandle* pInserter = pParam->pInserter;
5,006,021✔
438
  int32_t              code2 = 0;
5,006,021✔
439

440
  if (code) {
5,006,021✔
441
    pInserter->submitRes.code = code;
23,314✔
442
  } else {
443
    pInserter->submitRes.code = TSDB_CODE_SUCCESS;
4,982,707✔
444
  }
445
  SDecoder coder = {0};
5,006,021✔
446

447
  if (code == TSDB_CODE_SUCCESS) {
5,006,021✔
448
    pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
4,982,707✔
449
    if (NULL == pInserter->submitRes.pRsp) {
4,982,707✔
450
      pInserter->submitRes.code = terrno;
×
451
      goto _return;
×
452
    }
453

454
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
4,982,707✔
455
    code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
4,982,707✔
456
    if (code) {
4,982,707✔
457
      tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
×
458
      taosMemoryFree(pInserter->submitRes.pRsp);
×
459
      pInserter->submitRes.code = code;
×
460
      goto _return;
×
461
    }
462

463
    if (pInserter->submitRes.pRsp->affectedRows > 0) {
4,982,707✔
464
      SArray* pCreateTbList = pInserter->submitRes.pRsp->aCreateTbRsp;
520,534✔
465
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);
520,949✔
466

467
      for (int32_t i = 0; i < numOfTables; ++i) {
794,936✔
468
        SVCreateTbRsp* pRsp = taosArrayGet(pCreateTbList, i);
273,987✔
469
        if (NULL == pRsp) {
273,572✔
470
          pInserter->submitRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
471
          goto _return;
×
472
        }
473
        if (TSDB_CODE_SUCCESS != pRsp->code) {
273,572✔
474
          code = pRsp->code;
×
475
          tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
×
476
          taosMemoryFree(pInserter->submitRes.pRsp);
×
477
          pInserter->submitRes.code = code;
×
478
          goto _return;
×
479
        }
480
      }
481
    }
482

483
    if (pParam->putParam != NULL && ((SStreamDataInserterInfo*)pParam->putParam)->isAutoCreateTable) {
4,983,122✔
484
      code2 = updateInsertGrpTableInfo((SStreamDataInserterInfo*)pParam->putParam, &pInserter->submitRes);
283,067✔
485
    }
486

487
    pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
4,983,122✔
488
    qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pInserter->submitRes.pRsp->affectedRows,
4,982,707✔
489
           pInserter->submitRes.affectedRows);
490
    tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
4,982,707✔
491
    taosMemoryFree(pInserter->submitRes.pRsp);
4,982,707✔
492
  } else if ((TSDB_CODE_TDB_TABLE_ALREADY_EXIST == code && pParam->putParam != NULL &&
23,314✔
493
              ((SStreamDataInserterInfo*)pParam->putParam)->isAutoCreateTable) ||
23,314✔
494
             TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER == code) {
495
    pInserter->submitRes.code = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
19,663✔
496
    pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
19,663✔
497
    if (NULL == pInserter->submitRes.pRsp) {
19,663✔
498
      code2 = terrno;
×
499
      goto _return;
×
500
    }
501

502
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
19,663✔
503
    code2 = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
19,663✔
504
    if (code2 == TSDB_CODE_SUCCESS) {
19,663✔
505
      code2 = checkAndSaveCreateGrpTableInfo(pInserter, (SStreamDataInserterInfo*)pParam->putParam);
19,663✔
506
    }
507
    tDestroySSubmitRsp2(pInserter->submitRes.pRsp, TSDB_MSG_FLG_DECODE);
19,663✔
508
    taosMemoryFree(pInserter->submitRes.pRsp);
19,663✔
509
  }
510

511
_return:
4,993,621✔
512

513
  if (code2) {
5,006,021✔
514
    qError("update inserter table info failed, error:%s", tstrerror(code2));
776✔
515
  }
516
  tDecoderClear(&coder);
5,006,021✔
517
  TAOS_UNUSED(tsem_post(&pInserter->ready));
5,006,021✔
518

519
  taosMemoryFree(pMsg->pData);
5,006,021✔
520

521
  return TSDB_CODE_SUCCESS;
5,006,021✔
522
}
523

524
void freeUseDbOutput_tmp(void* ppOutput) {
9,053✔
525
  SUseDbOutput* pOut = *(SUseDbOutput**)ppOutput;
9,053✔
526
  if (NULL == ppOutput) {
9,053✔
527
    return;
×
528
  }
529

530
  if (pOut->dbVgroup) {
9,053✔
531
    freeVgInfo(pOut->dbVgroup);
9,053✔
532
  }
533
  taosMemFree(pOut);
9,053✔
534
  *(SUseDbOutput**)ppOutput = NULL;
9,053✔
535
}
536

537
/*
 * Response handler for the "use db" request sent by buildDbVgInfoMap.
 *
 * param  SDBVgInfoReq* owned by the waiting thread
 * pMsg   response buffer; pData and pEpSet are freed here
 * code   transport/server status of the request
 *
 * On success the response is deserialized into a newly allocated
 * pVgInfoReq->pRsp. When `code` reports a failure, pRsp is left untouched. The
 * `ready` semaphore is posted in every case so the waiter is always woken.
 *
 * Returns the incoming error, an allocation/deserialization error, or
 * TSDB_CODE_SUCCESS.
 */
static int32_t processUseDbRspForInserter(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t       lino = 0;
  SDBVgInfoReq* pVgInfoReq = (SDBVgInfoReq*)param;

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  // Zero-initialised so that the waiter's tFreeSUsedbRsp never walks indeterminate
  // fields when deserialization stops part-way.
  pVgInfoReq->pRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pVgInfoReq->pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pVgInfoReq->pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  int ret = tsem_post(&pVgInfoReq->ready);
  if (ret != 0) {
    qError("%s failed code: %d", __func__, ret);
  }
  return code;
}
570

571

572
int inserterVgInfoComp(const void* lp, const void* rp) {
84,261✔
573
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
84,261✔
574
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
84,261✔
575
  if (pLeft->hashBegin < pRight->hashBegin) {
84,261✔
576
    return -1;
73,692✔
577
  } else if (pLeft->hashBegin > pRight->hashBegin) {
10,569✔
578
    return 1;
10,569✔
579
  }
580

581
  return 0;
×
582
}
583

584
/*
 * Fetches the vgroup layout of a database from the mnode and fills `output`.
 *
 * clientRpc  RPC handle used to send the request
 * dbFName    database name copied into the request
 * output     receives the result of queryBuildUseDbOutput; additionally
 *            output->dbVgroup->vgArray is filled with every entry of vgHash and
 *            sorted by hashBegin (see inserterVgInfoComp)
 *
 * The call blocks on a semaphore until processUseDbRspForInserter has handled the
 * response. All locally owned resources (request, serialization buffer, unsent
 * send-info, semaphore, decoded response) are released on every path.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. On failure `output`
 * may be partially filled and is left for the caller to release.
 */
static int32_t buildDbVgInfoMap(void* clientRpc, const char* dbFName, SUseDbOutput* output) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       contLen = 0;
  char*         buf1 = NULL;
  SUseDbReq*    pReq = NULL;
  SMsgSendInfo* pMsgSendInfo = NULL;
  SEpSet        pEpSet = {0};
  SDBVgInfoReq  dbVgInfoReq = {0};

  code = tsem_init(&dbVgInfoReq.ready, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("tsem_init failed, error:%s", tstrerror(code));
    return code;
  }

  // Zero-initialised: only `db` is set below, and the whole struct is serialized.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  tstrncpy(pReq->db, dbFName, TSDB_DB_FNAME_LEN);

  contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    // `code` must carry the failure; jumping with it unset reported success before.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  code = getCurrentMnodeEpset(&pEpSet);
  QUERY_CHECK_CODE(code, lino, _return);

  pMsgSendInfo->param = &dbVgInfoReq;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = processUseDbRspForInserter;

  // From here on the send-info (and the buffer it references) is handed to the
  // transport, as in the original code; forget the local references so the cleanup
  // below does not free them.
  SMsgSendInfo* pHandedOver = pMsgSendInfo;
  pMsgSendInfo = NULL;
  buf1 = NULL;

  code = asyncSendMsgToServer(clientRpc, &pEpSet, NULL, pHandedOver);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&dbVgInfoReq.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  // The callback leaves pRsp NULL when the request failed or allocation failed.
  if (dbVgInfoReq.pRsp == NULL) {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  code = queryBuildUseDbOutput(output, dbVgInfoReq.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  output->dbVgroup->vgArray = taosArrayInit(dbVgInfoReq.pRsp->vgNum, sizeof(SVgroupInfo));
  if (NULL == output->dbVgroup->vgArray) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  void* pIter = taosHashIterate(output->dbVgroup->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(output->dbVgroup->vgArray, pIter)) {
      code = terrno;
      taosHashCancelIterate(output->dbVgroup->vgHash, pIter);
      // Go through the common cleanup instead of returning directly.
      lino = __LINE__;
      goto _return;
    }

    pIter = taosHashIterate(output->dbVgroup->vgHash, pIter);
  }

  taosArraySort(output->dbVgroup->vgArray, inserterVgInfoComp);

_return:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf1);
  taosMemoryFree(pMsgSendInfo);
  taosMemoryFree(pReq);
  TAOS_UNUSED(tsem_destroy(&dbVgInfoReq.ready));
  if (dbVgInfoReq.pRsp) {
    tFreeSUsedbRsp(dbVgInfoReq.pRsp);
    taosMemoryFreeClear(dbVgInfoReq.pRsp);
  }
  return code;
}
666

667
int32_t inserterBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
236,395✔
668
                                 SArray* tagName, uint8_t tagNum, int32_t ttl) {
669
  pTbReq->type = TD_CHILD_TABLE;
236,395✔
670
  pTbReq->ctb.pTag = (uint8_t*)pTag;
236,395✔
671
  pTbReq->name = taosStrdup(tname);
236,395✔
672
  if (!pTbReq->name) return terrno;
236,395✔
673
  pTbReq->ctb.suid = suid;
236,395✔
674
  pTbReq->ctb.tagNum = tagNum;
236,395✔
675
  if (sname) {
236,395✔
676
    pTbReq->ctb.stbName = taosStrdup(sname);
236,395✔
677
    if (!pTbReq->ctb.stbName) {
236,395✔
678
      taosMemoryFree(pTbReq->name);
×
679
      return terrno;
×
680
    }
681
  }
682
  pTbReq->ctb.tagName = tagName;
236,395✔
683
  pTbReq->ttl = ttl;
236,395✔
684
  pTbReq->commentLen = -1;
235,999✔
685

686
  return TSDB_CODE_SUCCESS;
236,395✔
687
}
688

689
int32_t inserterHashValueComp(void const* lp, void const* rp) {
6,792,139✔
690
  uint32_t*    key = (uint32_t*)lp;
6,792,139✔
691
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
6,792,139✔
692

693
  if (*key < pVg->hashBegin) {
6,792,139✔
694
    return -1;
1,779,630✔
695
  } else if (*key > pVg->hashEnd) {
5,012,509✔
696
    return 1;
55,787✔
697
  }
698

699
  return 0;
4,956,722✔
700
}
701

702

703
/*
 * Looks up the vgroup that owns a table.
 *
 * The table name is hashed with the database's hash settings and the vgroup whose
 * hash range contains the value is searched for in dbInfo->vgArray; the match is
 * copied into *pVgInfo.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when dbInfo
 * is NULL, has no vgArray, or no range contains the hash value.
 */
int32_t inserterGetVgInfo(SDBVgInfo* dbInfo, char* tbName, SVgroupInfo* pVgInfo) {
  if (dbInfo == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (dbInfo->vgArray == NULL) {
    qError("empty db vgArray, hashSize:%d", taosHashGetSize(dbInfo->vgHash));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t  nameLen = (int32_t)strlen(tbName);
  uint32_t hashValue = taosGetTbHashVal(tbName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pMatch = taosArraySearch(dbInfo->vgArray, &hashValue, inserterHashValueComp, TD_EQ);
  if (pMatch != NULL) {
    *pVgInfo = *pMatch;
    qDebug("insert get vgInfo, tbName:%s vgId:%d epset(%s:%d)", tbName, pVgInfo->vgId, pVgInfo->epSet.eps[0].fqdn,
           pVgInfo->epSet.eps[0].port);
    return TSDB_CODE_SUCCESS;
  }

  qError("no hash range found for hash value [%u], table:%s, numOfVgId:%d", hashValue, tbName,
         (int32_t)taosArrayGetSize(dbInfo->vgArray));
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
728

729
/*
 * Convenience wrapper around inserterGetVgInfo that yields only the vgroup id.
 * *vgId is written on success only; errors from the lookup are logged and returned.
 */
int32_t inserterGetVgId(SDBVgInfo* dbInfo, char* tbName, int32_t* vgId) {
  SVgroupInfo owner = {0};

  int32_t code = inserterGetVgInfo(dbInfo, tbName, &owner);
  if (code == TSDB_CODE_SUCCESS) {
    *vgId = owner.vgId;
    return TSDB_CODE_SUCCESS;
  }

  qError("inserterGetVgId failed, code:%d", code);
  return code;
}
740

741
/*
 * Returns the vgroup info of a database, fetching and caching it on first use.
 *
 * pInserter  inserter owning the per-database cache (dbVgInfoMap), created lazily
 * dbFName    database name used as the cache key
 * dbVgInfo   receives the cached SDBVgInfo; it remains owned by the cache
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the cache or a new entry
 * cannot be allocated, or the error from buildDbVgInfoMap / taosHashPut. On failure
 * the partially built entry is released and *dbVgInfo is not written.
 */
int32_t inserterGetDbVgInfo(SDataInserterHandle* pInserter, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;

  if (pInserter->dbVgInfoMap == NULL) {
    pInserter->dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (pInserter->dbVgInfoMap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  size_t         nameLen = strlen(dbFName);
  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInserter->dbVgInfoMap, dbFName, nameLen);

  if (find == NULL) {
    // Zero-initialised so the error path below (freeUseDbOutput_tmp) sees a NULL
    // dbVgroup instead of an indeterminate pointer when the fetch fails early.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(pInserter->pParam->readHandle->pMsgCb->clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(pInserter->dbVgInfoMap, dbFName, nameLen, &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput_tmp(&output);
  return code;
}
782

783
/*
 * Resolves the vgroup of a table by forwarding to getDbVgInfoForExec with the
 * inserter's client RPC handle. The return value is passed through unchanged.
 */
int32_t getTableVgInfo(SDataInserterHandle* pInserter, const char* dbFName,
                       const char* tbName, SVgroupInfo* pVgInfo) {
  void* pClientRpc = pInserter->pParam->readHandle->pMsgCb->clientRpc;
  return getDbVgInfoForExec(pClientRpc, dbFName, tbName, pVgInfo);
}
788

789
/**
 * Wrap an encoded submit message into an SMsgSendInfo and hand it to asyncSendMsgToServer().
 *
 * The response is routed to inserterCallback() together with an SSubmitRspParam that
 * carries `pInserter` and `putParam`.
 *
 * @param pInserter    inserter that issues the request
 * @param putParam     opaque value stored in the response parameter
 * @param pMsg         encoded message; released here if the send info cannot be allocated
 * @param msgLen       length of pMsg in bytes
 * @param pTransporter transport handle passed to asyncSendMsgToServer()
 * @param pEpset       endpoint set of the target
 * @return terrno when an allocation fails, otherwise the result of asyncSendMsgToServer()
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* putParam, void* pMsg, int32_t msgLen,
                                 void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFreeClear(pMsg);
    return terrno;
  }

  SSubmitRspParam* pRspParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (pRspParam == NULL) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pSendInfo);
    return terrno;
  }

  // context handed back to inserterCallback()
  pRspParam->pInserter = pInserter;
  pRspParam->putParam = putParam;

  // message envelope
  pSendInfo->msgType = TDMT_VND_SUBMIT;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;
  pSendInfo->fp = inserterCallback;
  pSendInfo->param = pRspParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
816

817
/**
 * Serialise a submit request into a newly allocated message buffer.
 *
 * The buffer starts with an SSubmitReq2Msg header (vgId and total length in network byte
 * order, version 1 in big-endian) followed by the encoded request.
 *
 * @param vgId   target vgroup id written into the header
 * @param pReq   request to encode
 * @param pData  out: allocated buffer, written only on success
 * @param pLen   out: total buffer length including the header, written only on success
 * @return TSDB_CODE_SUCCESS, terrno when the buffer cannot be allocated, or the encoder's
 *         failure code
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;

  // first pass: compute the encoded size only
  tEncodeSize(tEncodeSubmitReq, pReq, len, code);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  len += sizeof(SSubmitReq2Msg);
  void* pBuf = taosMemoryMalloc(len);
  if (pBuf == NULL) {
    return terrno;
  }

  SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(len);
  pHead->version = htobe64(1);

  // second pass: encode the request right behind the header
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
  code = tEncodeSubmitReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
846

847
int32_t buildSubmitReqFromStbBlock(SDataInserterHandle* pInserter, SHashObj* pHash, const SSDataBlock* pDataBlock,
1,875✔
848
                                   const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
849
  SArray* pVals = NULL;
1,875✔
850
  SArray* pTagVals = NULL;
1,875✔
851
  SSubmitReq2** ppReq = NULL;
1,875✔
852
  int32_t numOfBlks = 0;
1,875✔
853

854
  terrno = TSDB_CODE_SUCCESS;
1,875✔
855

856
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
1,875✔
857
  int32_t rows = pDataBlock->info.rows;
1,875✔
858

859
  if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
1,875✔
860
    goto _end;
×
861
  }
862

863
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
1,875✔
864
    goto _end;
×
865
  }
866

867
  SDBVgInfo* dbInfo = NULL;
1,875✔
868
  int32_t    code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
1,875✔
869
  if (code != TSDB_CODE_SUCCESS) {
1,875✔
870
    terrno = code;
×
871
    goto _end;
×
872
  }
873

874
  for (int32_t j = 0; j < rows; ++j) {
5,625✔
875
    SSubmitTbData tbData = {0};
3,750✔
876
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
3,750✔
877
      goto _end;
×
878
    }
879
    tbData.suid = suid;
3,750✔
880
    tbData.uid = uid;
3,750✔
881
    tbData.sver = pTSchema->version;
3,750✔
882

883
    int64_t lastTs = TSKEY_MIN;
3,750✔
884

885
    taosArrayClear(pVals);
3,750✔
886

887
    int32_t offset = 0;
3,750✔
888
    taosArrayClear(pTagVals);
3,750✔
889
    tbData.uid = 0;
3,750✔
890
    tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
3,750✔
891
    if (NULL == tbData.pCreateTbReq) {
3,750✔
892
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
893
      goto _end;
×
894
    }
895
    tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
3,750✔
896

897
    SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
3,750✔
898
    if (NULL == tbname) {
3,750✔
899
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
900
      qError("Insert into stable must have tbname column");
×
901
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
902
      goto _end;
×
903
    }
904
    if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
3,750✔
905
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
906
      qError("tbname column must be binary");
×
907
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
908
      goto _end;
×
909
    }
910

911
    if (colDataIsNull_s(tbname, j)) {
7,500✔
912
      qError("insert into stable tbname column is null");
×
913
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
914
      goto _end;
×
915
    }
916
    void*   data = colDataGetVarData(tbname, j);
3,750✔
917
    SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data), .pData = varDataVal(data)};
3,750✔
918
    SColVal cv = COL_VAL_VALUE(0, sv);
3,750✔
919

920
    char tbFullName[TSDB_TABLE_FNAME_LEN];
3,750✔
921
    char tableName[TSDB_TABLE_FNAME_LEN];
3,750✔
922
    memcpy(tableName, sv.pData, sv.nData);
3,750✔
923
    tableName[sv.nData] = '\0';
3,750✔
924

925
    int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
3,750✔
926
    if (len >= TSDB_TABLE_FNAME_LEN) {
3,750✔
927
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
928
      qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
929
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
930
      goto _end;
×
931
    }
932
    int32_t vgIdForTbName = 0;
3,750✔
933
    code = inserterGetVgId(dbInfo, tbFullName, &vgIdForTbName);
3,750✔
934
    if (code != TSDB_CODE_SUCCESS) {
3,750✔
935
      terrno = code;
×
936
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
937
      goto _end;
×
938
    }
939
    SSubmitReq2* pReq = NULL;
3,750✔
940
    ppReq = taosHashGet(pHash, &vgIdForTbName, sizeof(int32_t));
3,750✔
941
    if (ppReq == NULL) {
3,750✔
942
      pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2));
3,750✔
943
      if (NULL == pReq) {
3,750✔
944
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
945
        goto _end;
×
946
      }
947

948
      if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
3,750✔
949
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
950
        goto _end;
×
951
      }
952
      code = taosHashPut(pHash, &vgIdForTbName, sizeof(int32_t), &pReq, POINTER_BYTES);
3,750✔
953
    } else {
954
      pReq = *ppReq;
×
955
    }
956

957
    if (code != TSDB_CODE_SUCCESS) {
3,750✔
958
      terrno = code;
×
959
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
960
      goto _end;
×
961
    }
962
    SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
3,750✔
963
    if (!TagNames) {
3,750✔
964
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
965
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
966
      goto _end;
×
967
    }
968
    for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
11,250✔
969
      SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
7,500✔
970
      int16_t  colIdx = tSchema->colId;
7,500✔
971
      int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
7,500✔
972
      if (NULL == slotId) {
7,500✔
973
        continue;
5,000✔
974
      }
975
      if (NULL == taosArrayPush(TagNames, tSchema->name)) {
5,000✔
976
        taosArrayDestroy(TagNames);
×
977
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
978
        goto _end;
×
979
      }
980

981
      colIdx = *slotId;
2,500✔
982
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
2,500✔
983
      if (NULL == pColInfoData) {
2,500✔
984
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
985
        taosArrayDestroy(TagNames);
×
986
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
987
        goto _end;
×
988
      }
989
      // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
990
      switch (pColInfoData->info.type) {
2,500✔
991
        case TSDB_DATA_TYPE_NCHAR:
2,500✔
992
        case TSDB_DATA_TYPE_VARBINARY:
993
        case TSDB_DATA_TYPE_VARCHAR: {
994
          if (pColInfoData->info.type != tSchema->type) {
2,500✔
995
            qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
996
                   tSchema->type);
997
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
998
            taosArrayDestroy(TagNames);
×
999
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1000
            goto _end;
×
1001
          }
1002
          if (colDataIsNull_s(pColInfoData, j)) {
5,000✔
1003
            continue;
×
1004
          } else {
1005
            void*   data = colDataGetVarData(pColInfoData, j);
2,500✔
1006
            STagVal tv = (STagVal){
2,500✔
1007
                .cid = tSchema->colId, .type = tSchema->type, .nData = varDataLen(data), .pData = varDataVal(data)};
2,500✔
1008
            if (NULL == taosArrayPush(pTagVals, &tv)) {
2,500✔
1009
              taosArrayDestroy(TagNames);
×
1010
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1011
              goto _end;
×
1012
            }
1013
          }
1014
          break;
2,500✔
1015
        }
1016
        case TSDB_DATA_TYPE_BLOB:
×
1017
        case TSDB_DATA_TYPE_JSON:
1018
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1019
          qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1020
          terrno = TSDB_CODE_APP_ERROR;
×
1021
          taosArrayDestroy(TagNames);
×
1022
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1023
          goto _end;
×
1024
          break;
1025
        default:
×
1026
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1027
            if (colDataIsNull_s(pColInfoData, j)) {
×
1028
              continue;
×
1029
            } else {
1030
              void*   data = colDataGetData(pColInfoData, j);
×
1031
              STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1032
              memcpy(&tv.i64, data, tSchema->bytes);
×
1033
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1034
                taosArrayDestroy(TagNames);
×
1035
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1036
                goto _end;
×
1037
              }
1038
            }
1039
          } else {
1040
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1041
            terrno = TSDB_CODE_APP_ERROR;
×
1042
            taosArrayDestroy(TagNames);
×
1043
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1044
            goto _end;
×
1045
          }
1046
          break;
×
1047
      }
1048
    }
1049
    STag* pTag = NULL;
3,750✔
1050
    code = tTagNew(pTagVals, 1, false, &pTag);
3,750✔
1051
    if (code != TSDB_CODE_SUCCESS) {
3,750✔
1052
      terrno = code;
×
1053
      qError("failed to create tag, error:%s", tstrerror(code));
×
1054
      taosArrayDestroy(TagNames);
×
1055
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1056
      goto _end;
×
1057
    }
1058

1059
    code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, suid, pInserter->pNode->tableName, TagNames,
3,750✔
1060
                                    pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
3,750✔
1061
    if (code != TSDB_CODE_SUCCESS) {
3,750✔
1062
      terrno = code;
×
1063
      qError("failed to build create table request, error:%s", tstrerror(code));
×
1064
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1065
      goto _end;
×
1066
    }
1067

1068
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {
18,750✔
1069
      int16_t         colIdx = k;
15,000✔
1070
      const STColumn* pCol = &pTSchema->columns[k];
15,000✔
1071
      int16_t*        slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
15,000✔
1072
      if (NULL == slotId) {
15,000✔
1073
        continue;
3,750✔
1074
      }
1075
      colIdx = *slotId;
11,250✔
1076

1077
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
11,250✔
1078
      if (NULL == pColInfoData) {
11,250✔
1079
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1080
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1081
        goto _end;
×
1082
      }
1083
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
11,250✔
1084

1085
      switch (pColInfoData->info.type) {
11,250✔
1086
        case TSDB_DATA_TYPE_NCHAR:
×
1087
        case TSDB_DATA_TYPE_VARBINARY:
1088
        case TSDB_DATA_TYPE_VARCHAR: {
1089
          if (pColInfoData->info.type != pCol->type) {
×
1090
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1091
                   pCol->type);
1092
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1093
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1094
            goto _end;
×
1095
          }
1096
          if (colDataIsNull_s(pColInfoData, j)) {
×
1097
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1098
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1099
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1100
              goto _end;
×
1101
            }
1102
          } else {
1103
            void*   data = colDataGetVarData(pColInfoData, j);
×
1104
            SValue  sv = (SValue){.type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};
×
1105
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
1106
            if (NULL == taosArrayPush(pVals, &cv)) {
×
1107
              tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1108
              goto _end;
×
1109
            }
1110
          }
1111
          break;
×
1112
        }
1113
        case TSDB_DATA_TYPE_BLOB:
×
1114
        case TSDB_DATA_TYPE_JSON:
1115
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1116
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1117
          terrno = TSDB_CODE_APP_ERROR;
×
1118
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1119
          goto _end;
×
1120
          break;
1121
        default:
11,250✔
1122
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
11,250✔
1123
            if (colDataIsNull_s(pColInfoData, j)) {
22,500✔
1124
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
×
1125
                qError("Primary timestamp column should not be null");
×
1126
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1127
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1128
                goto _end;
×
1129
              }
1130

1131
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
1132
              if (NULL == taosArrayPush(pVals, &cv)) {
×
1133
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1134
                goto _end;
×
1135
              }
1136
            } else {
1137
              // if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
1138
              //   if (*(int64_t*)var <= lastTs) {
1139
              //     needSortMerge = true;
1140
              //   } else {
1141
              //     lastTs = *(int64_t*)var;
1142
              //   }
1143
              // }
1144

1145
              SValue sv = {.type = pCol->type};
11,250✔
1146
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
11,250✔
1147
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
11,250✔
1148
              if (NULL == taosArrayPush(pVals, &cv)) {
11,250✔
1149
                tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1150
                goto _end;
×
1151
              }
1152
            }
1153
          } else {
1154
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1155
            terrno = TSDB_CODE_APP_ERROR;
×
1156
            tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1157
            goto _end;
×
1158
          }
1159
          break;
11,250✔
1160
      }
1161
    }
1162

1163
    SRow* pRow = NULL;
3,750✔
1164
    SRowBuildScanInfo sinfo = {0};
3,750✔
1165
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
3,750✔
1166
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1167
      goto _end;
×
1168
    }
1169
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
7,500✔
1170
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
1171
      goto _end;
×
1172
    }
1173

1174
    if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
7,500✔
1175
      goto _end;
×
1176
    }
1177
  }
1178

1179
_end:
1,875✔
1180
  taosArrayDestroy(pTagVals);
1,875✔
1181
  taosArrayDestroy(pVals);
1,875✔
1182

1183
  return terrno;
1,875✔
1184
}
1185

1186
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
58,885✔
1187
                                const STSchema* pTSchema, int64_t* uid, int32_t* vgId, tb_uid_t* suid) {
1188
  SSubmitReq2* pReq = *ppReq;
58,885✔
1189
  SArray*      pVals = NULL;
58,885✔
1190
  SArray*      pTagVals = NULL;
58,885✔
1191
  int32_t      numOfBlks = 0;
58,885✔
1192
  char*        tableName = NULL;
58,885✔
1193
  int32_t      code = 0, lino = 0;
58,885✔
1194

1195
  terrno = TSDB_CODE_SUCCESS;
58,885✔
1196

1197
  if (NULL == pReq) {
58,885✔
1198
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
58,885✔
1199
      goto _end;
×
1200
    }
1201

1202
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
58,885✔
1203
      goto _end;
×
1204
    }
1205
  }
1206

1207
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
58,885✔
1208
  int32_t rows = pDataBlock->info.rows;
58,885✔
1209

1210
  SSubmitTbData tbData = {0};
58,885✔
1211
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
58,885✔
1212
    goto _end;
×
1213
  }
1214
  tbData.suid = *suid;
58,885✔
1215
  tbData.uid = *uid;
58,885✔
1216
  tbData.sver = pTSchema->version;
58,885✔
1217

1218
  if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
58,885✔
1219
    taosArrayDestroy(tbData.aRowP);
×
1220
    goto _end;
×
1221
  }
1222

1223
  if (pInserter->isStbInserter) {
58,885✔
1224
    if (!pTagVals && !(pTagVals = taosArrayInit(colNum, sizeof(STagVal)))) {
×
1225
      taosArrayDestroy(tbData.aRowP);
×
1226
      goto _end;
×
1227
    }
1228
  }
1229

1230
  int64_t lastTs = TSKEY_MIN;
58,885✔
1231
  bool    needSortMerge = false;
58,885✔
1232

1233
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
3,506,613✔
1234
    taosArrayClear(pVals);
3,455,444✔
1235

1236
    int32_t offset = 0;
3,455,444✔
1237
    // 处理超级表的tbname和tags
1238
    if (pInserter->isStbInserter) {
3,455,444✔
1239
      taosArrayClear(pTagVals);
×
1240
      tbData.uid = 0;
×
1241
      *uid = 0;
×
1242
      tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
1243
      tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
×
1244

1245
      SColumnInfoData* tbname = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1246
      if (NULL == tbname) {
×
1247
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1248
        qError("Insert into stable must have tbname column");
×
1249
        goto _end;
×
1250
      }
1251
      if (tbname->info.type != TSDB_DATA_TYPE_BINARY) {
×
1252
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1253
        qError("tbname column must be binary");
×
1254
        goto _end;
×
1255
      }
1256

1257
      if (colDataIsNull_s(tbname, j)) {
×
1258
        qError("insert into stable tbname column is null");
×
1259
        goto _end;
×
1260
      }
1261
      void*   data = colDataGetVarData(tbname, j);
×
1262
      SValue  sv = (SValue){TSDB_DATA_TYPE_VARCHAR, .nData = varDataLen(data),
×
1263
                            .pData = varDataVal(data)};  // address copy, no value
×
1264
      SColVal cv = COL_VAL_VALUE(0, sv);
×
1265

1266
      // 获取子表vgId
1267
      SDBVgInfo* dbInfo = NULL;
×
1268
      code = inserterGetDbVgInfo(pInserter, pInserter->dbFName, &dbInfo);
×
1269
      if (code != TSDB_CODE_SUCCESS) {
×
1270
        goto _end;
×
1271
      }
1272

1273
      char tbFullName[TSDB_TABLE_FNAME_LEN];
×
1274
      taosMemoryFreeClear(tableName);
×
1275
      tableName = taosMemoryCalloc(1, sv.nData + 1);
×
1276
      TSDB_CHECK_NULL(tableName, code, lino, _end, terrno);
×
1277
      tstrncpy(tableName, sv.pData, sv.nData);
×
1278
      tableName[sv.nData] = '\0';
×
1279

1280
      int32_t len = snprintf(tbFullName, TSDB_TABLE_FNAME_LEN, "%s.%s", pInserter->dbFName, tableName);
×
1281
      if (len >= TSDB_TABLE_FNAME_LEN) {
×
1282
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1283
        qError("table name too long after format, len:%d, maxLen:%d", len, TSDB_TABLE_FNAME_LEN);
×
1284
        goto _end;
×
1285
      }
1286
      code = inserterGetVgId(dbInfo, tbFullName, vgId);
×
1287
      if (code != TSDB_CODE_SUCCESS) {
×
1288
        terrno = code;
×
1289
        goto _end;
×
1290
      }
1291
      // 解析tag
1292
      SArray* TagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
×
1293
      if (!TagNames) {
×
1294
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1295
        goto _end;
×
1296
      }
1297
      for (int32_t i = 0; i < pInserter->pTagSchema->nCols; ++i) {
×
1298
        SSchema* tSchema = &pInserter->pTagSchema->pSchema[i];
×
1299
        int16_t  colIdx = tSchema->colId;
×
1300
        if (NULL == taosArrayPush(TagNames, tSchema->name)) {
×
1301
          goto _end;
×
1302
        }
1303
        int16_t* slotId = taosHashGet(pInserter->pCols, &colIdx, sizeof(colIdx));
×
1304
        if (NULL == slotId) {
×
1305
          continue;
×
1306
        }
1307

1308
        colIdx = *slotId;
×
1309
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
×
1310
        if (NULL == pColInfoData) {
×
1311
          terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1312
          goto _end;
×
1313
        }
1314
        // void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1315
        switch (pColInfoData->info.type) {
×
1316
          case TSDB_DATA_TYPE_NCHAR:
×
1317
          case TSDB_DATA_TYPE_VARBINARY:
1318
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1319
            if (pColInfoData->info.type != tSchema->type) {
×
1320
              qError("tag:%d type:%d in block dismatch with schema tag:%d type:%d", colIdx, pColInfoData->info.type, i,
×
1321
                     tSchema->type);
1322
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1323
              goto _end;
×
1324
            }
1325
            if (colDataIsNull_s(pColInfoData, j)) {
×
1326
              continue;
×
1327
            } else {
1328
              void*   data = colDataGetVarData(pColInfoData, j);
×
1329
              STagVal tv = (STagVal){.cid = tSchema->colId,
×
1330
                                     .type = tSchema->type,
×
1331
                                     .nData = varDataLen(data),
×
1332
                                     .pData = varDataVal(data)};  // address copy, no value
×
1333
              if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1334
                goto _end;
×
1335
              }
1336
            }
1337
            break;
×
1338
          }
1339
          case TSDB_DATA_TYPE_BLOB:
×
1340
          case TSDB_DATA_TYPE_JSON:
1341
          case TSDB_DATA_TYPE_MEDIUMBLOB:
1342
            qError("the tag type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1343
            terrno = TSDB_CODE_APP_ERROR;
×
1344
            goto _end;
×
1345
            break;
1346
          default:
×
1347
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
1348
              if (colDataIsNull_s(pColInfoData, j)) {
×
1349
                continue;
×
1350
              } else {
1351
                void*   data = colDataGetData(pColInfoData, j);
×
1352
                STagVal tv = {.cid = tSchema->colId, .type = tSchema->type};
×
1353
                memcpy(&tv.i64, data, tSchema->bytes);
×
1354
                if (NULL == taosArrayPush(pTagVals, &tv)) {
×
1355
                  goto _end;
×
1356
                }
1357
              }
1358
            } else {
1359
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1360
              terrno = TSDB_CODE_APP_ERROR;
×
1361
              goto _end;
×
1362
            }
1363
            break;
×
1364
        }
1365
      }
1366
      STag* pTag = NULL;
×
1367
      code = tTagNew(pTagVals, 1, false, &pTag);
×
1368
      if (code != TSDB_CODE_SUCCESS) {
×
1369
        terrno = code;
×
1370
        qError("failed to create tag, error:%s", tstrerror(code));
×
1371
        goto _end;
×
1372
      }
1373

1374
      code = inserterBuildCreateTbReq(tbData.pCreateTbReq, tableName, pTag, *suid, pInserter->pNode->tableName, TagNames,
×
1375
                               pInserter->pTagSchema->nCols, TSDB_DEFAULT_TABLE_TTL);
×
1376
    }
1377

1378
    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
17,302,244✔
1379
      int16_t         colIdx = k;
13,846,800✔
1380
      const STColumn* pCol = &pTSchema->columns[k];
13,846,800✔
1381
      if (!pInserter->fullOrderColList) {
13,846,800✔
1382
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
173,654✔
1383
        if (NULL == slotId) {
173,654✔
1384
          continue;
45,180✔
1385
        }
1386

1387
        colIdx = *slotId;
128,474✔
1388
      }
1389

1390
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
13,801,620✔
1391
      if (NULL == pColInfoData) {
13,801,620✔
1392
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1393
        goto _end;
×
1394
      }
1395
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
13,801,620✔
1396

1397
      switch (pColInfoData->info.type) {
13,801,620✔
1398
        case TSDB_DATA_TYPE_NCHAR:
138,288✔
1399
        case TSDB_DATA_TYPE_VARBINARY:
1400
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1401
          if (pColInfoData->info.type != pCol->type) {
138,288✔
1402
            qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k,
×
1403
                   pCol->type);
1404
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1405
            goto _end;
×
1406
          }
1407
          if (colDataIsNull_s(pColInfoData, j)) {
276,576✔
1408
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
2,572✔
1409
            if (NULL == taosArrayPush(pVals, &cv)) {
2,572✔
1410
              goto _end;
×
1411
            }
1412
          } else {
1413
            void*  data = colDataGetVarData(pColInfoData, j);
135,716✔
1414
            SValue sv = (SValue){
407,148✔
1415
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
135,716✔
1416
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
135,716✔
1417
            if (NULL == taosArrayPush(pVals, &cv)) {
135,716✔
1418
              goto _end;
×
1419
            }
1420
          }
1421
          break;
138,288✔
1422
        }
1423
        case TSDB_DATA_TYPE_BLOB:
×
1424
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1425
        case TSDB_DATA_TYPE_JSON:
1426
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1427
          terrno = TSDB_CODE_APP_ERROR;
×
1428
          goto _end;
×
1429
          break;
1430
        default:
13,663,332✔
1431
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
13,663,332✔
1432
            if (colDataIsNull_s(pColInfoData, j)) {
27,326,664✔
1433
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
8,024✔
1434
                qError("Primary timestamp column should not be null");
×
1435
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
×
1436
                goto _end;
×
1437
              }
1438

1439
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
8,024✔
1440
              if (NULL == taosArrayPush(pVals, &cv)) {
8,024✔
1441
                goto _end;
×
1442
              }
1443
            } else {
1444
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
13,655,308✔
1445
                if (*(int64_t*)var <= lastTs) {
3,410,590✔
1446
                  needSortMerge = true;
10,757✔
1447
                } else {
1448
                  lastTs = *(int64_t*)var;
3,399,833✔
1449
                }
1450
              }
1451

1452
              SValue sv = {.type = pCol->type};
13,655,308✔
1453
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
13,655,308✔
1454
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
13,655,308✔
1455
              if (NULL == taosArrayPush(pVals, &cv)) {
13,655,308✔
1456
                goto _end;
×
1457
              }
1458
            }
1459
          } else {
1460
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1461
            terrno = TSDB_CODE_APP_ERROR;
×
1462
            goto _end;
×
1463
          }
1464
          break;
13,663,332✔
1465
      }
1466
    }
1467

1468
    SRow*             pRow = NULL;
3,455,444✔
1469
    SRowBuildScanInfo sinfo = {0};
3,455,444✔
1470
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
3,455,444✔
1471
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
7,716✔
1472
      goto _end;
7,716✔
1473
    }
1474
    if (NULL == taosArrayPush(tbData.aRowP, &pRow)) {
6,895,456✔
1475
      goto _end;
×
1476
    }
1477
  }
1478

1479
  if (needSortMerge) {
51,169✔
1480
    if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
10,757✔
1481
        (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
10,757✔
1482
      goto _end;
×
1483
    }
1484
  }
1485

1486
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
102,338✔
1487
    goto _end;
×
1488
  }
1489

1490
_end:
58,885✔
1491

1492
  taosMemoryFreeClear(tableName);
58,885✔
1493

1494
  taosArrayDestroy(pTagVals);
58,885✔
1495
  taosArrayDestroy(pVals);
58,885✔
1496
  if (terrno != 0) {
58,885✔
1497
    *ppReq = NULL;
7,716✔
1498
    if (pReq) {
7,716✔
1499
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
7,716✔
1500
      taosMemoryFree(pReq);
7,716✔
1501
    }
1502

1503
    return terrno;
7,716✔
1504
  }
1505
  *ppReq = pReq;
51,169✔
1506

1507
  return TSDB_CODE_SUCCESS;
51,169✔
1508
}
1509

1510
static void destroySubmitReqWrapper(void* p) {
3,750✔
1511
  SSubmitReq2* pReq = *(SSubmitReq2**)p;
3,750✔
1512
  if (pReq != NULL) {
3,750✔
1513
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
3,750✔
1514
    taosMemoryFree(pReq);
3,750✔
1515
  }
1516
}
3,750✔
1517

1518
/*
 * Converts all buffered data blocks of the inserter into encoded submit
 * messages and appends them (as SSubmitTbDataMsg*) to pMsgs.
 *
 * Each block is handed to buildSubmitReqFromStbBlock(), which fills a temporary
 * hash whose values are SSubmitReq2*. Every hash entry is then serialized with
 * submitReqToMsg(), passing the int32 hash key as its first (vgId) argument.
 *
 * @param pInserter inserter handle providing the blocks, schema and target ids
 * @param pMsgs     output array of SSubmitTbDataMsg*; entries appended before a
 *                  failure are left in place for the caller to release
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
int32_t dataBlocksToSubmitReqArray(SDataInserterHandle* pInserter, SArray* pMsgs) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;

  SHashObj* pHash = NULL;
  void*     iterator = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result block produced by the SELECT query
    if (NULL == pDataBlock) {
      // go through _end so a hash created for an earlier block is not leaked
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    if (pHash == NULL) {
      pHash = taosHashInit(sz * pDataBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false,
                           HASH_ENTRY_LOCK);
      if (NULL == pHash) {
        return terrno;
      }
      // the hash owns the SSubmitReq2 objects and frees them on cleanup
      taosHashSetFreeFp(pHash, destroySubmitReqWrapper);
    }
    code = buildSubmitReqFromStbBlock(pInserter, pHash, pDataBlock, pTSchema, uid, vgId, suid);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

  size_t keyLen = 0;
  while ((iterator = taosHashIterate(pHash, iterator))) {
    SSubmitReq2* pReq = *(SSubmitReq2**)iterator;
    int32_t*     ctbVgId = taosHashGetKey(iterator, &keyLen);

    SSubmitTbDataMsg* pMsg = taosMemoryCalloc(1, sizeof(SSubmitTbDataMsg));
    if (NULL == pMsg) {
      code = terrno;
      goto _end;
    }
    code = submitReqToMsg(*ctbVgId, pReq, &pMsg->pData, &pMsg->len);
    if (code != TSDB_CODE_SUCCESS) {
      // only the wrapper is released here; NOTE(review): assumes submitReqToMsg
      // does not hand back a buffer in pMsg->pData when it fails - confirm
      taosMemoryFree(pMsg);
      goto _end;
    }
    if (NULL == taosArrayPush(pMsgs, &pMsg)) {
      code = terrno;
      // the message never reached pMsgs, so nobody else will free it
      taosMemoryFree(pMsg->pData);
      taosMemoryFree(pMsg);
      goto _end;
    }
  }

_end:
  if (iterator != NULL) {
    // the loop was left early: release the iterator before tearing the hash down
    taosHashCancelIterate(pHash, iterator);
  }
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }

  return code;
}
1576

1577
/*
 * Builds one submit request from all buffered data blocks of the inserter and
 * serializes it into *pMsg / *msgLen via submitReqToMsg().
 *
 * The intermediate SSubmitReq2 is always destroyed before returning, on both
 * the success and the failure path.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a block
 *         slot is NULL, or the error reported by the build / encode step
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         sz = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // result block produced by the SELECT query
    if (NULL == pDataBlock) {
      // pReq may already hold data from earlier blocks; release it in _end
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }
    code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, &uid, &vgId, &suid);
    if (code) {
      goto _end;
    }
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);

_end:
  if (pReq) {
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
    taosMemoryFree(pReq);
  }

  return code;
}
1609

1610
/*
 * Looks up the insert-table info registered for (streamId, groupId) in
 * gStreamGrpTableHash.
 *
 * On success *ppTbInfo receives the acquired hash slot; the caller must give it
 * back with taosHashRelease() (see releaseStreamInsertTableInfo). On failure
 * *ppTbInfo is left untouched and no reference is kept.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND when
 *         there is no entry or the entry holds a NULL pointer
 */
int32_t getStreamInsertTableInfo(int64_t streamId, int64_t groupId, SInsertTableInfo*** ppTbInfo) {
  int64_t            key[2] = {streamId, groupId};
  SInsertTableInfo** pTmp = taosHashAcquire(gStreamGrpTableHash, key, sizeof(key));
  if (NULL == pTmp) {
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  if (NULL == *pTmp) {
    // the slot was acquired but is unusable: drop the reference before failing
    taosHashRelease(gStreamGrpTableHash, pTmp);
    return TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }

  *ppTbInfo = pTmp;
  return TSDB_CODE_SUCCESS;
}
1620

1621
/*
 * Hands a slot obtained from getStreamInsertTableInfo() back to
 * gStreamGrpTableHash. Always reports TSDB_CODE_SUCCESS.
 */
static int32_t releaseStreamInsertTableInfo(SInsertTableInfo** ppTbInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  taosHashRelease(gStreamGrpTableHash, ppTbInfo);

  return code;
}
1625

1626
int32_t buildNormalTableCreateReq(SDataInserterHandle* pInserter, SStreamInserterParam* pInsertParam,
71,923✔
1627
                                  SSubmitTbData* tbData) {
1628
  int32_t code = TSDB_CODE_SUCCESS;
71,923✔
1629

1630
  tbData->suid = 0;
71,923✔
1631

1632
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
71,923✔
1633
  if (NULL == tbData->pCreateTbReq) {
71,923✔
1634
    goto _end;
×
1635
  }
1636
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
71,923✔
1637
  tbData->pCreateTbReq->type = TSDB_NORMAL_TABLE;
71,923✔
1638
  tbData->pCreateTbReq->flags |= (TD_CREATE_NORMAL_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
71,923✔
1639
  tbData->pCreateTbReq->uid = 0;
71,923✔
1640
  tbData->sver = pInsertParam->sver;
71,923✔
1641

1642
  tbData->pCreateTbReq->name = taosStrdup(pInsertParam->tbname);
71,923✔
1643
  if (!tbData->pCreateTbReq->name) return terrno;
71,923✔
1644

1645
  int32_t numOfCols = pInsertParam->pFields->size;
71,923✔
1646
  tbData->pCreateTbReq->ntb.schemaRow.nCols = numOfCols;
71,923✔
1647
  tbData->pCreateTbReq->ntb.schemaRow.version = 1;
71,923✔
1648

1649
  tbData->pCreateTbReq->ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
71,923✔
1650
  if (NULL == tbData->pCreateTbReq->ntb.schemaRow.pSchema) {
71,923✔
1651
    goto _end;
×
1652
  }
1653
  for (int32_t i = 0; i < numOfCols; ++i) {
390,992✔
1654
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
319,069✔
1655
    if (NULL == pField) {
319,069✔
1656
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1657
      goto _end;
×
1658
    }
1659
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].colId = i + 1;
319,069✔
1660
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].type = pField->type;
319,069✔
1661
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].bytes = pField->bytes;
319,069✔
1662
    tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags = pField->flags;
319,069✔
1663
    if (i == 0 && pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
319,069✔
1664
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1665
      qError("buildNormalTableCreateReq, the first column must be timestamp.");
×
1666
      goto _end;
×
1667
    }
1668
    if (i == 0) {
319,069✔
1669
      tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].flags |= COL_IS_KEY;
71,923✔
1670
    }
1671
    snprintf(tbData->pCreateTbReq->ntb.schemaRow.pSchema[i].name, TSDB_COL_NAME_LEN, "%s", pField->name);
319,069✔
1672
    if (IS_DECIMAL_TYPE(pField->type)) {
319,069✔
1673
      if (!tbData->pCreateTbReq->pExtSchemas) {
×
1674
        tbData->pCreateTbReq->pExtSchemas = taosMemoryCalloc(numOfCols, sizeof(SExtSchema));
×
1675
        if (NULL == tbData->pCreateTbReq->pExtSchemas) {
×
1676
          tdDestroySVCreateTbReq(tbData->pCreateTbReq);
×
1677
          tbData->pCreateTbReq = NULL;
×
1678
          return terrno;
×
1679
        }
1680
      }
1681
      tbData->pCreateTbReq->pExtSchemas[i].typeMod = pField->typeMod;
×
1682
    }
1683
  }
1684
  return TSDB_CODE_SUCCESS;
71,923✔
1685
_end:
×
1686
  return code;
×
1687
}
1688

1689
// reference tBuildTSchema funciton
1690
static int32_t buildTSchmaFromInserter(SStreamInserterParam* pInsertParam, STSchema** ppTSchema) {
304,199✔
1691
  int32_t code = TSDB_CODE_SUCCESS;
304,199✔
1692

1693
  int32_t   numOfCols = pInsertParam->pFields->size;
304,199✔
1694
  STSchema* pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
304,568✔
1695
  if (NULL == pTSchema) {
304,568✔
1696
    return terrno;
×
1697
  }
1698
  if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
304,568✔
1699
    pTSchema->version =
71,923✔
1700
        1;  // normal table version start from 1, if has exist table, it will be reset by resetInserterTbVersion
1701
  } else {
1702
    pTSchema->version = pInsertParam->sver;
232,645✔
1703
  }
1704
  pTSchema->numOfCols = numOfCols;
304,568✔
1705

1706
  SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, 0);
304,568✔
1707
  if (NULL == pField) {
304,568✔
1708
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1709
    goto _end;
×
1710
  }
1711
  pTSchema->columns[0].colId = PRIMARYKEY_TIMESTAMP_COL_ID;
304,568✔
1712
  pTSchema->columns[0].type = pField->type;
304,568✔
1713
  pTSchema->columns[0].flags = pField->flags | COL_IS_KEY;
304,568✔
1714
  pTSchema->columns[0].bytes = TYPE_BYTES[pField->type];
304,568✔
1715
  pTSchema->columns[0].offset = -1;
304,568✔
1716

1717
  pTSchema->tlen = 0;
304,568✔
1718
  pTSchema->flen = 0;
304,568✔
1719
  for (int32_t i = 1; i < numOfCols; ++i) {
1,346,600✔
1720
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pFields, i);
1,042,032✔
1721
    if (NULL == pField) {
1,042,032✔
1722
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1723
      goto _end;
×
1724
    }
1725
    pTSchema->columns[i].colId = i + 1;
1,042,032✔
1726
    pTSchema->columns[i].type = pField->type;
1,042,032✔
1727
    pTSchema->columns[i].flags = pField->flags;
1,042,032✔
1728
    pTSchema->columns[i].bytes = pField->bytes;
1,042,032✔
1729
    pTSchema->columns[i].offset = pTSchema->flen;
1,042,032✔
1730

1731
    if (IS_VAR_DATA_TYPE(pField->type)) {
1,042,032✔
1732
      pTSchema->columns[i].bytes = pField->bytes;
22,692✔
1733
      pTSchema->tlen += (TYPE_BYTES[pField->type] + pField->bytes);
22,692✔
1734
    } else {
1735
      pTSchema->columns[i].bytes = TYPE_BYTES[pField->type];
1,019,340✔
1736
      pTSchema->tlen += TYPE_BYTES[pField->type];
1,019,340✔
1737
    }
1738

1739
    pTSchema->flen += TYPE_BYTES[pField->type];
1,042,032✔
1740
  }
1741

1742
#if 1
1743
  pTSchema->tlen += (int32_t)TD_BITMAP_BYTES(numOfCols);
304,568✔
1744
#endif
1745

1746
_end:
304,568✔
1747
  if (code != TSDB_CODE_SUCCESS) {
304,568✔
1748
    taosMemoryFree(pTSchema);
×
1749
    *ppTSchema = NULL;
×
1750
  } else {
1751
    *ppTSchema = pTSchema;
304,568✔
1752
  }
1753
  return code;
304,568✔
1754
}
1755

1756
static int32_t getTagValsFromStreamInserterInfo(SStreamDataInserterInfo* pInserterInfo, int32_t preCols,
232,645✔
1757
                                                SArray** ppTagVals) {
1758
  int32_t code = TSDB_CODE_SUCCESS;
232,645✔
1759
  int32_t nTags = pInserterInfo->pTagVals->size;
232,645✔
1760
  *ppTagVals = taosArrayInit(nTags, sizeof(STagVal));
232,645✔
1761
  if (!ppTagVals) {
232,645✔
1762
    return terrno;
×
1763
  }
1764
  for (int32_t i = 0; i < pInserterInfo->pTagVals->size; ++i) {
632,755✔
1765
    SStreamTagInfo* pTagInfo = taosArrayGet(pInserterInfo->pTagVals, i);
400,110✔
1766
    STagVal         tagVal = {
400,110✔
1767
                .cid = preCols + i + 1,
400,110✔
1768
                .type = pTagInfo->val.data.type,
400,110✔
1769
    };
1770
    if (!pTagInfo->val.isNull) {
400,110✔
1771
      if (IS_VAR_DATA_TYPE(pTagInfo->val.data.type)) {
400,110✔
1772
        tagVal.nData = pTagInfo->val.data.nData;
252,841✔
1773
        tagVal.pData = pTagInfo->val.data.pData;
252,841✔
1774
      } else {
1775
        tagVal.i64 = pTagInfo->val.data.val;
147,269✔
1776
      }
1777

1778
      if (NULL == taosArrayPush(*ppTagVals, &tagVal)) {
800,220✔
1779
        code = terrno;
×
1780
        goto _end;
×
1781
      }
1782
    }
1783
  }
1784
_end:
232,645✔
1785
  if (code != TSDB_CODE_SUCCESS) {
232,645✔
1786
    taosArrayDestroy(*ppTagVals);
×
1787
    *ppTagVals = NULL;
×
1788
  }
1789
  return code;
232,645✔
1790
}
1791

1792
static int32_t buildStreamSubTableCreateReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
232,645✔
1793
                                            SStreamInserterParam* pInsertParam, SStreamDataInserterInfo* pInserterInfo,
1794
                                            SSubmitTbData* tbData) {
1795
  int32_t code = TSDB_CODE_SUCCESS;
232,645✔
1796
  STag*   pTag = NULL;
232,645✔
1797
  SArray* pTagVals = NULL;
232,645✔
1798
  SArray* TagNames = NULL;
232,645✔
1799

1800
  if (pInsertParam->pTagFields == NULL) {
232,645✔
1801
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagFields is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1802
                 pInsertParam->sver);
1803
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1804
  }
1805
  if (pInserterInfo->pTagVals == NULL || pInserterInfo->pTagVals->size == 0) {
232,645✔
1806
    ST_TASK_ELOG("buildStreamSubTableCreateReq, pTagVals is NULL, suid:%" PRId64 ", sver:%d", pInsertParam->suid,
×
1807
                 pInsertParam->sver);
1808
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1809
  }
1810
  if (pInsertParam->suid <= 0 || pInsertParam->sver <= 0) {
232,645✔
UNCOV
1811
    ST_TASK_ELOG("buildStreamSubTableCreateReq, suid:%" PRId64
×
1812
                 ", sver:%d"
1813
                 " must be greater than 0",
1814
                 pInsertParam->suid, pInsertParam->sver);
1815
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1816
  }
1817
  int32_t nTags = pInserterInfo->pTagVals->size;
232,645✔
1818

1819
  TagNames = taosArrayInit(nTags, TSDB_COL_NAME_LEN);
232,645✔
1820
  if (!TagNames) {
232,645✔
1821
    code = terrno;
×
1822
    goto _end;
×
1823
  }
1824
  for (int32_t i = 0; i < nTags; ++i) {
632,755✔
1825
    SFieldWithOptions* pField = taosArrayGet(pInsertParam->pTagFields, i);
400,110✔
1826
    if (NULL == taosArrayPush(TagNames, pField->name)) {
800,220✔
1827
      code = terrno;
×
1828
      goto _end;
×
1829
    }
1830
  }
1831

1832
  tbData->flags |= (SUBMIT_REQ_AUTO_CREATE_TABLE | SUBMIT_REQ_SCHEMA_RES);
232,645✔
1833
  tbData->uid = 0;
232,645✔
1834
  tbData->suid = pInsertParam->suid;
232,645✔
1835
  tbData->sver = pInsertParam->sver;
232,645✔
1836

1837
  tbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
232,645✔
1838
  if (NULL == tbData->pCreateTbReq) {
232,645✔
1839
    code = terrno;
×
1840
    goto _end;
×
1841
  }
1842
  tbData->pCreateTbReq->type = TSDB_CHILD_TABLE;
232,645✔
1843
  tbData->pCreateTbReq->flags |= (TD_CREATE_SUB_TB_IN_STREAM | TD_CREATE_IF_NOT_EXISTS);
232,645✔
1844

1845
  code = getTagValsFromStreamInserterInfo(pInserterInfo, pInsertParam->pFields->size, &pTagVals);
232,645✔
1846
  if (code != TSDB_CODE_SUCCESS) {
232,645✔
1847
    goto _end;
×
1848
  }
1849

1850
  code = tTagNew(pTagVals, pInsertParam->sver, false, &pTag);
232,645✔
1851
  if (code != TSDB_CODE_SUCCESS) {
232,645✔
1852
    ST_TASK_ELOG("failed to create tag, error:%s", tstrerror(code));
×
1853
    goto _end;
×
1854
  }
1855
  code = inserterBuildCreateTbReq(tbData->pCreateTbReq, pInserterInfo->tbName, pTag, tbData->suid,
232,645✔
1856
                                  pInsertParam->stbname, TagNames, nTags, TSDB_DEFAULT_TABLE_TTL);
232,645✔
1857
  if (code != TSDB_CODE_SUCCESS) {
232,249✔
1858
    ST_TASK_ELOG("failed to build create table request, error:%s", tstrerror(code));
×
1859
    goto _end;
×
1860
  }
1861

1862
_end:
232,249✔
1863
  if (code != TSDB_CODE_SUCCESS) {
232,645✔
1864
    ST_TASK_ELOG("buildStreamSubTableCreateReq failed, error:%s", tstrerror(code));
×
1865
    if (tbData->pCreateTbReq) {
×
1866
      taosMemoryFreeClear(tbData->pCreateTbReq->name);
×
1867
      taosMemoryFreeClear(tbData->pCreateTbReq);
×
1868
    }
1869
    if (TagNames) {
×
1870
      taosArrayDestroy(TagNames);
×
1871
    }
1872
  }
1873

1874
  if (pTagVals) {
232,645✔
1875
    taosArrayDestroy(pTagVals);
232,645✔
1876
  }
1877
  return code;
232,645✔
1878
}
1879

1880
static int32_t appendInsertData(SStreamInserterParam* pInsertParam, const SSDataBlock* pDataBlock,
4,952,545✔
1881
                                SSubmitTbData* tbData, STSchema* pTSchema, SBuildInsertDataInfo* dataInsertInfo) {
1882
  int32_t code = TSDB_CODE_SUCCESS;
4,952,545✔
1883
  int32_t lino = 0;
4,952,545✔
1884

1885
  int32_t rows = pDataBlock ? pDataBlock->info.rows : 0;
4,952,545✔
1886
  int32_t numOfCols = pInsertParam->pFields->size;
4,952,545✔
1887
  int32_t colNum = pDataBlock ? taosArrayGetSize(pDataBlock->pDataBlock) : 0;
4,952,545✔
1888

1889
  SArray* pVals = NULL;
4,952,545✔
1890
  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
4,952,545✔
1891
    code = terrno;
×
1892
    QUERY_CHECK_CODE(code, lino, _end);
×
1893
  }
1894

1895
  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
60,859,366✔
1896
    taosArrayClear(pVals);
55,906,827✔
1897

1898
    bool tsOrPrimaryKeyIsNull = false;
55,907,184✔
1899
    for (int32_t k = 0; k < numOfCols; ++k) {  // iterate by column
175,018,565✔
1900
      int16_t colIdx = k + 1;
119,067,051✔
1901

1902
      SFieldWithOptions* pCol = taosArrayGet(pInsertParam->pFields, k);
119,067,051✔
1903
      if (PRIMARYKEY_TIMESTAMP_COL_ID != colIdx && TSDB_DATA_TYPE_NULL == pCol->type) {
119,067,182✔
1904
        SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
×
1905
        if (NULL == taosArrayPush(pVals, &cv)) {
×
1906
          code = terrno;
×
1907
          QUERY_CHECK_CODE(code, lino, _end);
×
1908
        }
1909
        continue;
×
1910
      }
1911

1912
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
119,067,182✔
1913
      if (NULL == pColInfoData) {
119,063,124✔
1914
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1915
        QUERY_CHECK_CODE(code, lino, _end);
×
1916
      }
1917
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
119,063,124✔
1918

1919
      if (colDataIsNull_s(pColInfoData, j) && (pCol->flags & COL_IS_KEY)) {
238,191,300✔
1920
        tsOrPrimaryKeyIsNull = true;
×
1921
        qDebug("Primary key column should not be null, skip this row");
×
1922
        break;
×
1923
      }
1924
      switch (pColInfoData->info.type) {
119,109,969✔
1925
        case TSDB_DATA_TYPE_NCHAR:
4,628,670✔
1926
        case TSDB_DATA_TYPE_VARBINARY:
1927
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1928
          if (pColInfoData->info.type != pCol->type) {
4,628,670✔
1929
            qError("tb:%s column:%d type:%d in block dismatch with schema col:%d type:%d", pInsertParam->tbname, k,
×
1930
                   pColInfoData->info.type, k, pCol->type);
1931
            code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1932
            QUERY_CHECK_CODE(code, lino, _end);
×
1933
          }
1934
          if (colDataIsNull_s(pColInfoData, j)) {
9,257,340✔
1935
            SColVal cv = COL_VAL_NULL(colIdx, pCol->type);
789✔
1936
            if (NULL == taosArrayPush(pVals, &cv)) {
789✔
1937
              code = terrno;
×
1938
              QUERY_CHECK_CODE(code, lino, _end);
×
1939
            }
1940
          } else {
1941
            if (pColInfoData->pData == NULL) {
4,627,881✔
1942
              qError("build insert tb:%s, column:%d data is NULL in block", pInsertParam->tbname, k);
×
1943
              code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1944
              QUERY_CHECK_CODE(code, lino, _end);
×
1945
            }
1946
            void*  data = colDataGetVarData(pColInfoData, j);
4,627,881✔
1947
            SValue sv = (SValue){
13,883,643✔
1948
                .type = pCol->type, .nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
4,627,881✔
1949
            SColVal cv = COL_VAL_VALUE(colIdx, sv);
4,627,881✔
1950
            if (NULL == taosArrayPush(pVals, &cv)) {
4,627,881✔
1951
              code = terrno;
×
1952
              QUERY_CHECK_CODE(code, lino, _end);
×
1953
            }
1954
          }
1955
          break;
4,628,670✔
1956
        }
1957
        case TSDB_DATA_TYPE_BLOB:
×
1958
        case TSDB_DATA_TYPE_JSON:
1959
        case TSDB_DATA_TYPE_MEDIUMBLOB:
1960
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
1961
          code = TSDB_CODE_APP_ERROR;
×
1962
          QUERY_CHECK_CODE(code, lino, _end);
×
1963
          break;
×
1964
        default:
114,481,656✔
1965
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
114,481,656✔
1966
            if (colDataIsNull_s(pColInfoData, j)) {
228,970,452✔
1967
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx) {
162,497✔
1968
                tsOrPrimaryKeyIsNull = true;
4,035✔
1969
                qDebug("Primary timestamp column should not be null, skip this row");
4,035✔
1970
                break;
4,035✔
1971
              }
1972

1973
              SColVal cv = COL_VAL_NULL(colIdx, pCol->type);  // should use pCol->type
158,462✔
1974
              if (NULL == taosArrayPush(pVals, &cv)) {
158,462✔
1975
                code = terrno;
×
1976
                QUERY_CHECK_CODE(code, lino, _end);
×
1977
              }
1978
            } else {
1979
              if (PRIMARYKEY_TIMESTAMP_COL_ID == colIdx && !dataInsertInfo->needSortMerge) {
114,326,656✔
1980
                if (*(int64_t*)var <= dataInsertInfo->lastTs) {
1,274,076✔
1981
                  dataInsertInfo->needSortMerge = true;
31,638✔
1982
                } else {
1983
                  dataInsertInfo->lastTs = *(int64_t*)var;
1,242,438✔
1984
                }
1985
              }
1986

1987
              SValue sv = {.type = pCol->type};
114,325,864✔
1988
              valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
114,325,772✔
1989
              SColVal cv = COL_VAL_VALUE(colIdx, sv);
114,322,503✔
1990
              if (NULL == taosArrayPush(pVals, &cv)) {
114,322,821✔
1991
                code = terrno;
×
1992
                QUERY_CHECK_CODE(code, lino, _end);
1,428✔
1993
              }
1994
            }
1995
          } else {
1996
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
1997
            code = TSDB_CODE_APP_ERROR;
×
1998
            QUERY_CHECK_CODE(code, lino, _end);
×
1999
          }
2000
          break;
114,482,354✔
2001
      }
2002
      if (tsOrPrimaryKeyIsNull) break;  // skip remaining columns because the primary key is null
119,115,416✔
2003
    }
2004
    if (tsOrPrimaryKeyIsNull) continue;  // skip this row if primary key is null
55,955,549✔
2005
    SRow*             pRow = NULL;
55,951,514✔
2006
    SRowBuildScanInfo sinfo = {0};
55,903,863✔
2007
    if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) != TSDB_CODE_SUCCESS) {
55,903,863✔
2008
      QUERY_CHECK_CODE(code, lino, _end);
×
2009
    }
2010
    if (NULL == taosArrayPush(tbData->aRowP, &pRow)) {
111,803,430✔
2011
      taosMemFree(pRow);
×
2012
      code = terrno;
×
2013
      QUERY_CHECK_CODE(code, lino, _end);
×
2014
    }
2015
  }
2016
  if (dataInsertInfo->isLastBlock) {
4,952,539✔
2017
    int32_t nRows = taosArrayGetSize(tbData->aRowP);
4,950,739✔
2018
    if (taosArrayGetSize(tbData->aRowP) == 0) {
4,950,739✔
2019
      tbData->flags |= SUBMIT_REQ_ONLY_CREATE_TABLE;
4,462,433✔
2020
      stDebug("no valid data to insert, try to only create tabale:%s", pInsertParam->tbname);
4,462,433✔
2021
    }
2022
    stDebug("appendInsertData, isLastBlock:%d, needSortMerge:%d, totalRows:%d", dataInsertInfo->isLastBlock,
4,951,102✔
2023
            dataInsertInfo->needSortMerge, nRows);
2024
    if (dataInsertInfo->needSortMerge) {
4,951,102✔
2025
      if ((tRowSort(tbData->aRowP) != TSDB_CODE_SUCCESS) ||
63,276✔
2026
          (code = tRowMerge(tbData->aRowP, (STSchema*)pTSchema, KEEP_CONSISTENCY)) != 0) {
31,638✔
2027
        QUERY_CHECK_CODE(code, lino, _end);
×
2028
      }
2029
    }
2030
    nRows = taosArrayGetSize(tbData->aRowP);
4,951,102✔
2031
    stDebug("appendInsertData, after merge, totalRows:%d", nRows);
4,951,102✔
2032
  }
2033

2034
_end:
250,631✔
2035
  taosArrayDestroy(pVals);
4,952,545✔
2036
  return code;
4,952,545✔
2037
}
2038

2039
int32_t buildStreamSubmitReqFromBlock(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
4,952,545✔
2040
                                      SStreamDataInserterInfo* pInserterInfo, SSubmitReq2** ppReq,
2041
                                      const SSDataBlock* pDataBlock, SVgroupInfo* vgInfo,
2042
                                      SBuildInsertDataInfo* tbDataInfo) {
2043
  SSubmitReq2* pReq = *ppReq;
4,952,545✔
2044
  int32_t      numOfBlks = 0;
4,952,545✔
2045

2046
  int32_t               code = TSDB_CODE_SUCCESS;
4,952,545✔
2047
  int32_t               lino = 0;
4,952,545✔
2048
  SStreamInserterParam* pInsertParam = pInserter->pParam->streamInserterParam;
4,952,545✔
2049
  SInsertTableInfo**    ppTbInfo = NULL;
4,952,545✔
2050
  SInsertTableInfo*     pTbInfo = NULL;
4,952,545✔
2051
  STSchema*             pTSchema = NULL;
4,952,545✔
2052
  SSubmitTbData*        tbData = &tbDataInfo->pTbData;
4,952,545✔
2053
  int32_t               colNum = 0;
4,952,545✔
2054
  int32_t               rows = 0;
4,952,545✔
2055

2056
  if (NULL == pReq) {
4,952,545✔
2057
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
4,951,102✔
2058
      code = terrno;
×
2059
      QUERY_CHECK_CODE(code, lino, _end);
×
2060
    }
2061
    *ppReq = pReq;
4,951,102✔
2062

2063
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
4,951,102✔
2064
      code = terrno;
×
2065
      QUERY_CHECK_CODE(code, lino, _end);
×
2066
    }
2067
  }
2068

2069
  if (pDataBlock) {
4,952,545✔
2070
    colNum = taosArrayGetSize(pDataBlock->pDataBlock);
494,147✔
2071
    rows = pDataBlock->info.rows;
494,147✔
2072
  }
2073

2074
  tbData->flags |= SUBMIT_REQ_SCHEMA_RES;
4,952,545✔
2075

2076
  if (tbDataInfo->isFirstBlock) {
4,952,545✔
2077
    if (pInserterInfo->isAutoCreateTable) {
4,951,102✔
2078
      code = initTableInfo(pInserter, pInserterInfo);
304,568✔
2079
      QUERY_CHECK_CODE(code, lino, _end);
304,568✔
2080
      if (pInsertParam->tbType == TSDB_NORMAL_TABLE) {
304,568✔
2081
        code = buildNormalTableCreateReq(pInserter, pInsertParam, tbData);
71,923✔
2082
      } else if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
232,645✔
2083
        code = buildStreamSubTableCreateReq(pTask, pInserter, pInsertParam, pInserterInfo, tbData);
232,645✔
2084
      } else {
2085
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
2086
        ST_TASK_ELOG("buildStreamSubmitReqFromBlock, unknown table type %d", pInsertParam->tbType);
×
2087
      }
2088
      QUERY_CHECK_CODE(code, lino, _end);
304,568✔
2089
    }
2090
  }
2091

2092
  code = getStreamInsertTableInfo(pInserterInfo->streamId, pInserterInfo->groupId, &ppTbInfo);
4,952,545✔
2093
  pTbInfo = *ppTbInfo;
4,952,545✔
2094
  if (tbDataInfo->isFirstBlock) {
4,952,545✔
2095
    if (!pInserterInfo->isAutoCreateTable) {
4,951,102✔
2096
      tstrncpy(pInserterInfo->tbName, pTbInfo->tbname, TSDB_TABLE_NAME_LEN);
4,646,534✔
2097
    }
2098

2099
    tbData->uid = pTbInfo->uid;
4,951,102✔
2100
    tbData->sver = pTbInfo->version;
4,951,102✔
2101

2102
    if (pInsertParam->tbType == TSDB_SUPER_TABLE) {
4,951,102✔
2103
      tbData->suid = pInsertParam->suid;
4,811,127✔
2104
    }
2105

2106
    pTSchema = pTbInfo->pSchema;
4,951,102✔
2107
  } else {
2108
    pTSchema = pTbInfo->pSchema;
1,443✔
2109
  }
2110

2111
  code = getTableVgInfo(pInserter, pInsertParam->dbFName, pTbInfo->tbname, vgInfo);
4,952,545✔
2112
  QUERY_CHECK_CODE(code, lino, _end);
4,952,545✔
2113

2114
  ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64 " tbname:%s autoCreate:%d uid:%" PRId64 " suid:%" PRId64
4,952,545✔
2115
               " sver:%d vgid:%d isLastBlock:%d",
2116
               pInserter, pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, tbData->uid,
2117
               tbData->suid, tbData->sver, vgInfo->vgId, tbDataInfo->isFirstBlock);
2118

2119
  code = appendInsertData(pInsertParam, pDataBlock, tbData, pTSchema, tbDataInfo);
4,952,545✔
2120
  QUERY_CHECK_CODE(code, lino, _end);
4,952,545✔
2121

2122
_end:
4,952,545✔
2123
  releaseStreamInsertTableInfo(ppTbInfo);
4,952,545✔
2124
  if (code != TSDB_CODE_SUCCESS) {
4,952,545✔
2125
    ST_TASK_ELOG("buildStreamSubmitReqFromBlock, code:0x%0x, groupId:%" PRId64 " tbname:%s autoCreate:%d", code,
×
2126
                 pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable);
2127
  }
2128
  return code;
4,952,545✔
2129
}
2130

2131
int32_t streamDataBlocksToSubmitReq(SStreamRunnerTask* pTask, SDataInserterHandle* pInserter,
4,951,102✔
2132
                                    SStreamDataInserterInfo* pInserterInfo, void** pMsg, int32_t* msgLen,
2133
                                    SVgroupInfo* vgInfo) {
2134
  int32_t code = 0;
4,951,102✔
2135
  int32_t lino = 0;
4,951,102✔
2136

2137
  const SArray*        pBlocks = pInserter->pDataBlocks;
4,951,102✔
2138
  int32_t              sz = taosArrayGetSize(pBlocks);
4,951,102✔
2139
  SSubmitReq2*         pReq = NULL;
4,951,102✔
2140
  SBuildInsertDataInfo tbDataInfo = {0};
4,951,102✔
2141

2142
  int32_t rows = 0;
4,951,102✔
2143
  for (int32_t i = 0; i < sz; i++) {
9,903,647✔
2144
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
4,952,545✔
2145
    if (NULL == pDataBlock) {
4,952,545✔
2146
      stDebug("data block is NULL, just create empty table");
4,458,398✔
2147
      continue;
4,458,398✔
2148
    }
2149
    rows += pDataBlock->info.rows;
494,147✔
2150
  }
2151
  code = initInsertProcessInfo(&tbDataInfo, rows);
4,951,102✔
2152
  if (code != TSDB_CODE_SUCCESS) {
4,951,102✔
2153
    ST_TASK_ELOG("streamDataBlocksToSubmitReq, initInsertDataInfo failed, code:%d", code);
×
2154
    return code;
×
2155
  }
2156

2157
  for (int32_t i = 0; i < sz; i++) {
9,903,647✔
2158
    tbDataInfo.isFirstBlock = (i == 0);
4,952,545✔
2159
    tbDataInfo.isLastBlock = (i == sz - 1);
4,952,545✔
2160
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);  // pDataBlock select查询到的结果
4,952,545✔
2161
    ST_TASK_DLOG("[data inserter], Handle:%p, GROUP:%" PRId64
4,952,372✔
2162
            " tbname:%s autoCreate:%d block: %d/%d rows:%" PRId64,
2163
            pInserter, pInserterInfo->groupId, pInserterInfo->tbName,
2164
            pInserterInfo->isAutoCreateTable, i + 1, sz, (pDataBlock != NULL ? pDataBlock->info.rows : 0));
2165
    code = buildStreamSubmitReqFromBlock(pTask, pInserter, pInserterInfo, &pReq, pDataBlock, vgInfo, &tbDataInfo);
4,952,545✔
2166
    QUERY_CHECK_CODE(code, lino, _end);
4,952,545✔
2167
  }
2168

2169
  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbDataInfo.pTbData)) {
9,902,204✔
2170
    code = terrno;
×
2171
    QUERY_CHECK_CODE(code, lino, _end);
×
2172
  }
2173

2174
  code = submitReqToMsg(vgInfo->vgId, pReq, pMsg, msgLen);
4,951,102✔
2175
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
4,948,985✔
2176
  taosMemoryFree(pReq);
4,948,176✔
2177
  ST_TASK_DLOG("[data inserter], submit req, vgid:%d, GROUP:%" PRId64 " tbname:%s autoCreate:%d code:%d ", vgInfo->vgId,
4,948,985✔
2178
               pInserterInfo->groupId, pInserterInfo->tbName, pInserterInfo->isAutoCreateTable, code);
2179

2180
_end:
4,938,702✔
2181
  if (code != 0) {
4,950,703✔
2182
    tDestroySubmitTbData(&tbDataInfo.pTbData, TSDB_MSG_FLG_ENCODE);
×
2183
    tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
2184
    taosMemoryFree(pReq);
×
2185
  }
2186

2187
  return code;
4,951,102✔
2188
}
2189

2190
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
65,953✔
2191
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
65,953✔
2192
  if (!pInserter->explain) {
65,953✔
2193
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
121,520✔
2194
      return terrno;
×
2195
    }
2196
    if (pInserter->isStbInserter) {
60,760✔
2197
      SArray* pMsgs = taosArrayInit(4, sizeof(POINTER_BYTES));
1,875✔
2198
      if (NULL == pMsgs) {
1,875✔
2199
        return terrno;
×
2200
      }
2201
      int32_t code = dataBlocksToSubmitReqArray(pInserter, pMsgs);
1,875✔
2202
      if (code) {
1,875✔
2203
        taosArrayDestroyP(pMsgs, destroySSubmitTbDataMsg);
×
2204
        return code;
×
2205
      }
2206
      taosArrayClear(pInserter->pDataBlocks);
1,875✔
2207
      for (int32_t i = 0; i < taosArrayGetSize(pMsgs); ++i) {
5,625✔
2208
        SSubmitTbDataMsg* pMsg = taosArrayGetP(pMsgs, i);
3,750✔
2209
        code = sendSubmitRequest(pInserter, NULL, pMsg->pData, pMsg->len,
7,500✔
2210
                                 pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
7,500✔
2211
        taosMemoryFree(pMsg);
3,750✔
2212
        if (code) {
3,750✔
2213
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2214
            SSubmitTbDataMsg* pMsg2 = taosArrayGetP(pMsgs, j);
×
2215
            destroySSubmitTbDataMsg(pMsg2);
×
2216
          }
2217
          taosArrayDestroy(pMsgs);
×
2218
          return code;
×
2219
        }
2220
        QRY_ERR_RET(tsem_wait(&pInserter->ready));
3,750✔
2221

2222
        if (pInserter->submitRes.code) {
3,750✔
2223
          for (int j = i + 1; j < taosArrayGetSize(pMsgs); ++j) {
×
2224
            SSubmitTbDataMsg* pMsg2 = taosArrayGetP(pMsgs, j);
×
2225
            destroySSubmitTbDataMsg(pMsg2);
×
2226
          }
2227
          taosArrayDestroy(pMsgs);
×
2228
          return pInserter->submitRes.code;
×
2229
        }
2230
      }
2231

2232
      taosArrayDestroy(pMsgs);
1,875✔
2233

2234
    } else {
2235
      void*   pMsg = NULL;
58,885✔
2236
      int32_t msgLen = 0;
58,885✔
2237
      int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
58,885✔
2238
      if (code) {
58,885✔
2239
        return code;
7,716✔
2240
      }
2241

2242
      taosArrayClear(pInserter->pDataBlocks);
51,169✔
2243

2244
      code = sendSubmitRequest(pInserter, NULL, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
51,169✔
2245
                               &pInserter->pNode->epSet);
51,169✔
2246
      if (code) {
51,169✔
2247
        return code;
×
2248
      }
2249

2250
      QRY_ERR_RET(tsem_wait(&pInserter->ready));
51,169✔
2251

2252
      if (pInserter->submitRes.code) {
51,169✔
2253
        return pInserter->submitRes.code;
×
2254
      }
2255
    }
2256
  }
2257

2258
  *pContinue = true;
58,237✔
2259

2260
  return TSDB_CODE_SUCCESS;
58,237✔
2261
}
2262

2263
/*
 * Looks up the cached insert-table info for the stream/group named in pInput
 * and, unless the stream target is a normal table, copies the cached table
 * version into the stream inserter parameter's `sver`.
 *
 * Returns the lookup error if the info cannot be obtained, otherwise the
 * result of releasing the looked-up info.
 */
static int32_t resetInserterTbVersion(SDataInserterHandle* pInserter, const SInputData* pInput) {
  SInsertTableInfo** ppInfo = NULL;

  int32_t rc = getStreamInsertTableInfo(pInput->pStreamDataInserterInfo->streamId,
                                        pInput->pStreamDataInserterInfo->groupId, &ppInfo);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  stDebug("resetInserterTbVersion, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbName:%s, uid:%" PRId64 ", version:%d",
          pInput->pStreamDataInserterInfo->streamId, pInput->pStreamDataInserterInfo->groupId,
          pInput->pStreamDataInserterInfo->tbName, (*ppInfo)->uid, (*ppInfo)->version);

  // Normal tables keep the schema version they already carry.
  if (TSDB_NORMAL_TABLE != pInserter->pParam->streamInserterParam->tbType) {
    pInserter->pParam->streamInserterParam->sver = (*ppInfo)->version;
  }

  return releaseStreamInsertTableInfo(ppInfo);
}
2280

2281
/*
 * fPut callback of the stream data inserter.
 *
 * Queues pInput->pData, builds a submit request for the target table, sends it
 * and waits on pInserter->ready for the response. Depending on the response:
 *   - TSDB_CODE_TDB_TABLE_ALREADY_EXIST: auto-create is switched off, the cached
 *     table version is refreshed and the request is rebuilt and sent once more;
 *   - TSDB_CODE_VND_INVALID_VGROUP_ID while auto-creating: the cached db vgroup
 *     info is dropped and the whole put is retried;
 *   - table-not-exist (without auto-create) or invalid vgroup id: the cached db
 *     vgroup info is dropped and TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND is returned;
 *   - any other non-zero response code is returned as is.
 *
 * TSDB_CODE_STREAM_NO_DATA is mapped to success. *pContinue is set to true only
 * when a request was acknowledged without error. The queued block list is
 * always emptied before returning from the send path.
 */
static int32_t putStreamDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  SStreamRunnerTask*   pTask = pInput->pTask;
  if (!pInserter || !pInserter->pParam || !pInserter->pParam->streamInserterParam) {
    ST_TASK_ELOG("putStreamDataBlock invalid param, pInserter: %p, pParam:%p", pInserter,
                 pInserter ? pInserter->pParam : NULL);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  if (pInserter->explain) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    return terrno;
  }
  void*       pMsg = NULL;
  int32_t     msgLen = 0;
  SVgroupInfo vgInfo = {0};

  code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                           pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pInserter->ready);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
    // The table was created concurrently: resend as a plain insert with the current table version.
    pInput->pStreamDataInserterInfo->isAutoCreateTable = false;
    code = resetInserterTbVersion(pInserter, pInput);
    QUERY_CHECK_CODE(code, lino, _return);

    code = streamDataBlocksToSubmitReq(pTask, pInserter, pInput->pStreamDataInserterInfo, &pMsg, &msgLen, &vgInfo);
    QUERY_CHECK_CODE(code, lino, _return);

    code = sendSubmitRequest(pInserter, pInput->pStreamDataInserterInfo, pMsg, msgLen,
                             pInserter->pParam->readHandle->pMsgCb->clientRpc, &vgInfo.epSet);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tsem_wait(&pInserter->ready);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInput->pStreamDataInserterInfo->isAutoCreateTable &&
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo and try again",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    // The retry pushes pInput->pData again, so drop the copy queued by this call first;
    // otherwise the retried request would be built from the same block twice.
    // NOTE(review): the retry is unbounded - it recurses for as long as the vnode keeps
    // answering with an invalid vgroup id.
    taosArrayClear(pInserter->pDataBlocks);
    return putStreamDataBlock(pHandle, pInput, pContinue);
  }

  if ((pInserter->submitRes.code == TSDB_CODE_TDB_TABLE_NOT_EXIST &&
       !pInput->pStreamDataInserterInfo->isAutoCreateTable) ||
      pInserter->submitRes.code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
    rmDbVgInfoFromCache(pInserter->pParam->streamInserterParam->dbFName);
    ST_TASK_ILOG("putStreamDataBlock, stream inserter table info not found, groupId:%" PRId64
                 ", tbName:%s. so reset dbVgInfo",
                 pInput->pStreamDataInserterInfo->groupId, pInput->pStreamDataInserterInfo->tbName);
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  if (pInserter->submitRes.code) {
    code = pInserter->submitRes.code;
    ST_TASK_ELOG("submitRes err:%s, code:%0x", tstrerror(pInserter->submitRes.code), pInserter->submitRes.code);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  *pContinue = true;

_return:
  taosArrayClear(pInserter->pDataBlocks);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    ST_TASK_DLOG("putStreamDataBlock, no valid data to insert, skip this block, groupID:%" PRId64,
                 pInput->pStreamDataInserterInfo->groupId);
    code = TSDB_CODE_SUCCESS;
  } else if (code) {
    ST_TASK_ELOG("submitRes err:%s, code:%0x lino:%d", tstrerror(code), code, lino);
  }
  return code;
}
2368

2369
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
57,547✔
2370
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
57,547✔
2371
  (void)taosThreadMutexLock(&pInserter->mutex);
57,547✔
2372
  pInserter->queryEnd = true;
57,547✔
2373
  pInserter->useconds = useconds;
57,547✔
2374
  (void)taosThreadMutexUnlock(&pInserter->mutex);
57,547✔
2375
}
57,547✔
2376

2377
/*
 * fGetLen callback: reports the affected-row count of the last submit result
 * through *pLen. pRawLen and pQueryEnd are not written by this sink.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataInserterHandle* pSink = (SDataInserterHandle*)pHandle;

  *pLen = pSink->submitRes.affectedRows;
  qDebug("got total affectedRows %" PRId64, *pLen);
}
57,547✔
2382

2383
/*
 * fDestroy callback: releases everything the inserter handle owns (queued block
 * list, schemas, parameters, column map, plan node, mutex, manager and the
 * optional vgroup-info map). The handle structure itself is not freed here.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSink = (SDataInserterHandle*)pHandle;

  // Give back this sink's share of the global cached-size statistic.
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSink->cachedSize);

  taosArrayDestroy(pSink->pDataBlocks);
  taosMemoryFree(pSink->pSchema);

  if (NULL != pSink->pParam->streamInserterParam) {
    destroyStreamInserterParam(pSink->pParam->streamInserterParam);
    // only for stream
    taosMemoryFree(pSink->pParam->readHandle);
  }
  taosMemoryFree(pSink->pParam);

  taosHashCleanup(pSink->pCols);

  nodesDestroyNode((SNode*)pSink->pNode);
  pSink->pNode = NULL;

  (void)taosThreadMutexDestroy(&pSink->mutex);

  taosMemoryFree(pSink->pManager);

  if (NULL != pSink->dbVgInfoMap) {
    taosHashSetFreeFp(pSink->dbVgInfoMap, freeUseDbOutput_tmp);
    taosHashCleanup(pSink->dbVgInfoMap);
  }

  if (NULL != pSink->pTagSchema) {
    taosMemoryFreeClear(pSink->pTagSchema->pSchema);
    taosMemoryFree(pSink->pTagSchema);
  }

  return TSDB_CODE_SUCCESS;
}
2413

2414
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
2415
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
×
2416

2417
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
2418
  return TSDB_CODE_SUCCESS;
×
2419
}
2420

2421
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
65,263✔
2422
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
65,263✔
2423

2424
  *pFlags = atomic_load_64(&pDispatcher->flags);
65,263✔
2425
  return TSDB_CODE_SUCCESS;
65,263✔
2426
}
2427

2428
/*
 * Creates the data sink used by query-driven inserts.
 *
 * The plan node in *ppDataSink, pManager and pParam are stored in the new
 * handle, and *ppDataSink is set to NULL once the node has been stored. The
 * table schema is loaded through the storage API and a colId -> slotId map is
 * built from the node's column list; a TBNAME function node is registered under
 * column id 0 / slot id 0. A super-table inserter without a TBNAME column is
 * rejected with TSDB_CODE_PAR_TBNAME_ERROR.
 *
 * On success the handle is stored in *pHandle and TSDB_CODE_SUCCESS is returned.
 * On failure everything acquired so far is released and terrno is returned.
 */
int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataSinkNode*       pSinkNode = *ppDataSink;
  SDataInserterHandle* pIns = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == pIns) {
    taosMemoryFree(pParam);
    goto _return;
  }

  SQueryInserterNode* pInsNode = (SQueryInserterNode*)pSinkNode;

  // callback table
  pIns->sink.fPut = putDataBlock;
  pIns->sink.fEndPut = endPut;
  pIns->sink.fGetLen = getDataLength;
  pIns->sink.fGetData = NULL;
  pIns->sink.fDestroy = destroyDataSinker;
  pIns->sink.fGetCacheSize = getCacheSize;
  pIns->sink.fGetFlags = getSinkFlags;

  // state taken over from the caller
  pIns->pManager = pManager;
  pIns->pNode = pInsNode;
  pIns->pParam = pParam;
  pIns->status = DS_BUF_EMPTY;
  pIns->queryEnd = false;
  pIns->explain = pInsNode->explain;
  *ppDataSink = NULL;

  int64_t suid = 0;
  int32_t code = pManager->pAPI->metaFn.getTableSchema(pIns->pParam->readHandle->vnode, pInsNode->tableId,
                                                       &pIns->pSchema, &suid, &pIns->pTagSchema);
  if (code) {
    terrno = code;
    goto _return;
  }

  pManager->pAPI->metaFn.getBasicInfo(pIns->pParam->readHandle->vnode, &pIns->dbFName, NULL, NULL, NULL);

  if (TSDB_SUPER_TABLE == pInsNode->tableType) {
    pIns->isStbInserter = true;
  }

  // The plan must refer to the same super table the schema was loaded for.
  if (suid != pInsNode->stableId) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  pIns->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == pIns->pDataBlocks) {
    goto _return;
  }
  QRY_ERR_JRET(taosThreadMutexInit(&pIns->mutex, NULL));

  // Starts out true only if the column counts match; the loop below clears it on
  // the first column id that differs from the schema at the same position.
  pIns->fullOrderColList = pInsNode->pCols->length == pIns->pSchema->numOfCols;

  pIns->pCols = taosHashInit(pInsNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                             false, HASH_NO_LOCK);
  if (NULL == pIns->pCols) {
    goto _return;
  }

  SNode*  pNode = NULL;
  int32_t schemaIdx = 0;
  bool    hasTbname = false;
  FOREACH(pNode, pInsNode->pCols) {
    bool isTbname = (pNode->type == QUERY_NODE_FUNCTION) && (((SFunctionNode*)pNode)->funcType == FUNCTION_TYPE_TBNAME);
    if (isTbname) {
      int16_t colId = 0;
      int16_t slotId = 0;
      QRY_ERR_JRET(taosHashPut(pIns->pCols, &colId, sizeof(colId), &slotId, sizeof(slotId)));
      hasTbname = true;
    } else {
      SColumnNode* pCol = (SColumnNode*)pNode;
      QRY_ERR_JRET(taosHashPut(pIns->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId)));
      if (pIns->fullOrderColList && pCol->colId != pIns->pSchema->columns[schemaIdx].colId) {
        pIns->fullOrderColList = false;
      }
      // Only real columns advance the schema position; the TBNAME node does not.
      ++schemaIdx;
    }
  }

  if (pIns->isStbInserter && !hasTbname) {
    QRY_ERR_JRET(TSDB_CODE_PAR_TBNAME_ERROR);
  }

  QRY_ERR_JRET(tsem_init(&pIns->ready, 0, 0));

  pIns->dbVgInfoMap = NULL;

  *pHandle = pIns;
  return TSDB_CODE_SUCCESS;

_return:

  if (pIns) {
    (void)destroyDataSinker((SDataSinkHandle*)pIns);
    taosMemoryFree(pIns);
  } else {
    taosMemoryFree(pManager);
  }

  // A no-op once the node was stored in the handle, because *ppDataSink is NULL by then.
  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  return terrno;
}
2530

2531
                           
2532
static TdThreadOnce g_dbVgInfoMgrInit = PTHREAD_ONCE_INIT;
2533

2534
SDBVgInfoMgr g_dbVgInfoMgr = {0};
2535
                           
2536
void dbVgInfoMgrInitOnce() {
24,740✔
2537
  g_dbVgInfoMgr.dbVgInfoMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
24,740✔
2538
  if (g_dbVgInfoMgr.dbVgInfoMap == NULL) {
24,740✔
2539
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
2540
    return;
×
2541
  }
2542

2543
  taosHashSetFreeFp(g_dbVgInfoMgr.dbVgInfoMap, freeUseDbOutput_tmp);
24,740✔
2544
}
2545

2546

2547

2548
/*
 * Creates the data sink used by stream tasks.
 *
 * Makes sure the global db vgroup-info cache exists, allocates the handle,
 * wires the stream callbacks and stores pManager / pParam in the handle. On
 * success the handle is stored in *pHandle and TSDB_CODE_SUCCESS is returned.
 *
 * On failure the partially built handle is destroyed (or, if no handle was
 * allocated, pManager is freed) and the error code is returned.
 */
int32_t createStreamDataInserter(SDataSinkManager* pManager, DataSinkHandle* pHandle, void* pParam) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
  // Declared before the first possible jump to _exit: the cleanup code tests this
  // pointer, so it must not be left indeterminate when an early check fails.
  SDataInserterHandle* inserter = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));
  TSDB_CHECK_NULL(g_dbVgInfoMgr.dbVgInfoMap, code, lino, _exit, terrno);

  inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  TSDB_CHECK_NULL(inserter, code, lino, _exit, terrno);

  inserter->sink.fPut = putStreamDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->sink.fGetFlags = getSinkFlags;
  inserter->pManager = pManager;
  inserter->pNode = NULL;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = false;

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  TSDB_CHECK_NULL(inserter->pDataBlocks, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(taosThreadMutexInit(&inserter->mutex, NULL));
  TAOS_CHECK_EXIT(tsem_init(&inserter->ready, 0, 0));

  inserter->dbVgInfoMap = NULL;

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_exit:

  if (inserter) {
    (void)destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  if (code) {
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2597

2598
/*
 * Returns, through *dbVgInfo, the vgroup info of database dbFName from the
 * global cache. On a cache miss the info is built via buildDbVgInfoMap and
 * inserted into the cache; if another thread inserted the same key first
 * (TSDB_CODE_DUP_KEY), the locally built copy is discarded and the cached
 * entry is used instead.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY if the entry
 * cannot be allocated, or the error of the failing step. *dbVgInfo is only
 * written on success.
 */
int32_t getDbVgInfoByTbName(void* clientRpc, const char* dbFName, SDBVgInfo** dbVgInfo) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       line = 0;
  SUseDbOutput* output = NULL;
  size_t        keyLen = strlen(dbFName);

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);

  if (find == NULL) {
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (output == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = buildDbVgInfoMap(clientRpc, dbFName, output);
    QUERY_CHECK_CODE(code, line, _return);

    code = taosHashPut(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen, &output, POINTER_BYTES);
    if (code == TSDB_CODE_DUP_KEY) {
      code = TSDB_CODE_SUCCESS;
      // another thread has put the same dbFName, so we need to free the output
      freeUseDbOutput_tmp(&output);
      // Forget the released copy so the error path below cannot release it a second time.
      output = NULL;
      find = (SUseDbOutput**)taosHashGet(g_dbVgInfoMgr.dbVgInfoMap, dbFName, keyLen);
      if (find == NULL) {
        QUERY_CHECK_CODE(code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, line, _return);
      }
      output = *find;
    }
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // Only a locally built entry that never reached the cache is still held here.
  if (output != NULL) {
    freeUseDbOutput_tmp(&output);
  }
  return code;
}
2638

2639
/*
 * Resolves the vgroup of table "<dbFName>.<tbName>" into *pVgInfo.
 *
 * Holds the read latch of the global db vgroup-info manager while the db's
 * vgroup info is fetched (or built) and the table is looked up in it.
 * Returns 0 on success or the error of the failing step, which is also logged.
 */
int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDBVgInfo* pDbVgInfo = NULL;
  char       fullName[TSDB_TABLE_FNAME_LEN];

  snprintf(fullName, sizeof(fullName), "%s.%s", dbFName, tbName);

  taosRLockLatch(&g_dbVgInfoMgr.lock);

  TAOS_CHECK_EXIT(getDbVgInfoByTbName(clientRpc, dbFName, &pDbVgInfo));
  TAOS_CHECK_EXIT(inserterGetVgInfo(pDbVgInfo, fullName, pVgInfo));

_exit:

  taosRUnLockLatch(&g_dbVgInfoMgr.lock);

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
2661

2662
void rmDbVgInfoFromCache(const char* dbFName) {
2,581✔
2663
  taosWLockLatch(&g_dbVgInfoMgr.lock);
2,581✔
2664

2665
  TAOS_UNUSED(taosHashRemove(g_dbVgInfoMgr.dbVgInfoMap, dbFName, strlen(dbFName)));
2,581✔
2666

2667
  taosWUnLockLatch(&g_dbVgInfoMgr.lock);
2,581✔
2668
}
2,581✔
2669

2670
/*
 * Serialises a drop-table batch request into a newly allocated message buffer:
 * an SMsgHead (vgId and total length, both in network byte order) followed by
 * the encoded request.
 *
 * On success *pData receives the buffer and *pLen its total length. On failure
 * nothing is stored, any allocated buffer is freed and the error is returned.
 */
static int32_t dropTableReqToMsg(int32_t vgId, SVDropTbBatchReq* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;

  // First pass: compute the encoded size of the request body.
  tEncodeSize(tEncodeSVDropTbBatchReq, pReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  len += sizeof(SMsgHead);
  void* pBuf = taosMemoryMalloc(len);
  if (NULL == pBuf) {
    return terrno;
  }

  SDropTbDataMsg* pHead = (SDropTbDataMsg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(len);

  // Second pass: encode the request right behind the message head.
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
  code = tEncodeSVDropTbBatchReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
2699

2700
/*
 * Response callback of a drop-table request.
 *
 * Stores the response code in the SDropTbCtx passed as `param`, wakes the
 * thread waiting on the context's `ready` semaphore and frees the response
 * payload. Always returns TSDB_CODE_SUCCESS.
 */
int32_t dropTbCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SDropTbCtx* pCtx = (SDropTbCtx*)param;
  if (code) {
    stError("dropTbCallback, code:%d, stream:%" PRId64 " gid:%" PRId64, code, pCtx->req->streamId, pCtx->req->gid);
  }
  pCtx->code = code;

  // pCtx must not be touched after the post: the waiter may return and release it.
  int32_t postCode = tsem_post(&pCtx->ready);
  if (postCode != TSDB_CODE_SUCCESS) {
    // The waiter will never be woken up in this case, so at least leave a trace.
    stError("dropTbCallback, failed to post ready semaphore, code:%d", postCode);
  }

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
  }

  return TSDB_CODE_SUCCESS;
}
2711

2712
/*
 * Wraps an encoded drop-table message into an SMsgSendInfo and sends it
 * asynchronously to pEpset. `ctx` is passed to dropTbCallback as its parameter.
 *
 * Returns terrno if the send-info cannot be allocated, otherwise the result of
 * asyncSendMsgToServer.
 */
static int32_t sendDropTbRequest(SDropTbCtx* ctx, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (!pSendInfo) {
    return terrno;
  }

  // payload
  pSendInfo->msgType = TDMT_VND_SNODE_DROP_TABLE;
  pSendInfo->msgInfo.pData = pMsg;
  pSendInfo->msgInfo.len = msgLen;

  // response handling: no free function is installed for the callback parameter
  pSendInfo->fp = dropTbCallback;
  pSendInfo->param = ctx;
  pSendInfo->paramFreeFp = NULL;

  return asyncSendMsgToServer(pTransporter, pEpset, NULL, pSendInfo);
}
2728

2729
/*
 * Drops the output table that a stream created for (pReq->streamId, pReq->gid).
 *
 * The table is taken from the cached stream insert-table info; that cache entry
 * is removed, a drop-table batch request with a single entry is sent to the
 * table's vgroup and the function blocks until the response arrived.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND
 * if no table info is cached for the stream/group, or the error of the failing
 * step / the code carried by the response.
 */
int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  SInsertTableInfo** ppTbInfo = NULL;
  int32_t            vgId = 0;

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  code = getStreamInsertTableInfo(pReq->streamId, pReq->gid, &ppTbInfo);
  if (TSDB_CODE_SUCCESS == code) {
    pDropReq->name = taosStrdup((*ppTbInfo)->tbname);
    if (pDropReq->name == NULL) {
      // Without a table name no request can be built; leave the cache entry untouched.
      code = terrno;
      goto _end;
    }
    // NOTE(review): suid and uid are both filled from the cached uid - confirm this is intended.
    pDropReq->suid = (*ppTbInfo)->uid;
    pDropReq->uid = (*ppTbInfo)->uid;
    pDropReq->igNotExists = true;
    vgId = (*ppTbInfo)->vgid;

    int64_t key[2] = {pReq->streamId, pReq->gid};
    TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));
  } else {
    code = TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND;
  }
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  code = tsem_wait(&ctx.ready);
  if (code == TSDB_CODE_SUCCESS) {
    // Only a completed wait guarantees that the callback has filled in ctx.code.
    code = ctx.code;
  }
  stDebug("doDropStreamTable,  code:0x%" PRIx32 " req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_STREAM_INSERT_TBINFO_NOT_FOUND) {
    stError("doDropStreamTable, code:0x%" PRIx32 ", streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, (pDropReq && pDropReq->name) ? pDropReq->name : "unknown");
  }
  // A message that was never handed over is released regardless of the error code,
  // i.e. also when the error is not logged above.
  if (pMsg) {
    taosMemoryFreeClear(pMsg);
  }
  if (pSem) tsem_destroy(pSem);
  if (pDropReq && pDropReq->name) taosMemoryFreeClear(pDropReq->name);
  if (ppTbInfo) releaseStreamInsertTableInfo(ppTbInfo);
  taosArrayDestroy(req.pArray);

  return code;
}
2798

2799
/*
 * Drops the stream output table `tbName` of (pReq->streamId, pReq->gid).
 *
 * Removes the stream/group entry from the insert-table cache, resolves the
 * table's vgroup by name, sends a drop-table batch request with a single entry
 * and blocks until the response arrived. `tbName` is only borrowed: it is
 * referenced by the request but not freed here.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the error of the failing step
 * or the code carried by the response.
 */
int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pTaskOutput, SSTriggerDropRequest* pReq, char* tbName) {
  SStreamRunnerTaskOutput* pOutput = pTaskOutput;
  int32_t                  code = 0;
  int32_t                  lino = 0;
  SVDropTbBatchReq         req = {.nReqs = 1};
  SVDropTbReq*             pDropReq = NULL;
  int32_t                  msgLen = 0;
  tsem_t*                  pSem = NULL;
  SDropTbDataMsg*          pMsg = NULL;

  TAOS_UNUSED(taosThreadOnce(&g_dbVgInfoMgrInit, dbVgInfoMgrInitOnce));

  req.pArray = taosArrayInit_s(sizeof(SVDropTbReq), 1);
  if (!req.pArray) return terrno;

  pDropReq = taosArrayGet(req.pArray, 0);

  pDropReq->name = tbName;
  pDropReq->igNotExists = true;

  int64_t key[2] = {pReq->streamId, pReq->gid};
  TAOS_UNUSED(taosHashRemove(gStreamGrpTableHash, key, sizeof(key)));

  SVgroupInfo vgInfo = {0};
  code = getDbVgInfoForExec(pMsgCb->clientRpc, pOutput->outDbFName, pDropReq->name, &vgInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  code = dropTableReqToMsg(vgInfo.vgId, &req, (void**)&pMsg, &msgLen);
  QUERY_CHECK_CODE(code, lino, _end);

  SDropTbCtx ctx = {.req = pReq};
  code = tsem_init(&ctx.ready, 0, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pSem = &ctx.ready;

  code = sendDropTbRequest(&ctx, pMsg, msgLen, pMsgCb->clientRpc, &vgInfo.epSet);
  QUERY_CHECK_CODE(code, lino, _end);
  pMsg = NULL;  // now owned by sendDropTbRequest

  code = tsem_wait(&ctx.ready);
  if (code == TSDB_CODE_SUCCESS) {
    // Only a completed wait guarantees that the callback has filled in ctx.code;
    // a failed wait is reported instead of being masked by the still-zero ctx.code.
    code = ctx.code;
  }
  stDebug("doDropStreamTableByTbName,  code:%d req:%p, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq,
          pReq->streamId, pReq->gid, pDropReq ? pDropReq->name : "unknown");

_end:
  if (code != TSDB_CODE_SUCCESS) {
    stError("doDropStreamTableByTbName, code:%d, streamId:0x%" PRIx64 " groupId:%" PRId64 " tbname:%s", code, pReq->streamId,
            pReq->gid, pDropReq ? pDropReq->name : "unknown");
    if (pMsg) {
      taosMemoryFreeClear(pMsg);
    }
  }
  if (pSem) tsem_destroy(pSem);
  taosArrayDestroy(req.pArray);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc